1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# -*- coding:UTF-8 -*-
# @Time : 2020/12/2 15:12
# @File : celery_stats_monitor.py
# @email : litao@igengmei.com
# @author : litao
# 索引同步redis队列监测
import redis
from send_msg_to_dingding.send_msg import send_msg_to_dingtalk
import datetime, time
token_dict = {
'gaia-dbmw':{'secret':"SECba5212dadad3794b3da51c903c828f60ab8342897af2675f1f48fceb8858eb5c",'access_token':"df546521ce46bfb35025ca266efc2d7e8d708d1c8ada9b15ae487786ad06ad12"},
'tapir-gaia-service':{'secret':"SEC65d8ac5f9c92677cf0c98624810abc407cf433fd4f2713649dc41310b4658fb5",'access_token':"ca4ca402653c7fe6011c18ff5ac385b2b0f4ab6cab61c545f09f4d8830db6870"},
'mimas-dbmw':{'secret':"SECba5212dadad3794b3da51c903c828f60ab8342897af2675f1f48fceb8858eb5c",'access_token':"df546521ce46bfb35025ca266efc2d7e8d708d1c8ada9b15ae487786ad06ad12"},
}
# send_msg_to_dingtalk("123",secret=secret,access_token=access_token)
redis_old_gaia = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN11@172.16.40.166:6379/9", decode_responses=True)
redis_old_mimas = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN3@172.16.40.145:6379/5", decode_responses=True)
class Parse_data:
def __init__(self,top=200,consume=500):
self.top = top
self.consume = consume
self.offset_dict = {}
def parse(self,key_name,len_data,per_sec=10):
now = datetime.datetime.now()
time_ts = now.strftime("%Y-%m-%d %H:%M:%S")
if len_data <= self.top:
self.offset_dict[key_name] = len_data
return None
if self.offset_dict.get(key_name):
old_len_data = self.offset_dict.get(key_name)
len_data_diff = old_len_data - len_data
rate = len_data_diff / per_sec
self.offset_dict[key_name] = len_data
if len_data_diff > 0:
use_time = round(len_data/rate/(60/per_sec)/60,1)
return "{} 目前{}队列中有{}条数据,预计{}小时后处理完成".format(time_ts,key_name,str(len_data),str(use_time))
else:
if abs(rate) > 0:
return "{} {}队列增长中,目前有{}条数据".format(time_ts, key_name,str(len_data))
self.offset_dict[key_name] = len_data
return None
def len_list_and_send_msg(redis_clint,key_name,rules,per_sec=10):
# redis_clint = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN3@172.16.40.145:6379/5", decode_responses=True)
res = redis_clint.llen(key_name)
# print(key_name,res)
str_res = rules.parse(key_name,res,per_sec=per_sec)
if str_res:
send_msg_to_dingtalk(str_res,secret=token_dict[key_name]['secret'],access_token=token_dict[key_name]['access_token'])
if __name__ == "__main__":
rules = Parse_data()
redis_clicnt_dct = {
"gaia-dbmw": redis_old_gaia,
"tapir-gaia-service": redis_old_gaia,
"mimas-dbmw":redis_old_mimas
}
per_sec = 10
while True:
now = datetime.datetime.now()
if now.second % per_sec == 0:
for redis_clint_key in redis_clicnt_dct:
len_list_and_send_msg(redis_clicnt_dct[redis_clint_key],redis_clint_key,rules,per_sec)
time.sleep(1)