Commit 87fd5ddc authored by 唐香港's avatar 唐香港

Update alarm_kafka_connect.py

parent fe7de63b
......@@ -8,7 +8,7 @@
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os, json, time, signal, platform, subprocess
import os, json, logging, time, signal, platform, subprocess
import airflow
#variable parameter
......@@ -23,6 +23,9 @@ SERVICES = [
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TimeoutError(Exception):
pass
......@@ -69,12 +72,12 @@ def failure_callback(context):
\n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id, context['task_instance'].dag_id, context['task_instance'].task_id, context['exception'])
print('message : \n' + message)
logger.info('message : \n' + message)
cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
os.system(cmd)
def get_kafka_connectors(cmd):
print('exec cmd:' + cmd)
logger.info('exec cmd:' + cmd)
connectors_list = []
timeout = 10
......@@ -82,8 +85,8 @@ def get_kafka_connectors(cmd):
(stdout, stderr) = run_command(cmd, timeout)
stdout = stdout.decode()
stderr = stderr.decode()
print(stderr)
print(stdout)
logger.info(stderr)
logger.info(stdout)
if 'Could not resolve host' in stderr and not stdout:
msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
......@@ -94,17 +97,17 @@ def get_kafka_connectors(cmd):
connectors_list = json.loads(stdout)
if 'error_code' in connectors_list:
msg = '\n- **error_code**: ' + connectors_list['error_code'] + '**error_msg**: ' + connectors_list['message']
print(msg)
logger.error(msg)
raise Exception(msg)
except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i' % (cmd, timeout)
print(msg)
logger.info(msg)
raise Exception(msg)
return connectors_list
def get_connector_status(cmd):
print('exec cmd: ' + cmd)
logger.info('exec cmd: ' + cmd)
timeout = 10
outdict = {}
......@@ -112,17 +115,17 @@ def get_connector_status(cmd):
(stdout, stderr) = run_command(cmd, timeout)
stdout = stdout.decode()
stderr = stderr.decode()
print(stderr)
print(stdout)
logger.info(stderr)
logger.info(stdout)
outdict = json.loads(stdout)
except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout)
print(msg)
logger.error(msg)
errdict = {'error_code':'600','message':'excute command=(%s) timeout after %i s' % (cmd, timeout)}
return errdict
print('get connector status : \n' + str(outdict))
logger.info('get connector status : \n' + str(outdict))
return outdict
def judge_connector_status(connector_status_dict, connector):
......@@ -132,11 +135,11 @@ def judge_connector_status(connector_status_dict, connector):
msg = msg = "\n- **connector_name**: %s, **error_msg**: %s" % (connector, connector_status_dict['message'])
else:
msg = "\n- **connector_name**: %s, **error_code**: %s, **error_msg**: %s" % (connector, connector_status_dict['error_code'], connector_status_dict['message'])
print(msg)
logger.error(msg)
return (False, msg)
if 'RUNNING' != connector_status_dict['connector']['state']:
msg = "\n- **connector_name**: %s, **is_running**: false" % (connector)
print(msg)
logger.error(msg)
return (False, msg)
tasks_list = connector_status_dict['tasks']
error_tasks = 0
......@@ -147,19 +150,19 @@ def judge_connector_status(connector_status_dict, connector):
all_tasks = len(tasks_list)
running_tasks = all_tasks - error_tasks
msg = "\n- **connector_name**: %s, **is_running**: true, **running_tasks/all_tasks**: %s/%s" % (connector, str(running_tasks), str(all_tasks) )
print(msg)
logger.error(msg)
return (False, msg)
return (True, str(connector_status_dict))
def python_callable(**kwargs):
print('start kafka connect status analyze .')
logger.info('start kafka connect status analyze .')
curl = 'curl '
kafka_connect_url = 'http://' + kwargs['task_instance'].task_id + ':8083/connectors/'
status = '/status'
connectors_list = get_kafka_connectors(curl + kafka_connect_url)
print('exec cmd: ' + curl + kafka_connect_url + ' success!')
logger.info('exec cmd: ' + curl + kafka_connect_url + ' success!')
error_msg = ""
for connector in connectors_list:
......@@ -169,7 +172,7 @@ def python_callable(**kwargs):
error_msg += msg
if error_msg:
raise Exception(error_msg)
print('kafka connect status ok!')
logger.error('kafka connect status ok!')
for service in SERVICES:
task = PythonOperator(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment