唐香港 / airflow-dags-hub · Commits · 38971627

Commit 38971627, authored Aug 27, 2019 by edz
Parent: fee427a6

    add kafka connect v2
Showing 5 changed files with 168 additions and 15 deletions (+168 −15):

    dags/airflow/airflow_heartbeat_detection.py   +4    −4
    dags/airflow/clear_tasks_container_logs.py    +3    −3
    dags/flink/alarm_flink_job.py                 +4    −2
    dags/kafka/alarm_kafka_connect.py             +7    −6
    dags/kafka/alarm_kafka_connect_v2.py          +150  −0  (new file)
dags/airflow/airflow_heartbeat_detection.py  (+4 −4)

```diff
@@ -13,7 +13,7 @@ DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'
 default_args = {
     'owner': 'tangxianggang',
     'depends_on_past': False,
-    'start_date': datetime.now() - timedelta(minutes=100),
+    'start_date': datetime.now() - timedelta(minutes=50),
     'retries': 3,
     'retry_delay': timedelta(seconds=5)
 }
@@ -21,14 +21,14 @@ default_args = {
 dag = DAG(
     dag_id='airflow_heartbeat_detection',
     default_args=default_args,
-    schedule_interval=timedelta(minutes=100)
+    schedule_interval=timedelta(minutes=50)
 )
 
 def heartbeat_alarm(**kwargs):
     logging.info('start heartbeat alarm')
     title = 'airflow_heartbeat_detection'
-    msg = '\n- I am airflow, I will notify you once every two hours. \
-        If I have not notified after 100 minutes, I have already hanged up.SOS...'
+    msg = '\n- I am airflow, I will notify you once every 50 minutes. \
+        If I have not notified after 50 minutes, I have already hanged up.SOS...'
     message = '''I am airflow, I am still alive.!!!
 \n#### DAG_ID: %s
 \n#### TASKID: %s
```
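Both 100 → 50 minute changes keep `start_date` exactly one `schedule_interval` in the past, which is what makes the heartbeat fire right away and then once per interval. A minimal sketch of that arithmetic (illustration only, not part of the commit; it assumes classic Airflow semantics where the run covering a period is triggered once the period closes):

```python
from datetime import datetime, timedelta

# The run with execution_date T is triggered at T + schedule_interval,
# so a start_date one interval in the past yields an immediate first run.
schedule_interval = timedelta(minutes=50)
start_date = datetime.now() - schedule_interval
first_run_triggered_at = start_date + schedule_interval  # ~ now
print(first_run_triggered_at)
```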
dags/airflow/clear_tasks_container_logs.py  (+3 −3)

```diff
@@ -9,7 +9,7 @@ import os,logging
 default_args = {
     'owner': 'tangxianggang',
     'depends_on_past': False,
-    'start_date': datetime.now() - timedelta(days=1),
+    'start_date': datetime.now() - timedelta(minutes=1445),
     'retries': 2,
     'retry_delay': timedelta(seconds=5)
 }
@@ -17,11 +17,11 @@ default_args = {
 dag = DAG(
     dag_id='clear_tasks_container_logs',
     default_args=default_args,
-    schedule_interval=timedelta(days=1)
+    schedule_interval=timedelta(minutes=1445)
 )
 
 def clear_worker_logs():
-    dt = datetime.now() - timedelta(days=3)
+    dt = datetime.now() - timedelta(minutes=1445)
     time_str = dt.strftime('%Y-%m-%d')
     cmd = 'rm -rf /opt/bitnami/airflow/logs/*/*/%s*' % (time_str)
     logging.info('exec cmd : ' + cmd)
```
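For reference, 1445 minutes is one day plus five minutes, so the cleanup now drifts five minutes later each day, and the deletion cutoff moves from three days ago to roughly one day ago. A quick check of the values the changed code produces (same strftime pattern and rm template as the DAG):

```python
from datetime import datetime, timedelta

print(timedelta(minutes=1445))  # 1 day, 0:05:00

# Same cutoff and command template as clear_worker_logs():
dt = datetime.now() - timedelta(minutes=1445)
time_str = dt.strftime('%Y-%m-%d')
print('rm -rf /opt/bitnami/airflow/logs/*/*/%s*' % (time_str))
```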
dags/flink/alarm_flink_job.py  (+4 −2)

```diff
@@ -13,8 +13,8 @@ import airflow
 #variable parameter
 DAG_ID = 'alarm_flink_job'
-START_DATE = datetime.now() - timedelta(minutes=10)
-SCHEDULE_INTERVAL = timedelta(minutes=5)
+START_DATE = datetime.now() - timedelta(minutes=15)
+SCHEDULE_INTERVAL = timedelta(minutes=15)
 SERVICES = [
     'cpc-cluster01-prod-flink-jobmanager',
@@ -50,6 +50,8 @@ default_args = {
     'owner': 'tangxianggang',
     'depends_on_past': False,
     'start_date': START_DATE,
+    'retries': 3,
+    'retry_delay': timedelta(seconds=5)
 }
 
 dag = DAG(
```
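The two added keys give every task in this DAG retry behavior, since `default_args` are merged into each operator at construction time. A minimal sketch of that inheritance (illustration only; the `retry_demo` dag_id is hypothetical, and this assumes Airflow 1.x, matching the `airflow.operators.python_operator` import path used throughout this repo):

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'tangxianggang',
    'start_date': datetime(2019, 8, 27),
    'retries': 3,
    'retry_delay': timedelta(seconds=5),
}
dag = DAG(dag_id='retry_demo', default_args=default_args,
          schedule_interval=timedelta(minutes=15))
task = PythonOperator(task_id='demo', python_callable=lambda: None, dag=dag)
assert task.retries == 3  # inherited from default_args
```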
dags/kafka/alarm_kafka_connect.py  (+7 −6)

```diff
@@ -13,14 +13,13 @@ import airflow
 #variable parameter
 DAG_ID = 'alarm_kafka_connect'
-START_DATE = datetime.now() - timedelta(minutes=10)
-SCHEDULE_INTERVAL = timedelta(minutes=5)
+START_DATE = datetime.now() - timedelta(minutes=15)
+SCHEDULE_INTERVAL = timedelta(minutes=15)
 SERVICES = [
     'cpc-v2-prod-cp-kafka-connect',
     'dbz-alpha-prod-cp-kafka-connect',
-    'dbz-commodity-prod-cp-kafka-connect',
-    'es-gmei-prod-cp-kafka-connect'
+    'dbz-commodity-prod-cp-kafka-connect'
 ]
 DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=4e00d7f7b3b8686ea7d37bd01264f86e197294f9f995ef8e12cc853760a30c60'
@@ -52,6 +51,8 @@ default_args = {
     'owner': 'tangxianggang',
     'depends_on_past': False,
     'start_date': START_DATE,
+    'retries': 3,
+    'retry_delay': timedelta(seconds=5)
 }
 
 dag = DAG(
@@ -77,7 +78,7 @@ def failure_callback(context):
 def get_kafka_connectors(cmd):
     logging.info('exec cmd:' + cmd)
     connectors_list = []
-    timeout = 30
+    timeout = 10
     try:
         result = run_command(cmd, timeout)
@@ -100,7 +101,7 @@ def get_kafka_connectors(cmd):
 def get_connector_status(cmd):
     logging.info('exec cmd: ' + cmd)
-    timeout = 30
+    timeout = 10
     outdict = {}
     try:
```
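The per-curl timeout drops from 30 s to 10 s to match the tighter schedule, and 'es-gmei-prod-cp-kafka-connect' is removed here because the new v2 DAG below takes it over. For context, a sketch of the two Kafka Connect REST responses these scripts parse (hypothetical connector names and hosts; v1 lists connectors and then queries each status, while v2 fetches everything in one call with `?expand=status`):

```python
import json

# GET http://<service>:8083/connectors  -> list of connector names (v1)
connectors = json.loads('["es-sink"]')

# GET http://<service>:8083/connectors/?expand=status  -> name -> status (v2)
expanded = json.loads('''{
  "es-sink": {
    "status": {
      "name": "es-sink",
      "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
      "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}]
    }
  }
}''')
print(connectors, expanded['es-sink']['status']['connector']['state'])
```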
dags/kafka/alarm_kafka_connect_v2.py  (new file, mode 100644, +150 −0)

```python
# -*- coding: utf-8 -*-
'''
Different services are run by different operators.
The service name is the same as the task id.
'''
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os, json, logging, time, signal, platform, subprocess
import airflow

#variable parameter
DAG_ID = 'alarm_kafka_connect_v2'
START_DATE = datetime.now() - timedelta(minutes=15)
SCHEDULE_INTERVAL = timedelta(minutes=15)
SERVICES = [
    'es-gmei-prod-cp-kafka-connect',
]
DINGDING = 'https://oapi.dingtalk.com/robot/send?access_token=4e00d7f7b3b8686ea7d37bd01264f86e197294f9f995ef8e12cc853760a30c60'
DING_PATH = '/opt/bitnami/airflow/dags/*/script/ding.sh'


class TimeoutError(Exception):
    pass


def run_command(cmd, timeout=60):
    is_linux = platform.system() == 'Linux'
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                         shell=True, preexec_fn=os.setsid if is_linux else None)
    t_beginning = time.time()
    seconds_passed = 0
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            if is_linux:
                os.killpg(p.pid, signal.SIGTERM)
            else:
                p.terminate()
            raise TimeoutError(cmd, timeout)
        time.sleep(0.1)
    return p.stdout.read()


default_args = {
    'owner': 'tangxianggang',
    'depends_on_past': False,
    'start_date': START_DATE,
    'retries': 3,
    'retry_delay': timedelta(seconds=5)
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL
)


def failure_callback(context):
    title = 'Kafka connect error alarm'
    message = '''SERVICE: %s
\n#### DAG_ID: %s
\n#### TASKID: %s
\n#### CONTENT: %s
\n> **For more details, please check the airflow task log.**
''' % (context['task_instance'].task_id, context['task_instance'].dag_id,
       context['task_instance'].task_id, context['exception'])
    logging.error('message : \n' + message)
    cmd = " bash " + DING_PATH + " \'%s\' \'%s\' \'%s\' " % (title, message, DINGDING)
    os.system(cmd)


def get_kafka_connectors_status(cmd):
    logging.info('exec cmd:' + cmd)
    connectors_dict = []
    timeout = 10
    try:
        result = run_command(cmd, timeout)
        result = result.decode()
        if 'Could not resolve host' in result:
            msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
            raise Exception(msg)
        connectors_dict = json.loads(result.split('\n')[-1])
        if 'error_code' in connectors_dict:
            msg = '\n- **error_code**: ' + connectors_dict['error_code'] \
                  + '**error_msg**: ' + connectors_dict['message']
            logging.error(msg)
            raise Exception(msg)
    except TimeoutError:
        msg = '\n- **error_msg**: excute command=(%s) timeout after %i' % (cmd, timeout)
        logging.error(msg)
        raise Exception(msg)
    logging.info(str(connectors_dict))
    return connectors_dict


def judge_connector_status(connector_status_dict, connector):
    if 'error_code' in connector_status_dict:
        msg = "\n- **connector_name**: %s, **error_code**: %s, **error_msg**: %s" % (
            connector, connector_status_dict['error_code'], connector_status_dict['message'])
        logging.error(msg)
        return (False, msg)
    if 'RUNNING' != connector_status_dict['status']['connector']['state']:
        msg = "\n- **connector_name**: %s, **is_running**: false" % (connector)
        logging.error(msg)
        return (False, msg)
    tasks_list = connector_status_dict['status']['tasks']
    error_tasks = 0
    for task in tasks_list:
        if task['state'] != 'RUNNING':
            error_tasks += 1
    if error_tasks:
        all_tasks = len(tasks_list)
        running_tasks = all_tasks - error_tasks
        msg = "\n- **connector_name**: %s, **is_running**: true, **running_tasks/all_tasks**: %s/%s" % (
            connector, str(running_tasks), str(all_tasks))
        logging.error(msg)
        return (False, msg)
    return (True, str(connector_status_dict))


def python_callable(**kwargs):
    logging.info('start kafka connect status analyze .')
    curl = 'curl '
    kafka_connect_url = 'http://' + kwargs['task_instance'].task_id + ':8083/connectors/?expand=status'
    connectors_dict = get_kafka_connectors_status(curl + kafka_connect_url)
    logging.info('exec cmd: ' + curl + kafka_connect_url + ' success!')
    error_msg = ""
    for connector in connectors_dict:
        (isrunning, msg) = judge_connector_status(connectors_dict[connector], connector)
        if not isrunning:
            error_msg += msg
    if error_msg:
        raise Exception(error_msg)
    logging.info('kafka connect status ok!')


for service in SERVICES:
    task = PythonOperator(
        task_id=service,
        provide_context=True,
        python_callable=python_callable,
        on_failure_callback=failure_callback,
        dag=dag,
    )

if __name__ == '__main__':
    dag.cli()
```
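A usage sketch for judge_connector_status with a hypothetical payload, exercising the partial-failure branch: the connector itself is RUNNING but one of its two tasks is not, so the function flags it with a 1/2 task count:

```python
sample = {
    'status': {
        'connector': {'state': 'RUNNING'},
        'tasks': [{'id': 0, 'state': 'RUNNING'},
                  {'id': 1, 'state': 'FAILED'}],
    }
}
isrunning, msg = judge_connector_status(sample, 'es-sink')
print(isrunning)  # False
print(msg)  # - **connector_name**: es-sink, **is_running**: true, **running_tasks/all_tasks**: 1/2
```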