唐香港 / airflow-dags-hub · Commits

Commit 4f2c89c8, authored Aug 27, 2019 by edz
    add dags and script
Parent: 2759b7ca

Showing 8 changed files with 439 additions and 0 deletions (+439, -0):
- dags/.gitkeep (+0, -0)
- dags/airflow/airflow_heartbeat_detection.py (+52, -0)
- dags/airflow/clear_tasks_container_logs.py (+37, -0)
- dags/flink/alarm_flink_job.py (+147, -0)
- dags/kafka/.gitkeep (+0, -0)
- dags/kafka/alarm_kafka_connect.py (+175, -0)
- script/.gitkeep (+0, -0)
- script/ding.sh (+28, -0)
dags/.gitkeep (new file, mode 100644, empty)
dags/airflow/airflow_heartbeat_detection.py (new file, mode 100644)
```python
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os, logging

# Variable parameters: DingTalk webhook and path to the notification script.
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=4e00d7f7b3b8686ea7d37bd01264f86e197294f9f995ef8e12cc853760a30c60'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'

default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=60),
    'retries': 3,
    'retry_delay': timedelta(seconds=5),
}

dag = DAG(
    dag_id='airflow_heartbeat_detection',
    default_args=default_args,
    schedule_interval=timedelta(minutes=60),
)


def heartbeat_alarm(**kwargs):
    # Send a periodic "still alive" message to DingTalk via ding.sh.
    logging.info('start heartbeat alarm')
    title = 'airflow_heartbeat_detection'
    msg = '\n- I am airflow, I will notify you once every two hours. \
If I have not notified you within an hour, I have already hung up. SOS...'
    message = '''I am airflow, I am still alive!!!\n#### DAG_ID: %s\n#### TASKID: %s\n#### CONTENT: %s\n> **For more details, please check the airflow task log.**
''' % (kwargs['task_instance'].dag_id, kwargs['task_instance'].task_id, msg)
    logging.info('message :\n' + message)
    cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
    os.system(cmd)


task = PythonOperator(
    task_id='heartbeat_alarm',
    provide_context=True,
    python_callable=heartbeat_alarm,
    dag=dag,
)

if __name__ == '__main__':
    dag.cli()
```
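One fragility worth noting: the command string interpolates `title` and `message` between hand-placed single quotes, so any quote character in the message would break the shell call. A minimal hardening sketch, assuming Python 3's standard `shlex` and `subprocess` (this helper is not part of the commit; the glob in the script path still needs shell expansion, so it stays unquoted):

```python
import shlex
import subprocess

def send_ding(title, message, webhook,
              ding_path='/opt/bitnami/airflow/dags/*/script/ding.sh'):
    # Quote each argument so quotes or spaces in the markdown message
    # cannot break out of the command; shell=True is kept only so the
    # glob in ding_path is expanded by the shell.
    cmd = 'bash %s %s %s %s' % (
        ding_path, shlex.quote(title), shlex.quote(message), shlex.quote(webhook))
    subprocess.call(cmd, shell=True)
```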
dags/airflow/clear_tasks_container_logs.py (new file, mode 100644)
```python
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os, logging

default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 2,
    'retry_delay': timedelta(seconds=5),
}

dag = DAG(
    dag_id='clear_tasks_container_logs',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def clear_worker_logs():
    # Delete worker task logs whose execution date is three days old.
    dt = datetime.now() - timedelta(days=3)
    time_str = dt.strftime('%Y-%m-%d')
    cmd = 'rm -rf /opt/bitnami/airflow/logs/*/*/%s*' % (time_str)
    logging.info('exec cmd : ' + cmd)
    os.system(cmd)


task = PythonOperator(
    task_id='clear_worker_logs',
    python_callable=clear_worker_logs,
    dag=dag,
)

if __name__ == '__main__':
    dag.cli()
```
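Concretely, the task removes the per-run log directories named by the date three days before "now". Reproducing the command string for a fixed date (illustrative only):

```python
from datetime import datetime, timedelta

# With "now" pinned to 2019-08-27, the generated command targets 2019-08-24.
now = datetime(2019, 8, 27)
time_str = (now - timedelta(days=3)).strftime('%Y-%m-%d')
print('rm -rf /opt/bitnami/airflow/logs/*/*/%s*' % time_str)
# -> rm -rf /opt/bitnami/airflow/logs/*/*/2019-08-24*
```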
dags/flink/alarm_flink_job.py (new file, mode 100644)
```python
# -*- coding: utf-8 -*-
'''
Different services are run by different operators.
The service name is the same as the task id.
'''
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os, json, logging, time, signal, platform, subprocess
import airflow

# Variable parameters.
DAG_ID = 'alarm_flink_job'
START_DATE = datetime.now() - timedelta(minutes=10)
SCHEDULE_INTERVAL = timedelta(minutes=10)
SERVICES = [
    'cpc-cluster01-prod-flink-jobmanager',
    'mv-cluster01-prod-flink-jobmanager',
]
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=4e00d7f7b3b8686ea7d37bd01264f86e197294f9f995ef8e12cc853760a30c60'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'


class TimeoutError(Exception):
    pass


def run_command(cmd, timeout=60):
    # Run a shell command, killing its whole process group if it exceeds
    # the timeout (process groups via os.setsid only exist on Linux).
    is_linux = platform.system() == 'Linux'
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                         shell=True, preexec_fn=os.setsid if is_linux else None)
    t_beginning = time.time()
    seconds_passed = 0
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            if is_linux:
                os.killpg(p.pid, signal.SIGTERM)
            else:
                p.terminate()
            raise TimeoutError(cmd, timeout)
        time.sleep(0.1)
    return p.stdout.read()


default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': START_DATE,
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
)


def failure_callback(context):
    # Push a DingTalk alarm whenever a task in this DAG fails.
    title = 'Flink job error alarm'
    message = '''SERVICE: %s\n#### DAG_ID: %s\n#### TASKID: %s\n#### CONTENT: %s\n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id,
       context['task_instance'].dag_id,
       context['task_instance'].task_id,
       context['exception'])
    logging.error('message :\n' + message)
    cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
    os.system(cmd)


def get_jobs(cmd):
    # Fetch the job list from the Flink JobManager REST API via curl.
    logging.info('exec cmd:' + cmd)
    jobs_dict = {}
    timeout = 10
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        if 'Could not resolve host' in result:
            msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
            raise Exception(msg)
        jobs_dict = json.loads(result.split('\n')[-1])
        if 'errors' in jobs_dict:
            msg = '\n- **errors**: ' + jobs_dict['errors'][0]
            logging.error(msg)
            raise Exception(msg)
    except TimeoutError:
        msg = '\n- **error_msg**: execute command=(%s) timeout after %i s' % (cmd, timeout)
        logging.error(msg)
        raise Exception(msg)
    return jobs_dict


def judge_job_status(job):
    # Return (is_running, markdown_message) for a single job entry.
    if 'errors' in job:
        msg = "\n- **errors**: %s" % (job['errors'])
        logging.error(msg)
        return (False, msg)
    if 'RUNNING' != job['status']:
        msg = "\n- **job_id**: %s, **status**: %s" % (job['id'], job['status'])
        logging.error(msg)
        return (False, msg)
    msg = "\n- **job_id**: %s, **status**: %s" % (job['id'], job['status'])
    logging.info(msg)
    return (True, msg)


def python_callable(**kwargs):
    logging.info('start flink job status analyze.')
    curl = 'curl '
    flink_jobmanager_url = 'http://' + kwargs['task_instance'].task_id + ':8081/jobs/'
    jobs_dict = get_jobs(curl + flink_jobmanager_url)
    logging.info('exec cmd: ' + curl + flink_jobmanager_url + ' success!')
    jobs_list = jobs_dict['jobs']
    error_msg = ""
    all_job = len(jobs_list)
    running_job = 0
    for job in jobs_list:
        (isrunning, msg) = judge_job_status(job)
        if not isrunning:
            error_msg += msg
        else:
            running_job += 1
    if error_msg:
        msg = "\n- **running_job/all_job**: %s/%s" % (running_job, all_job)
        error_msg += msg
        raise Exception(error_msg)
    logging.info('Flink job status ok!')


# One monitoring task per JobManager service; the task id doubles as the hostname.
for service in SERVICES:
    task = PythonOperator(
        task_id=service,
        provide_context=True,
        python_callable=python_callable,
        on_failure_callback=failure_callback,
        dag=dag,
    )

if __name__ == '__main__':
    dag.cli()
```
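For reference, `get_jobs` parses the JobManager's `GET /jobs` response, which in recent Flink versions is a JSON object with a `jobs` array of `{id, status}` entries. A quick illustrative check against `judge_job_status` above (the payload and ids are made up, not from the commit):

```python
import json

# Illustrative /jobs response; real job ids are 32-char hex strings.
sample = json.loads('{"jobs": [{"id": "a1b2c3", "status": "RUNNING"},'
                    ' {"id": "d4e5f6", "status": "FAILED"}]}')
for job in sample['jobs']:
    print(judge_job_status(job))
# -> (True, '\n- **job_id**: a1b2c3, **status**: RUNNING')
# -> (False, '\n- **job_id**: d4e5f6, **status**: FAILED')
```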
dags/kafka/.gitkeep (new file, mode 100644, empty)
dags/kafka/alarm_kafka_connect.py (new file, mode 100644)
```python
# -*- coding: utf-8 -*-
'''
Different services are run by different operators.
The service name is the same as the task id.
'''
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os, json, logging, time, signal, platform, subprocess
import airflow

# Variable parameters.
DAG_ID = 'alarm_kafka_connect'
START_DATE = datetime.now() - timedelta(minutes=10)
SCHEDULE_INTERVAL = timedelta(minutes=10)
SERVICES = [
    'cpc-v2-prod-cp-kafka-connect',
    'dbz-alpha-prod-cp-kafka-connect',
    'dbz-commodity-prod-cp-kafka-connect',
    'es-gmei-prod-cp-kafka-connect',
]
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=4e00d7f7b3b8686ea7d37bd01264f86e197294f9f995ef8e12cc853760a30c60'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'


class TimeoutError(Exception):
    pass


def run_command(cmd, timeout=60):
    # Run a shell command, killing its whole process group if it exceeds
    # the timeout (process groups via os.setsid only exist on Linux).
    is_linux = platform.system() == 'Linux'
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                         shell=True, preexec_fn=os.setsid if is_linux else None)
    t_beginning = time.time()
    seconds_passed = 0
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            if is_linux:
                os.killpg(p.pid, signal.SIGTERM)
            else:
                p.terminate()
            raise TimeoutError(cmd, timeout)
        time.sleep(0.1)
    return p.stdout.read()


default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': START_DATE,
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
)


def failure_callback(context):
    # Push a DingTalk alarm whenever a task in this DAG fails.
    title = 'Kafka connect error alarm'
    message = '''SERVICE: %s\n#### DAG_ID: %s\n#### TASKID: %s\n#### CONTENT: %s\n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id,
       context['task_instance'].dag_id,
       context['task_instance'].task_id,
       context['exception'])
    logging.error('message :\n' + message)
    cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
    os.system(cmd)


def get_kafka_connectors(cmd):
    # Fetch the connector name list from the Kafka Connect REST API.
    logging.info('exec cmd:' + cmd)
    connectors_list = []
    timeout = 10
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        if 'Could not resolve host' in result:
            msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
            raise Exception(msg)
        connectors_list = json.loads(result.split('\n')[-1])
        if 'error_code' in connectors_list:
            # str() guards against error_code arriving as a number.
            msg = ('\n- **error_code**: ' + str(connectors_list['error_code'])
                   + ', **error_msg**: ' + connectors_list['message'])
            logging.error(msg)
            raise Exception(msg)
    except TimeoutError:
        msg = '\n- **error_msg**: execute command=(%s) timeout after %i s' % (cmd, timeout)
        logging.error(msg)
        raise Exception(msg)
    logging.info(str(connectors_list))
    return connectors_list


def get_connector_status(cmd):
    # Fetch /status for one connector; on timeout, return a synthetic
    # error dict (error_code 600) instead of raising.
    logging.info('exec cmd: ' + cmd)
    timeout = 10
    outdict = {}
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        outdict = json.loads(result.split('\n')[-1])
    except TimeoutError:
        msg = '\n- **error_msg**: execute command=(%s) timeout after %i s' % (cmd, timeout)
        logging.error(msg)
        errdict = {'error_code': '600',
                   'message': 'execute command=(%s) timeout after %i s' % (cmd, timeout)}
        return errdict
    logging.info('get connector status :\n' + str(outdict))
    return outdict


def judge_connector_status(connector_status_dict, connector):
    # Return (is_running, markdown_message) for one connector status dict.
    if 'error_code' in connector_status_dict:
        if connector_status_dict['error_code'] == '600':
            msg = "\n- **connector_name**: %s, **error_msg**: %s" % (
                connector, connector_status_dict['message'])
        else:
            msg = "\n- **connector_name**: %s, **error_code**: %s, **error_msg**: %s" % (
                connector, connector_status_dict['error_code'],
                connector_status_dict['message'])
        logging.error(msg)
        return (False, msg)
    if 'RUNNING' != connector_status_dict['connector']['state']:
        msg = "\n- **connector_name**: %s, **is_running**: false" % (connector)
        logging.error(msg)
        return (False, msg)
    tasks_list = connector_status_dict['tasks']
    error_tasks = 0
    for task in tasks_list:
        if task['state'] != 'RUNNING':
            error_tasks += 1
    if error_tasks:
        all_tasks = len(tasks_list)
        running_tasks = all_tasks - error_tasks
        msg = ("\n- **connector_name**: %s, **is_running**: true, "
               "**running_tasks/all_tasks**: %s/%s" % (
                   connector, str(running_tasks), str(all_tasks)))
        logging.error(msg)
        return (False, msg)
    return (True, str(connector_status_dict))


def python_callable(**kwargs):
    logging.info('start kafka connect status analyze.')
    curl = 'curl '
    kafka_connect_url = 'http://' + kwargs['task_instance'].task_id + ':8083/connectors/'
    status = '/status'
    connectors_list = get_kafka_connectors(curl + kafka_connect_url)
    logging.info('exec cmd: ' + curl + kafka_connect_url + ' success!')
    error_msg = ""
    for connector in connectors_list:
        connector_status_dict = get_connector_status(
            curl + kafka_connect_url + connector + status)
        (isrunning, msg) = judge_connector_status(connector_status_dict, connector)
        if not isrunning:
            error_msg += msg
    if error_msg:
        raise Exception(error_msg)
    logging.info('kafka connect status ok!')


# One monitoring task per Connect worker service; the task id doubles as the hostname.
for service in SERVICES:
    task = PythonOperator(
        task_id=service,
        provide_context=True,
        python_callable=python_callable,
        on_failure_callback=failure_callback,
        dag=dag,
    )

if __name__ == '__main__':
    dag.cli()
```
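For reference, `judge_connector_status` expects the shape Kafka Connect's `GET /connectors/<name>/status` endpoint returns: a `connector.state` field plus a `tasks` array of per-task states. A quick illustrative check against the function above (the connector name and payload are made up):

```python
sample_status = {
    'name': 'demo-connector',  # hypothetical connector, not from the commit
    'connector': {'state': 'RUNNING', 'worker_id': '10.0.0.1:8083'},
    'tasks': [{'id': 0, 'state': 'RUNNING'},
              {'id': 1, 'state': 'FAILED'}],
}
# One of two tasks is down, so the check fails with running/all = 1/2.
print(judge_connector_status(sample_status, 'demo-connector'))
```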
script/.gitkeep (new file, mode 100644, empty)
script/ding.sh (new file, mode 100644)
```bash
#!/bin/bash
#set -x
TITLE=$1
DESC=$2
DINGDING=$3
#echo $TITLE, $DESC

function ding() {
    # hostname action
    curl $DINGDING \
        -H 'Content-Type: application/json' \
        -d "{
            'msgtype': 'markdown',
            'markdown': {
                'title': \"$TITLE\",
                'text': \"## $DESC \n> **`date`**\"
            },
            'at': {
                'atMobiles': ['17864308072'],
                'isAtAll': true
            }
        }"
}

ding
```
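One caveat: the payload above quotes JSON keys and strings with single quotes, which is not strictly valid JSON, so it relies on the endpoint being lenient. A minimal sketch of the same markdown message serialized as well-formed JSON, using only the Python standard library (this helper is an illustration, not part of the commit; the title, text structure, and mobile number are taken from the script):

```python
import json
import time
try:
    from urllib.request import Request, urlopen   # Python 3
except ImportError:
    from urllib2 import Request, urlopen          # Python 2 fallback

def ding(title, desc, webhook):
    # Same message structure as ding.sh, but serialized with json.dumps,
    # so quoting is always valid; time.ctime() stands in for `date`.
    payload = {
        'msgtype': 'markdown',
        'markdown': {'title': title,
                     'text': '## %s\n> **%s**' % (desc, time.ctime())},
        'at': {'atMobiles': ['17864308072'], 'isAtAll': True},
    }
    req = Request(webhook, data=json.dumps(payload).encode('utf-8'),
                  headers={'Content-Type': 'application/json'})
    return urlopen(req).read()
```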