Commit 1efecf30 authored by 唐香港's avatar 唐香港

Update alarm_flink_job.py get job name

parent 42434c82
......@@ -106,16 +106,42 @@ def get_jobs(cmd):
raise Exception(msg)
return jobs_dict
def judge_job_status(job):
def get_name(cmd):
logger.info('exec cmd:' + cmd)
config_dict = {}
timeout = 10
try:
(stdout, stderr) = run_command(cmd, timeout)
stdout = stdout.decode()
stderr = stderr.decode()
logger.info(stderr)
logger.info(stdout)
config_dict = json.loads(stdout)
if 'errors' in config_dict:
msg = '\n- **errors**: ' + config_dict['errors'][0]
logger.error(msg)
raise Exception(msg)
except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout)
logger.error(msg)
raise Exception(msg)
return config_dict['name']
def judge_job_status(job, cmd):
if 'errors' in job:
msg = "\n- **errors**: %s" % (job['errors'])
logger.error(msg)
return (False, msg)
job_id = job['id']
job_status = job['status']
job_name = get_name(cmd + job_id + "/config")
if 'RUNNING' != job['status']:
msg = "\n- **job_id**: %s, **status**: %s" % (job['id'],job['status'])
msg = "\n- **job_id**: %s, **job_name**: %s, **status**: %s" % (job_id,job_name,job_status)
logger.error(msg)
return (False, msg)
msg = "\n- **job_id**: %s, **status**: %s" % (job['id'],job['status'])
msg = "\n- **job_id**: %s, **job_name**: %s, **status**: %s" % (job_id,job_name,job_status)
logger.info(msg)
return (True, msg)
......@@ -135,7 +161,7 @@ def python_callable(**kwargs):
running_job = 0
for job in jobs_list:
(isrunning,msg) = judge_job_status(job)
(isrunning,msg) = judge_job_status(job, curl + flink_jobmanager_url)
if not isrunning:
error_msg += msg
else :
......@@ -156,4 +182,4 @@ for service in SERVICES:
)
if __name__ == '__main__':
dag.cli()
\ No newline at end of file
dag.cli()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment