zidongfenfa_es_monitor.py 3.76 KB
# coding:utf-8
"""
自动分发数据监控
airflow定时监控,每隔10分钟监控一次
mysql 里面的数据和es里面的数据是否一致
"""
import pymysql
import datetime
from dingtalkchatbot.chatbot import DingtalkChatbot
import settings_local
from elasticsearch import Elasticsearch


def main():
   
    es = Elasticsearch(settings_local.ES_CONFIG, http_auth=settings_local.ES_HTTP_AUTH, timeout=3600)
    zhengxing = settings_local.DATABASES["zhengxing"]
    zhengxing_db = pymysql.connect(
        host=zhengxing["HOST"],
        user=zhengxing["USER"],
        passwd=zhengxing["PASSWORD"],
        database=zhengxing["NAME"],
        port=int(zhengxing["PORT"]))
    zhengxing_cursor = zhengxing_db.cursor()
    limit = 100
    assigners_sql = """
        select 
            hera.user_id as adviser_id, 
            hera.is_leader as is_leader, 
            hera.is_receive as is_accept_newuser, 
            hera.is_delete as is_deleted, 
            w.job_grade as job_grade,
            w.limit_count as clue_num_upper_bound
        from hera_bdtransfer_auto_assignor as hera 
            left join api_bdtransfer_jobgrade_limit as w on hera.jobgrade_limit_id = w.id
        group by hera.user_id"""

    try:
        zhengxing_cursor.execute(assigners_sql)
        assigners = zhengxing_cursor.fetchall()

        adviser_ids = [item[0] for item in assigners]

        total = len(adviser_ids)
        mysql_advisers = {}
        for item in assigners:
            mysql_advisers[item[0]] = {}
            mysql_advisers[item[0]]['is_leader'] = item[1] 
            mysql_advisers[item[0]]['is_accept_newuser'] = item[2] 
            mysql_advisers[item[0]]['is_deleted'] = item[3] 
            mysql_advisers[item[0]]['job_grade'] = item[4]
            mysql_advisers[item[0]]['clue_num_upper_bound'] = item[5]
        # 批量获取es
        query = {
            "_source": ["adviser_id", "is_leader", "is_accept_newuser", "is_deleted", "job_grade", "clue_num_upper_bound"],
            "query": {"bool": {
                "filter": [
                    {"terms": {"adviser_id": adviser_ids}}]}}}
        es_result = es.search(query, settings_local.ES_ZIDONGFENFA_INDEX)
        num = 0.0
        error_users = []
        for item in es_result["hits"]["hits"]:
            adviser_id = item['_source'].pop('adviser_id')
            es_result = item['_source']
            mysql_result = mysql_advisers[adviser_id]
            if (es_result==mysql_result):
                pass
            else:
                print("""
                    adviser_id: {},
                    mysql_result: {},
                    es_result: {}""".format(
                        adviser_id,
                        mysql_result,
                        es_result
                    ))
                num = num + 1
                error_users.append(adviser_id)
        print(num)
        if num / total > 0.02:
            msg = "数据同步监控,自动分发总共{}个用户,有{}个数据存在差异,具体用户id:{}".format(total, len(error_users), error_users)
            webhook = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(
                "1475fb9e6beff63126fef5464a378b6b77fa8655933a407d8159a2a2b1b8c869")
            xiaoding = DingtalkChatbot(webhook)
            at_mobiles = []
            xiaoding.send_text(msg=msg, at_mobiles=at_mobiles)

    except Exception as e:
        msg = "数据同步监控,数据diff出现问题"
        webhook = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(
            "1475fb9e6beff63126fef5464a378b6b77fa8655933a407d8159a2a2b1b8c869")
        xiaoding = DingtalkChatbot(webhook)
        at_mobiles = ["17794411132"]
        # at_mobiles = []
        xiaoding.send_text(msg=msg, at_mobiles=at_mobiles)


if __name__ == '__main__':
    main()