Commit 0f9f05c8 authored by edz

add dags and script

parent 2759b7ca
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
import logging

# variable parameters
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'

default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=60),
    'retries': 3,
    'retry_delay': timedelta(seconds=5)
}

dag = DAG(
    dag_id='airflow_heartbeat_detection',
    default_args=default_args,
    schedule_interval=timedelta(minutes=60)
)

def heartbeat_alarm(**kwargs):
    logging.info('start heartbeat alarm')
    title = 'airflow_heartbeat_detection'
    msg = ('\n- I am airflow, I will notify you once every hour. '
           'If you have not heard from me for more than an hour, I have already hung up. SOS...')
    message = '''I am airflow, I am still alive!!!
\n#### DAG_ID: %s
\n#### TASK_ID: %s
\n#### CONTENT:
%s
\n> **For more details, please check the airflow task log.**
''' % (kwargs['task_instance'].dag_id, kwargs['task_instance'].task_id, msg)
    logging.info('message : \n' + message)
    cmd = " bash " + DING_PATH + " '%s' '%s' '%s' " % (title, message, DINGDING)
    os.system(cmd)

task = PythonOperator(
    task_id='heartbeat_alarm',
    provide_context=True,
    python_callable=heartbeat_alarm,
    dag=dag,
)

if __name__ == '__main__':
    dag.cli()
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
import logging

default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 2,
    'retry_delay': timedelta(seconds=5)
}

dag = DAG(
    dag_id='clear_tasks_container_logs',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

def clear_worker_logs():
    # delete worker task logs from three days ago (log dirs are date-prefixed)
    dt = datetime.now() - timedelta(days=3)
    time_str = dt.strftime('%Y-%m-%d')
    cmd = 'rm -rf /opt/bitnami/airflow/logs/*/*/%s*' % (time_str)
    logging.info('exec cmd : ' + cmd)
    os.system(cmd)

task = PythonOperator(
    task_id='clear_worker_logs',
    python_callable=clear_worker_logs,
    dag=dag,
)

if __name__ == '__main__':
    dag.cli()
# -*- coding: utf-8 -*-
'''
Different services are run by different operators.
The service name is the same as the task id.
'''
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
import json
import logging
import time
import signal
import platform
import subprocess
import airflow

# variable parameters
DAG_ID = 'alarm_flink_job'
START_DATE = datetime.now() - timedelta(minutes=10)
SCHEDULE_INTERVAL = timedelta(minutes=10)
SERVICES = [
    'mv-cluster9001-test-flin-jobmanager',
    'cpc-cluster9001-test-fli-jobmanager'
]
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'

class TimeoutError(Exception):
    pass

def run_command(cmd, timeout=60):
    # on Linux, start the child in its own process group (os.setsid) so the
    # whole group can be killed if the command times out
    is_linux = platform.system() == 'Linux'
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                         shell=True, preexec_fn=os.setsid if is_linux else None)
    t_beginning = time.time()
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            if is_linux:
                os.killpg(p.pid, signal.SIGTERM)
            else:
                p.terminate()
            raise TimeoutError(cmd, timeout)
        time.sleep(0.1)
    return p.stdout.read()
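
# Usage sketch (hypothetical values): run_command returns the child's combined
# stdout/stderr as bytes and raises TimeoutError if the process outlives
# `timeout` seconds, e.g.
#
#   out = run_command('echo hello', timeout=5)   # b'hello\n'
#   text = out.decode()
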
default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': START_DATE,
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL
)

def failure_callback(context):
    title = 'Flink job error alarm'
    message = '''SERVICE: %s
\n#### DAG_ID: %s
\n#### TASK_ID: %s
\n#### CONTENT:
%s
\n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id, context['task_instance'].dag_id,
       context['task_instance'].task_id, context['exception'])
    logging.error('message : \n' + message)
    cmd = " bash " + DING_PATH + " '%s' '%s' '%s' " % (title, message, DINGDING)
    os.system(cmd)

def get_jobs(cmd):
    logging.info('exec cmd: ' + cmd)
    jobs_dict = {}
    timeout = 10
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        if 'Could not resolve host' in result:
            msg = '\n- **error_msg**: Could not resolve host, command=(%s)' % (cmd)
            raise Exception(msg)
        # curl may print progress output first; the JSON body is on the last line
        jobs_dict = json.loads(result.split('\n')[-1])
        if 'errors' in jobs_dict:
            msg = '\n- **errors**: ' + jobs_dict['errors'][0]
            logging.error(msg)
            raise Exception(msg)
    except TimeoutError:
        msg = '\n- **error_msg**: execute command=(%s) timeout after %i s' % (cmd, timeout)
        logging.error(msg)
        raise Exception(msg)
    return jobs_dict
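
# For reference, the Flink REST endpoint GET /jobs typically answers with a
# payload shaped like the following (job ids hypothetical), which is what
# get_jobs and judge_job_status below expect:
#
#   {"jobs": [{"id": "ca54b0eb...", "status": "RUNNING"},
#             {"id": "7dd5dbda...", "status": "FAILED"}]}
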
def judge_job_status(job):
    if 'errors' in job:
        msg = "\n- **errors**: %s" % (job['errors'])
        logging.error(msg)
        return (False, msg)
    if 'RUNNING' != job['status']:
        msg = "\n- **job_id**: %s, **status**: %s" % (job['id'], job['status'])
        logging.error(msg)
        return (False, msg)
    msg = "\n- **job_id**: %s, **status**: %s" % (job['id'], job['status'])
    logging.info(msg)
    return (True, msg)

def python_callable(**kwargs):
    logging.info('start flink job status analysis.')
    curl = 'curl '
    flink_jobmanager_url = 'http://' + kwargs['task_instance'].task_id + ':8081/jobs/'
    jobs_dict = get_jobs(curl + flink_jobmanager_url)
    logging.info('exec cmd: ' + curl + flink_jobmanager_url + ' success!')
    jobs_list = jobs_dict['jobs']
    error_msg = ""
    all_job = len(jobs_list)
    running_job = 0
    for job in jobs_list:
        (isrunning, msg) = judge_job_status(job)
        if not isrunning:
            error_msg += msg
        else:
            running_job += 1
    if error_msg:
        msg = "\n- **running_job/all_job**: %s/%s" % (running_job, all_job)
        error_msg += msg
        raise Exception(error_msg)
    logging.info('Flink job status ok!')

for service in SERVICES:
    task = PythonOperator(
        task_id=service,
        provide_context=True,
        python_callable=python_callable,
        on_failure_callback=failure_callback,
        dag=dag,
    )

if __name__ == '__main__':
    dag.cli()
# -*- coding: utf-8 -*-
'''
Different services are run by different operators.
The service name is the same as the task id.
'''
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
import json
import logging
import time
import signal
import platform
import subprocess
import airflow

# variable parameters
DAG_ID = 'alarm_kafka_connect'
START_DATE = datetime.now() - timedelta(minutes=10)
SCHEDULE_INTERVAL = timedelta(minutes=10)
SERVICES = [
    'es-gmei-test-cp-kafka-connect',
    'dbz-alpha-test-cp-kafka-connect',
    'dbz-commodity-test-cp-kafka-connect',
    'cpc-test-cp-kafka-connect'
]
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'

class TimeoutError(Exception):
    pass

def run_command(cmd, timeout=60):
    is_linux = platform.system() == 'Linux'
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                         shell=True, preexec_fn=os.setsid if is_linux else None)
    t_beginning = time.time()
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            if is_linux:
                os.killpg(p.pid, signal.SIGTERM)
            else:
                p.terminate()
            raise TimeoutError(cmd, timeout)
        time.sleep(0.1)
    return p.stdout.read()

default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': START_DATE,
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL
)

def failure_callback(context):
    title = 'Kafka connect error alarm'
    message = '''SERVICE: %s
\n#### DAG_ID: %s
\n#### TASK_ID: %s
\n#### CONTENT:
%s
\n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id, context['task_instance'].dag_id,
       context['task_instance'].task_id, context['exception'])
    logging.error('message : \n' + message)
    cmd = " bash " + DING_PATH + " '%s' '%s' '%s' " % (title, message, DINGDING)
    os.system(cmd)

def get_kafka_connectors(cmd):
    logging.info('exec cmd: ' + cmd)
    connectors_list = []
    timeout = 10
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        if 'Could not resolve host' in result:
            msg = '\n- **error_msg**: Could not resolve host, command=(%s)' % (cmd)
            raise Exception(msg)
        connectors_list = json.loads(result.split('\n')[-1])
        # on failure the REST API returns an error object instead of a list
        if 'error_code' in connectors_list:
            msg = '\n- **error_code**: %s, **error_msg**: %s' % (connectors_list['error_code'], connectors_list['message'])
            logging.error(msg)
            raise Exception(msg)
    except TimeoutError:
        msg = '\n- **error_msg**: execute command=(%s) timeout after %i s' % (cmd, timeout)
        logging.error(msg)
        raise Exception(msg)
    logging.info(str(connectors_list))
    return connectors_list
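
# For reference, Kafka Connect's GET /connectors endpoint returns a plain JSON
# array of connector names (names here hypothetical):
#
#   ["es-sink-orders", "dbz-source-users"]
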
def get_connector_status(cmd):
    logging.info('exec cmd: ' + cmd)
    timeout = 10
    outdict = {}
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        outdict = json.loads(result.split('\n')[-1])
    except TimeoutError:
        msg = '\n- **error_msg**: execute command=(%s) timeout after %i s' % (cmd, timeout)
        logging.error(msg)
        # error_code '600' is a local convention meaning the status request timed out
        errdict = {'error_code': '600', 'message': 'execute command=(%s) timeout after %i s' % (cmd, timeout)}
        return errdict
    logging.info('get connector status : \n' + str(outdict))
    return outdict
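
# For reference, GET /connectors/<name>/status typically answers with a payload
# like the following (worker ids hypothetical), which judge_connector_status
# below walks through:
#
#   {"name": "es-sink-orders",
#    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
#    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
#              {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"}]}
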
def judge_connector_status(connector_status_dict, connector):
    if 'error_code' in connector_status_dict:
        msg = ""
        if connector_status_dict['error_code'] == '600':
            msg = "\n- **connector_name**: %s, **error_msg**: %s" % (connector, connector_status_dict['message'])
        else:
            msg = "\n- **connector_name**: %s, **error_code**: %s, **error_msg**: %s" % (connector, connector_status_dict['error_code'], connector_status_dict['message'])
        logging.error(msg)
        return (False, msg)
    if 'RUNNING' != connector_status_dict['connector']['state']:
        msg = "\n- **connector_name**: %s, **is_running**: false" % (connector)
        logging.error(msg)
        return (False, msg)
    tasks_list = connector_status_dict['tasks']
    error_tasks = 0
    for task in tasks_list:
        if task['state'] != 'RUNNING':
            error_tasks += 1
    if error_tasks:
        all_tasks = len(tasks_list)
        running_tasks = all_tasks - error_tasks
        msg = "\n- **connector_name**: %s, **is_running**: true, **running_tasks/all_tasks**: %s/%s" % (connector, str(running_tasks), str(all_tasks))
        logging.error(msg)
        return (False, msg)
    return (True, str(connector_status_dict))

def python_callable(**kwargs):
    logging.info('start kafka connect status analysis.')
    curl = 'curl '
    kafka_connect_url = 'http://' + kwargs['task_instance'].task_id + ':8083/connectors/'
    status = '/status'
    connectors_list = get_kafka_connectors(curl + kafka_connect_url)
    logging.info('exec cmd: ' + curl + kafka_connect_url + ' success!')
    error_msg = ""
    for connector in connectors_list:
        connector_status_dict = get_connector_status(curl + kafka_connect_url + connector + status)
        (isrunning, msg) = judge_connector_status(connector_status_dict, connector)
        if not isrunning:
            error_msg += msg
    if error_msg:
        raise Exception(error_msg)
    logging.info('kafka connect status ok!')

for service in SERVICES:
    task = PythonOperator(
        task_id=service,
        provide_context=True,
        python_callable=python_callable,
        on_failure_callback=failure_callback,
        dag=dag,
    )

if __name__ == '__main__':
    dag.cli()
#!/bin/bash
#set -x
TITLE=$1
DESC=$2
DINGDING=$3
#echo "$TITLE", "$DESC"

function ding() {
    # post a markdown message to the DingTalk robot webhook;
    # the payload must be valid JSON, so keys and strings use double quotes
    curl "$DINGDING" \
        -H 'Content-Type: application/json' \
        -d "{
            \"msgtype\": \"markdown\",
            \"markdown\": {
                \"title\": \"$TITLE\",
                \"text\": \"## $DESC \n > **`date`**\"
            },
            \"at\": {
                \"atMobiles\": [\"17864308072\"],
                \"isAtAll\": true
            }
        }"
}

ding
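
# Usage sketch (values hypothetical); the DAGs above invoke it like:
#
#   bash ding.sh 'alarm title' 'markdown **body**' \
#       'https://oapi.dingtalk.com/robot/send?access_token=<token>'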