Commit cb7e1580 authored by 唐香港's avatar 唐香港

Add new file

parent dcfd1205
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os,logging
#variable parameter
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
default_args = {
'owner': 'tangxianggang',
'depends_on_past': False,
'start_date': datetime.now() - timedelta(minutes=260),
'retries': 3,
'retry_delay': timedelta(seconds=5)
}
dag = DAG(
dag_id='airflow_heartbeat_detection',
default_args=default_args,
schedule_interval=timedelta(minutes=130)
)
def heartbeat_alarm(**kwargs):
logger.info('start heartbeat alarm')
title = 'airflow_heartbeat_detection'
msg = '\n- I am airflow, I will notify you once every 130 minutes.\
If I have not notified after 130 minutes, I have already hanged up.SOS...'
message = '''I am airflow, I am still alive.!!!
\n#### DAG_ID: %s
\n#### TASKID: %s
\n#### CONTENT:
%s
\n> **For more details, please check the airflow task log.**
''' % (kwargs['task_instance'].dag_id, kwargs['task_instance'].task_id, msg)
logger.info('message : \n' + message)
cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
os.system(cmd)
task = PythonOperator(
task_id='heartbeat_alarm',
provide_context = True,
python_callable=heartbeat_alarm,
dag=dag,
)
if __name__ == '__main__':
dag.cli()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment