Commit 5380ae20 authored by 唐香港

Update alarm_flink_job.py

parent 3596f295
...@@ -24,6 +24,9 @@ SERVICES = [ ...@@ -24,6 +24,9 @@ SERVICES = [
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd' DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh' DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TimeoutError(Exception): class TimeoutError(Exception):
pass pass
...@@ -70,12 +73,12 @@ def failure_callback(context): ...@@ -70,12 +73,12 @@ def failure_callback(context):
\n> **For more details, please check the airflow task log.** \n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id, context['task_instance'].dag_id, context['task_instance'].task_id, context['exception']) ''' % (context['task_instance'].task_id, context['task_instance'].dag_id, context['task_instance'].task_id, context['exception'])
logging.error('message : \n' + message) logger.error('message : \n' + message)
cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING) cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
os.system(cmd) os.system(cmd)
def get_jobs(cmd): def get_jobs(cmd):
logging.info('exec cmd:' + cmd) logger.info('exec cmd:' + cmd)
jobs_dict = {} jobs_dict = {}
timeout = 10 timeout = 10
...@@ -83,8 +86,8 @@ def get_jobs(cmd): ...@@ -83,8 +86,8 @@ def get_jobs(cmd):
(stdout, stderr) = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
stdout = stdout.decode() stdout = stdout.decode()
stderr = stderr.decode() stderr = stderr.decode()
logging.info(stderr) logger.info(stderr)
logging.info(stdout) logger.info(stdout)
if 'Could not resolve host' in stderr and not stdout: if 'Could not resolve host' in stderr and not stdout:
msg = '\n- **error_msg**: Could not resolve host %s' % (cmd) msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
...@@ -95,36 +98,36 @@ def get_jobs(cmd): ...@@ -95,36 +98,36 @@ def get_jobs(cmd):
jobs_dict = json.loads(stdout) jobs_dict = json.loads(stdout)
if 'errors' in jobs_dict: if 'errors' in jobs_dict:
msg = '\n- **errors**: ' + jobs_dict['errors'][0] msg = '\n- **errors**: ' + jobs_dict['errors'][0]
logging.error(msg) logger.error(msg)
raise Exception(msg) raise Exception(msg)
except TimeoutError: except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout) msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout)
logging.error(msg) logger.error(msg)
raise Exception(msg) raise Exception(msg)
return jobs_dict return jobs_dict
def judge_job_status(job): def judge_job_status(job):
if 'errors' in job: if 'errors' in job:
msg = "\n- **errors**: %s" % (job['errors']) msg = "\n- **errors**: %s" % (job['errors'])
logging.error(msg) logger.error(msg)
return (False, msg) return (False, msg)
if 'RUNNING' != job['status']: if 'RUNNING' != job['status']:
msg = "\n- **job_id**: %s, **status**: %s" % (job['id'],job['status']) msg = "\n- **job_id**: %s, **status**: %s" % (job['id'],job['status'])
logging.error(msg) logger.error(msg)
return (False, msg) return (False, msg)
msg = "\n- **job_id**: %s, **status**: %s" % (job['id'],job['status']) msg = "\n- **job_id**: %s, **status**: %s" % (job['id'],job['status'])
logging.info(msg) logger.info(msg)
return (True, msg) return (True, msg)
def python_callable(**kwargs): def python_callable(**kwargs):
logging.info('start kafka connect status analyze .') logger.info('start kafka connect status analyze .')
curl = 'curl ' curl = 'curl '
flink_jobmanager_url = 'http://'+ kwargs['task_instance'].task_id +':8081/jobs/' flink_jobmanager_url = 'http://'+ kwargs['task_instance'].task_id +':8081/jobs/'
jobs_dict = get_jobs(curl + flink_jobmanager_url) jobs_dict = get_jobs(curl + flink_jobmanager_url)
logging.info('exec cmd: ' + curl + flink_jobmanager_url + ' success!') logger.info('exec cmd: ' + curl + flink_jobmanager_url + ' success!')
jobs_list = jobs_dict['jobs'] jobs_list = jobs_dict['jobs']
error_msg = "" error_msg = ""
...@@ -141,7 +144,7 @@ def python_callable(**kwargs): ...@@ -141,7 +144,7 @@ def python_callable(**kwargs):
msg = "\n- **running_job/all_job**: %s/%s" % (running_job, all_job) msg = "\n- **running_job/all_job**: %s/%s" % (running_job, all_job)
error_msg += msg error_msg += msg
raise Exception(error_msg) raise Exception(error_msg)
logging.info('Flink job status ok!') logger.info('Flink job status ok!')
for service in SERVICES: for service in SERVICES:
task = PythonOperator( task = PythonOperator(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment