Commit fe7de63b authored by 唐香港's avatar 唐香港

Update alarm_kafka_connect.py

parent e7bd5f7b
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
from airflow import DAG from airflow import DAG
from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta from datetime import datetime, timedelta
import os, json, logging, time, signal, platform, subprocess import os, json, time, signal, platform, subprocess
import airflow import airflow
#variable parameter #variable parameter
...@@ -69,12 +69,12 @@ def failure_callback(context): ...@@ -69,12 +69,12 @@ def failure_callback(context):
\n> **For more details, please check the airflow task log.** \n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id, context['task_instance'].dag_id, context['task_instance'].task_id, context['exception']) ''' % (context['task_instance'].task_id, context['task_instance'].dag_id, context['task_instance'].task_id, context['exception'])
logging.error('message : \n' + message) print('message : \n' + message)
cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING) cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
os.system(cmd) os.system(cmd)
def get_kafka_connectors(cmd): def get_kafka_connectors(cmd):
logging.info('exec cmd:' + cmd) print('exec cmd:' + cmd)
connectors_list = [] connectors_list = []
timeout = 10 timeout = 10
...@@ -82,8 +82,8 @@ def get_kafka_connectors(cmd): ...@@ -82,8 +82,8 @@ def get_kafka_connectors(cmd):
(stdout, stderr) = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
stdout = stdout.decode() stdout = stdout.decode()
stderr = stderr.decode() stderr = stderr.decode()
logging.info(stderr) print(stderr)
logging.info(stdout) print(stdout)
if 'Could not resolve host' in stderr and not stdout: if 'Could not resolve host' in stderr and not stdout:
msg = '\n- **error_msg**: Could not resolve host %s' % (cmd) msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
...@@ -94,17 +94,17 @@ def get_kafka_connectors(cmd): ...@@ -94,17 +94,17 @@ def get_kafka_connectors(cmd):
connectors_list = json.loads(stdout) connectors_list = json.loads(stdout)
if 'error_code' in connectors_list: if 'error_code' in connectors_list:
msg = '\n- **error_code**: ' + connectors_list['error_code'] + '**error_msg**: ' + connectors_list['message'] msg = '\n- **error_code**: ' + connectors_list['error_code'] + '**error_msg**: ' + connectors_list['message']
logging.error(msg) print(msg)
raise Exception(msg) raise Exception(msg)
except TimeoutError: except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i' % (cmd, timeout) msg = '\n- **error_msg**: excute command=(%s) timeout after %i' % (cmd, timeout)
logging.error(msg) print(msg)
raise Exception(msg) raise Exception(msg)
return connectors_list return connectors_list
def get_connector_status(cmd): def get_connector_status(cmd):
logging.info('exec cmd: ' + cmd) print('exec cmd: ' + cmd)
timeout = 10 timeout = 10
outdict = {} outdict = {}
...@@ -112,17 +112,17 @@ def get_connector_status(cmd): ...@@ -112,17 +112,17 @@ def get_connector_status(cmd):
(stdout, stderr) = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
stdout = stdout.decode() stdout = stdout.decode()
stderr = stderr.decode() stderr = stderr.decode()
logging.info(stderr) print(stderr)
logging.info(stdout) print(stdout)
outdict = json.loads(stdout) outdict = json.loads(stdout)
except TimeoutError: except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout) msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout)
logging.error(msg) print(msg)
errdict = {'error_code':'600','message':'excute command=(%s) timeout after %i s' % (cmd, timeout)} errdict = {'error_code':'600','message':'excute command=(%s) timeout after %i s' % (cmd, timeout)}
return errdict return errdict
logging.info('get connector status : \n' + str(outdict)) print('get connector status : \n' + str(outdict))
return outdict return outdict
def judge_connector_status(connector_status_dict, connector): def judge_connector_status(connector_status_dict, connector):
...@@ -132,11 +132,11 @@ def judge_connector_status(connector_status_dict, connector): ...@@ -132,11 +132,11 @@ def judge_connector_status(connector_status_dict, connector):
msg = msg = "\n- **connector_name**: %s, **error_msg**: %s" % (connector, connector_status_dict['message']) msg = msg = "\n- **connector_name**: %s, **error_msg**: %s" % (connector, connector_status_dict['message'])
else: else:
msg = "\n- **connector_name**: %s, **error_code**: %s, **error_msg**: %s" % (connector, connector_status_dict['error_code'], connector_status_dict['message']) msg = "\n- **connector_name**: %s, **error_code**: %s, **error_msg**: %s" % (connector, connector_status_dict['error_code'], connector_status_dict['message'])
logging.error(msg) print(msg)
return (False, msg) return (False, msg)
if 'RUNNING' != connector_status_dict['connector']['state']: if 'RUNNING' != connector_status_dict['connector']['state']:
msg = "\n- **connector_name**: %s, **is_running**: false" % (connector) msg = "\n- **connector_name**: %s, **is_running**: false" % (connector)
logging.error(msg) print(msg)
return (False, msg) return (False, msg)
tasks_list = connector_status_dict['tasks'] tasks_list = connector_status_dict['tasks']
error_tasks = 0 error_tasks = 0
...@@ -147,19 +147,19 @@ def judge_connector_status(connector_status_dict, connector): ...@@ -147,19 +147,19 @@ def judge_connector_status(connector_status_dict, connector):
all_tasks = len(tasks_list) all_tasks = len(tasks_list)
running_tasks = all_tasks - error_tasks running_tasks = all_tasks - error_tasks
msg = "\n- **connector_name**: %s, **is_running**: true, **running_tasks/all_tasks**: %s/%s" % (connector, str(running_tasks), str(all_tasks) ) msg = "\n- **connector_name**: %s, **is_running**: true, **running_tasks/all_tasks**: %s/%s" % (connector, str(running_tasks), str(all_tasks) )
logging.error(msg) print(msg)
return (False, msg) return (False, msg)
return (True, str(connector_status_dict)) return (True, str(connector_status_dict))
def python_callable(**kwargs): def python_callable(**kwargs):
logging.info('start kafka connect status analyze .') print('start kafka connect status analyze .')
curl = 'curl ' curl = 'curl '
kafka_connect_url = 'http://' + kwargs['task_instance'].task_id + ':8083/connectors/' kafka_connect_url = 'http://' + kwargs['task_instance'].task_id + ':8083/connectors/'
status = '/status' status = '/status'
connectors_list = get_kafka_connectors(curl + kafka_connect_url) connectors_list = get_kafka_connectors(curl + kafka_connect_url)
logging.info('exec cmd: ' + curl + kafka_connect_url + ' success!') print('exec cmd: ' + curl + kafka_connect_url + ' success!')
error_msg = "" error_msg = ""
for connector in connectors_list: for connector in connectors_list:
...@@ -169,7 +169,7 @@ def python_callable(**kwargs): ...@@ -169,7 +169,7 @@ def python_callable(**kwargs):
error_msg += msg error_msg += msg
if error_msg: if error_msg:
raise Exception(error_msg) raise Exception(error_msg)
logging.info('kafka connect status ok!') print('kafka connect status ok!')
for service in SERVICES: for service in SERVICES:
task = PythonOperator( task = PythonOperator(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment