Commit 8a0f5562 authored by 唐香港

Add new file

parent 023e1d5b
# -*- coding: utf-8 -*-
"""
pip install confluent-kafka==1.2.0
pip install requests==2.6.0
"""
import datetime
import json

import requests
from confluent_kafka import Consumer, KafkaError, KafkaException
# Timeout, in seconds, passed to each Kafka poll call (how long one poll may
# wait for a message).
MESSAGE_POLL_TIMEOUT_S = 3
# Length of one batch window in seconds, e.g. 10 means the messages gathered
# over roughly 10 seconds are processed together as one batch.
TIME_INTERVAL = 10
# Maximum number of messages in one batch.
MAX_NUM = 1000
# Blacklist: records whose module/action match an entry are skipped and never
# contribute to an alert.
#
# Layout:
#     blacklist = {
#         # only the listed actions of this module are skipped
#         'module1': ['action_not_alret', 'action_not_alret_2'],
#         # an empty list means every action of the module is skipped
#         'module_not_alret': [],
#     }
blacklist = {
    'module1': [
        'action_not_alret',
        'action_not_alret_2',
    ],
    'hermes': [],
}
# Whitelist: alert thresholds for the tp50 / tp95 / tp99 values of a record.
#
# Thresholds are layered; a more specific layer overrides a more general one
# for each percentile it defines:
#
#     whitelist = {
#         'tp50': <global tp50 threshold>,
#         'tp95': <global tp95 threshold>,
#         'tp99': <global tp99 threshold>,
#         '<module>': {
#             'tp50': ..., 'tp95': ..., 'tp99': ...,      # per-module override
#             '<action>': {
#                 'tp50': ..., 'tp95': ..., 'tp99': ...,  # per-action override
#             },
#         },
#     }
#
# NOTE(review): several action keys below end in '$'. Actions are looked up
# by exact key in deal_with_data, so these entries only take effect if the
# incoming action string carries the same suffix -- confirm.
whitelist = {
    # Global defaults.
    'tp50': 30.0,
    'tp95': 60.0,
    'tp99': 120.0,

    # Per-module overrides.
    'gaia': {'tp50': 5.0, 'tp99': 20.0},
    'mimas': {'tp50': 5.0, 'tp99': 20.0},
    'doris': {'tp50': 5.0, 'tp99': 20.0},
    'flag-ship': {'tp50': 5.0, 'tp99': 20.0},
    'ship': {'tp50': 5.0, 'tp99': 20.0},
    'backend': {
        'tp50': 5.0,
        'tp99': 12.0,
        # Per-action overrides inside the module.
        'api/service/detail/v1$': {'tp50': 1.5, 'tp99': 5.0},
        'api/index/v7': {'tp50': 1.2, 'tp99': 5.0},
        'api/search/v2/service$': {'tp50': 4.0, 'tp99': 10.0},
        'api/personal/recommends$': {'tp50': 1.5, 'tp99': 3.0},
        'api/service/home/v3$': {'tp50': 1.5, 'tp99': 4.0},
        'api/search/v5/content$': {'tp50': 3.0, 'tp99': 8.0},
        'api/settlement/preview/v1$': {'tp50': 1.5, 'tp99': 3.0},
    },
}
# Kafka bootstrap server address (host:port).
SERVER = '172.16.44.25:9092'
# Kafka consumer group id.
GROUP = 'python-alert'
# Kafka consumer client id.
# NOTE(review): not referenced by any code visible in this file -- confirm
# whether it was meant to be passed to the consumer configuration.
CLIENT = 'client01'
# Kafka topics the consumer subscribes to.
TOPIC = ['rt_gm_logging_perf_data_window']
# DingTalk robot webhook URL that alert() posts to.
# NOTE(review): the access token is hard-coded in source; consider loading it
# from the environment or a secret store instead.
API_URL = 'https://oapi.dingtalk.com/robot/send?access_token' \
'=dac084248b38ef564c30e7f7d0c3901f3967c8e5ffdb33efe188495d5b058fdd'
def alert(text, timeout=10):
    """Post *text* to the DingTalk robot webhook as a plain-text message.

    Args:
        text: Message content to deliver.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout a stalled webhook would block the caller (and therefore
            the whole consumer loop) indefinitely.

    Returns:
        None. The HTTP response is not inspected.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed, including when *timeout* elapses.
    """
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    payload = {
        "msgtype": "text",
        "at": {
            "atMobiles": [
                "17864308072"
            ],
            "isAtAll": False
        },
        "text": {
            "content": text
        }
    }
    body = json.dumps(payload)
    requests.post(API_URL, body, headers=headers, timeout=timeout)
# Per-batch aggregation state keyed by '<module>_<action>'. It is cleared at
# the start of every deal_with_data() call and entries are created through
# data_dict_init().
DATA_DICT = {}
def data_dict_init(key):
    """Store a fresh, zeroed aggregation entry under *key* in DATA_DICT.

    Any existing entry for *key* is replaced. The entry starts with the
    alert flag off, no module/action/start_time, zero counters and 0.0 for
    each of tp50, tp95 and tp99.
    """
    fresh_entry = dict(
        alert=False,
        module=None,
        action=None,
        numbers=0,
        counts=0,
        tp50=0.0,
        tp95=0.0,
        tp99=0.0,
        start_time=None
    )
    DATA_DICT[key] = fresh_entry
def add_alert_message(key):
    """Render the alert text for the DATA_DICT entry stored under *key*.

    The text lists the module, the action, how many records were seen
    ('numbers'), the summed request count ('counts'), the tp50/tp95/tp99
    values held by the entry and its start_time, followed by a separator
    line.

    Raises:
        KeyError: if *key* is not present in DATA_DICT.
    """
    entry = DATA_DICT[key]
    template = '''
module名字: %s,
action名字: %s,
本批次中出现module_action的次数: %d,
本批次中module_action的总的请求数量(counts): %d,
出现module_action的次数中最大的tp50: %f
出现module_action的次数中最大的tp95: %f
出现module_action的次数中最大的tp99: %f
本批次中tp值最晚达到阈值的start_time: %s
-------------------------------------------
'''
    values = (
        str(entry['module']),
        str(entry['action']),
        int(entry['numbers']),
        int(entry['counts']),
        float(entry['tp50']),
        float(entry['tp95']),
        float(entry['tp99']),
        str(entry['start_time']),
    )
    return template % values
# Percentile fields read from every record and tracked per module/action.
_TP_KEYS = ('tp50', 'tp95', 'tp99')
# Threshold applied when no whitelist layer configures a percentile.
_NO_THRESHOLD = 999999.999


def _is_blacklisted(module, action):
    """Return True when the blacklist says this module/action is skipped."""
    if module not in blacklist:
        return False
    blocked_actions = blacklist[module]
    # An empty list blacklists the whole module.
    return not blocked_actions or action in blocked_actions


def _resolve_thresholds(module, action):
    """Return {'tp50': ..., 'tp95': ..., 'tp99': ...} for one module/action.

    Layers are applied from general to specific (global, module, action);
    each layer overrides only the percentiles it defines.
    """
    thresholds = dict.fromkeys(_TP_KEYS, _NO_THRESHOLD)

    # Top-level whitelist entries are either floats (global thresholds) or
    # dicts (module rules). Only dicts are treated as rule layers so that a
    # module or action that happens to be named 'tp50'/'tp95'/'tp99' does
    # not trigger a membership test against a float.
    module_rules = whitelist.get(module)
    if not isinstance(module_rules, dict):
        module_rules = {}
    action_rules = module_rules.get(action)
    if not isinstance(action_rules, dict):
        action_rules = {}

    for layer in (whitelist, module_rules, action_rules):
        for tp in _TP_KEYS:
            if tp in layer:
                thresholds[tp] = layer[tp]
    return thresholds


