Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
A
airflow-dags-hub
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
唐香港
airflow-dags-hub
Commits
dcfd1205
Commit
dcfd1205
authored
Feb 04, 2020
by
唐香港
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Delete AlertScript.py
parent
5b6456bc
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
0 additions
and
345 deletions
+0
-345
AlertScript.py
script/AlertScript.py
+0
-345
No files found.
script/AlertScript.py
deleted
100644 → 0
View file @
5b6456bc
# -*- coding: utf-8 -*-
"""
pip install confluent-kafka==1.2.0
pip install requests==2.6.0
"""
import
json
from
confluent_kafka
import
Consumer
,
KafkaError
import
requests
import
datetime
# kafka获取数据的允许延迟时间
MESSAGE_POLL_TIMEOUT_S
=
3
# kafka批量获取数据的时间间隔,例如:每10秒表示一个批次进行数据处理
TIME_INTERVAL
=
10
# 一个批次的最大数据条数
MAX_NUM
=
1000
# 黑名单
"""
blacklist = {
'module1': ['action_not_alret', 'action_not_alret_2'],
# 如果module为空,表示该module的所有action全部过滤
'module_not_alret':[]
}
"""
blacklist
=
{
'module1'
:
[
'action_not_alret'
,
'action_not_alret_2'
],
'hermes'
:
[]
}
# 白名单
"""
whitelist = {
'module':{
'tp50':
'tp95':
'tp99':
'action':{
'tp50':
'tp95':
'tp99':
}
}
# tp50阀值
'tp50':
# tp95阀值
'tp95':
# tp99阀值
'tp99':
}
"""
whitelist
=
{
'tp50'
:
30.0
,
'tp95'
:
60.0
,
'tp99'
:
120.0
,
'gaia'
:
{
'tp50'
:
5.0
,
'tp99'
:
20.0
,
},
'mimas'
:
{
'tp50'
:
5.0
,
'tp99'
:
20.0
,
},
'doris'
:
{
'tp50'
:
5.0
,
'tp99'
:
20.0
,
},
'flag-ship'
:
{
'tp50'
:
5.0
,
'tp99'
:
20.0
,
},
'ship'
:
{
'tp50'
:
5.0
,
'tp99'
:
20.0
,
},
'backend'
:
{
'tp50'
:
5.0
,
'tp99'
:
12.0
,
'api/service/detail/v1$'
:
{
'tp50'
:
1.5
,
'tp99'
:
5.0
,
},
'api/index/v7'
:
{
'tp50'
:
1.2
,
'tp99'
:
5.0
,
},
'api/search/v2/service$'
:
{
'tp50'
:
4.0
,
'tp99'
:
10.0
,
},
'api/personal/recommends$'
:
{
'tp50'
:
1.5
,
'tp99'
:
3.0
,
},
'api/service/home/v3$'
:
{
'tp50'
:
1.5
,
'tp99'
:
4.0
,
},
'api/search/v5/content$'
:
{
'tp50'
:
3.0
,
'tp99'
:
8.0
,
},
'api/settlement/preview/v1$'
:
{
'tp50'
:
1.5
,
'tp99'
:
3.0
,
},
}
}
# kafka topic server
SERVER
=
'172.16.44.25:9092'
# kafka condumer group
GROUP
=
'python-alert'
# kafka consumer client id
CLIENT
=
'client01'
# kafka topic
TOPIC
=
[
'rt_gm_logging_perf_data_window'
]
# dingding url
API_URL
=
'https://oapi.dingtalk.com/robot/send?access_token'
\
'=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
def
alert
(
text
):
headers
=
{
'Content-Type'
:
'application/json;charset=utf-8'
}
json_text
=
{
"msgtype"
:
"text"
,
"at"
:
{
"atMobiles"
:
[
"17864308072"
],
"isAtAll"
:
False
},
"text"
:
{
"content"
:
text
}
}
text
=
json
.
dumps
(
json_text
)
# print(text)
requests
.
post
(
API_URL
,
text
,
headers
=
headers
)
DATA_DICT
=
{}
def
data_dict_init
(
key
):
DATA_DICT
[
key
]
=
{
'alert'
:
False
,
'module'
:
None
,
'action'
:
None
,
'numbers'
:
0
,
'counts'
:
0
,
'tp50'
:
0.0
,
'tp95'
:
0.0
,
'tp99'
:
0.0
,
'start_time'
:
None
}
def
add_alert_message
(
key
):
return
'''
module名字:
%
s,
action名字:
%
s,
本批次中出现module_action的次数:
%
d,
本批次中module_action的总的请求数量(counts):
%
d,
出现module_action的次数中最大的tp50:
%
f
出现module_action的次数中最大的tp95:
%
f
出现module_action的次数中最大的tp99:
%
f
本批次中tp值最晚达到阈值的start_time:
%
s
-------------------------------------------
'''
%
(
str
(
DATA_DICT
[
key
][
'module'
]),
str
(
DATA_DICT
[
key
][
'action'
]),
int
(
DATA_DICT
[
key
][
'numbers'
]),
int
(
DATA_DICT
[
key
][
'counts'
]),
float
(
DATA_DICT
[
key
][
'tp50'
]),
float
(
DATA_DICT
[
key
][
'tp95'
]),
float
(
DATA_DICT
[
key
][
'tp99'
]),
str
(
DATA_DICT
[
key
][
'start_time'
])
)
def
deal_with_data
(
msg_list
):
# print('deal with data start .....')
DATA_DICT
.
clear
()
try
:
for
val
in
msg_list
:
val_dict
=
json
.
loads
(
val
)
module
=
val_dict
[
'module'
]
action
=
val_dict
[
'action'
]
# 黑名单过滤
if
module
in
blacklist
:
if
blacklist
[
module
]:
if
action
in
blacklist
[
module
]:
# print(str(module) + ' ' + str(action) + ' 被过滤')
continue
else
:
# print(str(module) + ' 被过滤')
continue
# 白名单过滤
TP50_THRESHOLD
=
999999.999
TP95_THRESHOLD
=
999999.999
TP99_THRESHOLD
=
999999.999
if
'tp50'
in
whitelist
:
TP50_THRESHOLD
=
whitelist
[
'tp50'
]
if
'tp95'
in
whitelist
:
TP95_THRESHOLD
=
whitelist
[
'tp95'
]
if
'tp99'
in
whitelist
:
TP99_THRESHOLD
=
whitelist
[
'tp99'
]
if
module
in
whitelist
:
if
'tp50'
in
whitelist
[
module
]:
TP50_THRESHOLD
=
whitelist
[
module
][
'tp50'
]
if
'tp95'
in
whitelist
[
module
]:
TP95_THRESHOLD
=
whitelist
[
module
][
'tp95'
]
if
'tp99'
in
whitelist
[
module
]:
TP99_THRESHOLD
=
whitelist
[
module
][
'tp99'
]
if
action
in
whitelist
[
module
]:
if
'tp50'
in
whitelist
[
module
][
action
]:
TP50_THRESHOLD
=
whitelist
[
module
][
action
][
'tp50'
]
if
'tp95'
in
whitelist
[
module
][
action
]:
TP95_THRESHOLD
=
whitelist
[
module
][
action
][
'tp95'
]
if
'tp99'
in
whitelist
[
module
][
action
]:
TP99_THRESHOLD
=
whitelist
[
module
][
action
][
'tp99'
]
key
=
val_dict
[
'module'
]
+
'_'
+
val_dict
[
'action'
]
# print('白名单 : ' + str(key) + ' ' + str(TP50_THRESHOLD) + ' ' + str(TP95_THRESHOLD) + ' ' + str(TP99_THRESHOLD))
if
key
not
in
DATA_DICT
:
data_dict_init
(
key
)
DATA_DICT
[
key
][
'numbers'
]
+=
1
DATA_DICT
[
key
][
'counts'
]
+=
val_dict
[
'count'
]
DATA_DICT
[
key
][
'module'
]
=
module
DATA_DICT
[
key
][
'action'
]
=
action
if
DATA_DICT
[
key
][
'tp50'
]
<
val_dict
[
'tp50'
]:
DATA_DICT
[
key
][
'tp50'
]
=
val_dict
[
'tp50'
]
if
DATA_DICT
[
key
][
'tp50'
]
>=
TP50_THRESHOLD
:
DATA_DICT
[
key
][
'alert'
]
=
True
DATA_DICT
[
key
][
'start_time'
]
=
val_dict
[
'start_time'
]
if
DATA_DICT
[
key
][
'tp95'
]
<
val_dict
[
'tp95'
]:
DATA_DICT
[
key
][
'tp95'
]
=
val_dict
[
'tp95'
]
if
DATA_DICT
[
key
][
'tp95'
]
>=
TP95_THRESHOLD
:
DATA_DICT
[
key
][
'alert'
]
=
True
DATA_DICT
[
key
][
'start_time'
]
=
val_dict
[
'start_time'
]
if
DATA_DICT
[
key
][
'tp99'
]
<
val_dict
[
'tp99'
]:
DATA_DICT
[
key
][
'tp99'
]
=
val_dict
[
'tp99'
]
if
DATA_DICT
[
key
][
'tp99'
]
>=
TP99_THRESHOLD
:
DATA_DICT
[
key
][
'alert'
]
=
True
DATA_DICT
[
key
][
'start_time'
]
=
val_dict
[
'start_time'
]
alert_message
=
''
for
keys
in
DATA_DICT
:
# print(DATA_DICT[keys])
if
DATA_DICT
[
keys
][
'alert'
]:
alert_message
+=
add_alert_message
(
keys
)
# 触发报警
if
alert_message
is
not
None
and
len
(
alert_message
)
>
0
:
alert
(
alert_message
)
except
Exception
as
e
:
alert
(
'python消费耗时指标报警脚本,function of deal with data , error message is '
+
str
(
e
))
raise
e
def
consumer_message
():
consumer
=
create_consumer
()
print
(
'[info] create and get consumer client api'
)
try
:
while
True
:
# 开始时间
start_time
=
datetime
.
datetime
.
now
()
# 当前时间,用于计算批次时间
now_time
=
datetime
.
datetime
.
now
()
# 记录单批次中非空的最后的message,用于commit message's offset
notNone_msg
=
None
msg_list
=
[]
msg_list_len
=
0
while
(
now_time
-
start_time
)
.
total_seconds
()
<=
TIME_INTERVAL
and
msg_list_len
<=
MAX_NUM
:
now_time
=
datetime
.
datetime
.
now
()
try
:
msg
=
consumer
.
poll
(
MESSAGE_POLL_TIMEOUT_S
)
if
msg
is
None
:
# print(msg)
continue
if
msg
.
error
():
if
msg
.
error
()
.
code
()
==
KafkaError
.
PARTITION_EOF
:
pass
else
:
# message poll出现错误,抛出异常
print
(
'[info] kafka poll message is error, error message is '
+
str
(
msg
.
error
()))
raise
# 如果msg是空行
if
not
msg
.
value
():
continue
notNone_msg
=
msg
# print(msg.value())
# 向msg_list中添加msg.value
msg_list
.
append
(
msg
.
value
())
msg_list_len
+=
1
except
Exception
as
e
:
# 如果出现异常需要钉钉报警
alert
(
'python消费耗时指标报警脚本,function of consumer message , error message is '
+
str
(
e
))
raise
e
if
notNone_msg
:
consumer
.
commit
(
notNone_msg
)
if
msg_list_len
>
0
:
deal_with_data
(
msg_list
)
except
Exception
as
e
:
# 抛出异常后报警
alert
(
'python消费耗时指标报警脚本,function of consumer message , error message is '
+
str
(
e
))
raise
e
finally
:
if
consumer
:
print
(
'[info] close consumer'
)
consumer
.
close
()
def
create_consumer
():
print
(
'[info] go into create consumer function'
)
try
:
consumer_config
=
{
'bootstrap.servers'
:
SERVER
,
'group.id'
:
GROUP
,
'enable.auto.commit'
:
False
,
'default.topic.config'
:
{
'auto.offset.reset'
:
'largest'
# 'smallest'
}
}
print
(
'[info] consumer config is: '
+
str
(
consumer_config
))
consumer
=
Consumer
(
consumer_config
)
consumer
.
subscribe
(
TOPIC
)
print
(
'[info] consumer subscribe topic is '
+
str
(
TOPIC
))
return
consumer
except
Exception
as
e
:
# 抛出异常后报警
alert
(
'python消费耗时指标报警脚本,function of create consumer , error message is '
+
str
(
e
.
message
))
raise
e
if
__name__
==
'__main__'
:
consumer_message
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment