# -*- coding:UTF-8 -*-
# @Time  : 2020/12/2 15:12
# @File  : celery_stats_monitor.py
# @email : litao@igengmei.com
# @author : litao
"""Poll the length of a few Redis lists and push backlog notices to DingTalk.

Every ``per_sec`` seconds the script reads ``LLEN`` for each monitored key,
compares it with the previous sample and, when the list is above a threshold
and its length changed, sends a message through ``send_msg_to_dingtalk``.
(The file name suggests the lists are Celery queues; nothing in this file
depends on that.)
"""

import datetime
import time

import redis

from send_msg_to_dingding.send_msg import send_msg_to_dingtalk

# DingTalk robot credentials, keyed by the monitored Redis list name.
# 'gaia-dbmw' and 'mimas-dbmw' intentionally or not share the same robot.
# NOTE(review): secrets and Redis passwords are hard-coded in this file;
# consider loading them from the environment or a secrets store.
token_dict = {
    'gaia-dbmw': {
        'secret': "SECba5212dadad3794b3da51c903c828f60ab8342897af2675f1f48fceb8858eb5c",
        'access_token': "df546521ce46bfb35025ca266efc2d7e8d708d1c8ada9b15ae487786ad06ad12",
    },
    'tapir-gaia-service': {
        'secret': "SEC65d8ac5f9c92677cf0c98624810abc407cf433fd4f2713649dc41310b4658fb5",
        'access_token': "ca4ca402653c7fe6011c18ff5ac385b2b0f4ab6cab61c545f09f4d8830db6870",
    },
    'mimas-dbmw': {
        'secret': "SECba5212dadad3794b3da51c903c828f60ab8342897af2675f1f48fceb8858eb5c",
        'access_token': "df546521ce46bfb35025ca266efc2d7e8d708d1c8ada9b15ae487786ad06ad12",
    },
}

redis_old_gaia = redis.StrictRedis.from_url(
    "redis://:ReDis!GmTx*0aN11@172.16.40.166:6379/9", decode_responses=True)
redis_old_mimas = redis.StrictRedis.from_url(
    "redis://:ReDis!GmTx*0aN3@172.16.40.145:6379/5", decode_responses=True)


class Parse_data:
    """Turn successive length samples of a list into a status message.

    The last seen length of every key is remembered in ``offset_dict`` so
    that each new sample can be compared with the previous one.
    """

    def __init__(self, top=200, consume=500):
        # Lengths at or below ``top`` never produce a message.
        self.top = top
        # Stored but not read anywhere in this file.
        self.consume = consume
        # key name -> length observed at the previous call to parse().
        self.offset_dict = {}

    def parse(self, key_name, len_data, per_sec=10):
        """Record a new sample and describe how the list is evolving.

        Args:
            key_name: name of the monitored list (used as the state key and
                in the message text).
            len_data: current length of the list.
            per_sec: number of seconds elapsed since the previous sample.

        Returns:
            A message string when the list is above ``self.top`` and its
            length differs from the previous sample; otherwise ``None``
            (first sample, list at or below the threshold, or unchanged).
        """
        time_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Always remember the newest sample, whatever is returned below.
        old_len_data = self.offset_dict.get(key_name)
        self.offset_dict[key_name] = len_data

        # Compare against None explicitly: a previous length of 0 is a valid
        # sample and must not be mistaken for "no previous sample".
        if len_data <= self.top or old_len_data is None:
            return None

        len_data_diff = old_len_data - len_data
        if len_data_diff == 0:
            return None
        if len_data_diff < 0:
            return "{} {}队列增长中,目前有{}条数据".format(
                time_ts, key_name, str(len_data))

        # The list is draining: ``rate`` is items per second, so
        # len_data / rate is the remaining time in seconds; 3600 s per hour.
        rate = len_data_diff / per_sec
        use_time = round(len_data / rate / 3600, 1)
        return "{} 目前{}队列中有{}条数据,预计{}小时后处理完成".format(
            time_ts, key_name, str(len_data), str(use_time))


def len_list_and_send_msg(redis_clint, key_name, rules, per_sec=10):
    """Sample one list and forward any resulting notice to DingTalk.

    Args:
        redis_clint: Redis client used to run ``LLEN``.
        key_name: name of the list; also the key into ``token_dict``, so a
            name missing from that dict raises ``KeyError`` when a message
            has to be sent.
        rules: object whose ``parse(key_name, length, per_sec=...)`` returns
            a message string or a falsy value (a ``Parse_data`` instance).
        per_sec: seconds between two samples, passed through to ``rules``.
    """
    res = redis_clint.llen(key_name)
    str_res = rules.parse(key_name, res, per_sec=per_sec)
    if str_res:
        credentials = token_dict[key_name]
        send_msg_to_dingtalk(
            str_res,
            secret=credentials['secret'],
            access_token=credentials['access_token'],
        )


def main():
    """Run the polling loop forever, sampling every ``per_sec`` seconds."""
    rules = Parse_data()
    redis_clicnt_dct = {
        "gaia-dbmw": redis_old_gaia,
        "tapir-gaia-service": redis_old_gaia,
        "mimas-dbmw": redis_old_mimas,
    }
    per_sec = 10
    while True:
        now = datetime.datetime.now()
        # Sample only on wall-clock seconds that are multiples of per_sec.
        # NOTE(review): the loop wakes roughly once per second, so a tick can
        # be skipped if an iteration overruns; the next rate is then computed
        # over a longer interval than per_sec.
        if now.second % per_sec == 0:
            for redis_clint_key, redis_clint in redis_clicnt_dct.items():
                len_list_and_send_msg(
                    redis_clint, redis_clint_key, rules, per_sec)
        time.sleep(1)


if __name__ == "__main__":
    main()