def deal_with_data(msg_list):
    """Aggregate one batch of records and send a single alert if needed.

    DATA_DICT is cleared, then every element of *msg_list* is parsed as a
    JSON object. The fields 'module', 'action', 'count', 'tp50', 'tp95',
    'tp99' and (when a threshold is reached) 'start_time' are read from it.
    Blacklisted records are skipped. For the remaining ones the per
    module/action entry in DATA_DICT counts the records, sums 'count' and
    keeps the largest value seen for each percentile. Entries flagged for
    alerting are rendered with add_alert_message() and sent together
    through one alert() call.

    Args:
        msg_list: Iterable of JSON-encoded records.

    Raises:
        Exception: any error raised while parsing, aggregating or alerting
            is reported through alert() and then re-raised unchanged.
    """
    DATA_DICT.clear()
    try:
        for raw in msg_list:
            record = json.loads(raw)
            module = record['module']
            action = record['action']
            if _is_blacklisted(module, action):
                continue

            thresholds = _resolve_thresholds(module, action)
            key = module + '_' + action
            if key not in DATA_DICT:
                data_dict_init(key)
            stats = DATA_DICT[key]
            stats['numbers'] += 1
            stats['counts'] += record['count']
            stats['module'] = module
            stats['action'] = action

            # NOTE(review): the threshold is checked only when this record
            # raises the running maximum. The original nesting could not be
            # recovered from the reviewed copy; the alert flag is the same
            # either way, only the reported start_time may differ -- confirm.
            for tp in _TP_KEYS:
                if stats[tp] < record[tp]:
                    stats[tp] = record[tp]
                    if stats[tp] >= thresholds[tp]:
                        stats['alert'] = True
                        stats['start_time'] = record['start_time']

        alert_message = ''.join(
            add_alert_message(key)
            for key in DATA_DICT
            if DATA_DICT[key]['alert']
        )
        # Send the alert only when at least one entry crossed a threshold.
        if alert_message:
            alert(alert_message)
    except Exception as e:
        alert('python消费耗时指标报警脚本,function of deal with data , error message is ' + str(e))
        raise
def consumer_message():
    """Consume the subscribed topic forever, handling messages in batches.

    A batch collects message payloads until TIME_INTERVAL seconds have
    passed since the batch started or MAX_NUM payloads have been gathered.
    The last payload-carrying message of the batch is then committed and the
    payloads are handed to deal_with_data(). The consumer is always closed
    on the way out.

    Raises:
        KafkaException: if a polled message carries an error other than
            partition EOF.
        Exception: any failure while polling, committing or processing is
            reported once through alert() and then re-raised unchanged.
    """
    consumer = create_consumer()
    print('[info] create and get consumer client api')
    try:
        while True:
            batch_started = datetime.datetime.now()
            # Last message that carried a payload; its offset is committed.
            last_msg = None
            msg_list = []
            # Strict '<' keeps the batch at MAX_NUM payloads at most.
            while len(msg_list) < MAX_NUM:
                # Measure elapsed time right before each poll so the window
                # check never relies on a timestamp from an earlier pass.
                elapsed = (datetime.datetime.now() - batch_started).total_seconds()
                if elapsed > TIME_INTERVAL:
                    break

                msg = consumer.poll(MESSAGE_POLL_TIMEOUT_S)
                if msg is None:
                    continue

                error = msg.error()
                if error:
                    if error.code() == KafkaError._PARTITION_EOF:
                        # End of partition is an event, not data: skip it.
                        continue
                    print('[info] kafka poll message is error, error message is ' + str(error))
                    raise KafkaException(error)

                payload = msg.value()
                # Skip messages with an empty payload.
                if not payload:
                    continue
                last_msg = msg
                msg_list.append(payload)

            # NOTE(review): the offset is committed before the batch is
            # processed, so a batch that fails in deal_with_data() is not
            # re-read after a restart -- confirm this is intended.
            if last_msg is not None:
                consumer.commit(last_msg)
            if msg_list:
                deal_with_data(msg_list)
    except Exception as e:
        # Single reporting point for every failure inside the loop.
        alert('python消费耗时指标报警脚本,function of consumer message , error message is ' + str(e))
        raise
    finally:
        print('[info] close consumer')
        consumer.close()
def create_consumer():
    """Create a Kafka consumer for GROUP on SERVER and subscribe it to TOPIC.

    Auto-commit is disabled, so offsets are only committed explicitly by the
    caller.

    Returns:
        The subscribed Consumer instance.

    Raises:
        Exception: any failure while building or subscribing the consumer is
            reported through alert() and then re-raised unchanged.
    """
    print('[info] go into create consumer function')
    try:
        consumer_config = {
            'bootstrap.servers': SERVER,
            'group.id': GROUP,
            'enable.auto.commit': False,
            'default.topic.config': {
                'auto.offset.reset': 'largest'  # 'smallest'
            }
        }
        print('[info] consumer config is: ' + str(consumer_config))
        consumer = Consumer(consumer_config)
        consumer.subscribe(TOPIC)
        print('[info] consumer subscribe topic is ' + str(TOPIC))
        return consumer
    except Exception as e:
        # str(e) rather than e.message: exceptions have no 'message'
        # attribute on Python 3, which would make this handler fail itself.
        alert('python消费耗时指标报警脚本,function of create consumer , error message is ' + str(e))
        raise
# Start the endless consume/alert loop only when executed as a script.
if __name__ == '__main__':
    consumer_message()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment