# celery_stats_monitor.py 3.12 KB
# -*- coding:UTF-8 -*-
# @Time  : 2020/12/2 15:12
# @File  : celery_stats_monitor.py
# @email : litao@igengmei.com
# @author : litao
# Monitors the Redis queues used for index synchronisation
import redis
from send_msg_to_dingding.send_msg import send_msg_to_dingtalk
import datetime, time
# DingTalk robot credentials, keyed by the name of the monitored Redis queue.
# Each entry supplies the `secret` and `access_token` passed to
# send_msg_to_dingtalk when an alert for that queue is sent.
# NOTE(review): these credentials are hard-coded in source control; consider
# loading them from configuration or the environment instead.
# 'gaia-dbmw' and 'mimas-dbmw' currently carry identical credentials.
token_dict = {
    'gaia-dbmw': dict(
        secret="SECba5212dadad3794b3da51c903c828f60ab8342897af2675f1f48fceb8858eb5c",
        access_token="df546521ce46bfb35025ca266efc2d7e8d708d1c8ada9b15ae487786ad06ad12",
    ),
    'tapir-gaia-service': dict(
        secret="SEC65d8ac5f9c92677cf0c98624810abc407cf433fd4f2713649dc41310b4658fb5",
        access_token="ca4ca402653c7fe6011c18ff5ac385b2b0f4ab6cab61c545f09f4d8830db6870",
    ),
    'mimas-dbmw': dict(
        secret="SECba5212dadad3794b3da51c903c828f60ab8342897af2675f1f48fceb8858eb5c",
        access_token="df546521ce46bfb35025ca266efc2d7e8d708d1c8ada9b15ae487786ad06ad12",
    ),
}

# send_msg_to_dingtalk("123",secret=secret,access_token=access_token)



# Redis clients for the monitored queues, built from connection URLs.
# decode_responses=True is passed so replies are decoded rather than
# returned as raw bytes (see the redis-py documentation).
# NOTE(review): the passwords are embedded in these URLs; consider moving
# them to configuration.
redis_old_gaia = redis.StrictRedis.from_url(
    "redis://:ReDis!GmTx*0aN11@172.16.40.166:6379/9",
    decode_responses=True,
)
redis_old_mimas = redis.StrictRedis.from_url(
    "redis://:ReDis!GmTx*0aN3@172.16.40.145:6379/5",
    decode_responses=True,
)


class Parse_data:
    """Track queue lengths between samples and build backlog alert messages.

    The length seen for each queue at the previous sample is remembered so
    that the next sample can tell whether the queue is draining or growing.
    """

    def __init__(self, top=200, consume=500):
        """Create a tracker.

        Args:
            top: Queue lengths at or below this value never produce a message.
            consume: Stored on the instance; not read anywhere in this class.
        """
        self.top = top
        self.consume = consume
        # key_name -> queue length recorded at the previous sample.
        self.offset_dict = {}

    def parse(self, key_name, len_data, per_sec=10):
        """Record a new length sample and return an alert message, if any.

        Args:
            key_name: Name of the queue being sampled.
            len_data: Current number of items in the queue.
            per_sec: Seconds elapsed between consecutive samples; used to turn
                the change in length into a per-second rate.

        Returns:
            A timestamped message when the queue is above ``top`` and its
            length changed since the previous sample (an estimated number of
            hours until empty when draining, a growth notice when growing);
            otherwise ``None``.
        """
        previous = self.offset_dict.get(key_name)
        # Every path records the new sample, so do it once up front.
        self.offset_dict[key_name] = len_data

        # Small backlogs are not worth reporting, and the first sample of a
        # queue has nothing to compare against. `is None` (rather than
        # truthiness) keeps a previous length of 0 as a valid baseline.
        if len_data <= self.top or previous is None:
            return None

        drained = previous - len_data
        if drained == 0:
            # Length unchanged: neither draining nor growing.
            return None

        time_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if drained < 0:
            return "{} {}队列增长中,目前有{}条数据".format(time_ts, key_name, len_data)

        # Items consumed per second over the last sampling interval.
        rate = drained / per_sec
        # len_data / rate is the remaining time in seconds; 3600 s per hour.
        use_time = round(len_data / rate / 3600, 1)
        return "{} 目前{}队列中有{}条数据,预计{}小时后处理完成".format(
            time_ts, key_name, len_data, use_time
        )


def len_list_and_send_msg(redis_clint,key_name,rules,per_sec=10):
    """Sample one queue's length and forward any resulting alert to DingTalk.

    Args:
        redis_clint: Client object exposing ``llen``; used to read the length
            of the list stored under ``key_name``.
        key_name: Queue name; also the key used to look up the DingTalk
            credentials in ``token_dict``.
        rules: Object whose ``parse(key_name, length, per_sec=...)`` returns
            an alert message or a falsy value.
        per_sec: Sampling interval in seconds, passed through to ``rules``.
    """
    queue_length = redis_clint.llen(key_name)
    alert_text = rules.parse(key_name, queue_length, per_sec=per_sec)
    if not alert_text:
        # Nothing to report for this sample.
        return
    robot = token_dict[key_name]
    send_msg_to_dingtalk(alert_text, secret=robot['secret'], access_token=robot['access_token'])


if __name__ == "__main__":
    # Sampling interval in seconds; also handed to the rules object so it can
    # convert length changes into a per-second rate.
    per_sec = 10
    rules = Parse_data()
    # Queue name -> Redis client holding that queue.
    redis_clicnt_dct = {
        "gaia-dbmw": redis_old_gaia,
        "tapir-gaia-service": redis_old_gaia,
        "mimas-dbmw": redis_old_mimas,
    }
    # Wake up once a second and sample every queue whenever the wall-clock
    # second is a multiple of per_sec.
    # NOTE(review): if one pass takes longer than a second, a tick could be
    # skipped while per_sec is still assumed as the interval; confirm this is
    # acceptable.
    while True:
        if datetime.datetime.now().second % per_sec == 0:
            for redis_clint_key, redis_clint in redis_clicnt_dct.items():
                len_list_and_send_msg(redis_clint, redis_clint_key, rules, per_sec)
        time.sleep(1)