# coding:utf-8
"""
Auto-distribution data monitor.

Scheduled by Airflow to run every 10 minutes; checks whether the adviser
data stored in MySQL and the data stored in Elasticsearch are consistent,
and sends a DingTalk alert when they are not.
"""
import datetime
import traceback

import pymysql
from dingtalkchatbot.chatbot import DingtalkChatbot
from elasticsearch import Elasticsearch

import settings_local

# NOTE(review): this access token is a credential committed to source control;
# consider moving it into settings_local.
DINGTALK_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(
    "1475fb9e6beff63126fef5464a378b6b77fa8655933a407d8159a2a2b1b8c869")

# Number of adviser ids sent to Elasticsearch per search request.
ES_BATCH_SIZE = 100

# Alert when more than this fraction of advisers differ between MySQL and ES.
MISMATCH_RATIO_THRESHOLD = 0.02

# Fields compared between the two stores, in the column order of ASSIGNERS_SQL
# (after the leading adviser_id column).
COMPARED_FIELDS = (
    "is_leader",
    "is_accept_newuser",
    "is_deleted",
    "job_grade",
    "clue_num_upper_bound",
)

ASSIGNERS_SQL = """
    select
        hera.user_id as adviser_id,
        hera.is_leader as is_leader,
        hera.is_receive as is_accept_newuser,
        hera.is_delete as is_deleted,
        w.job_grade as job_grade,
        w.limit_count as clue_num_upper_bound
    from hera_bdtransfer_auto_assignor as hera
    left join api_bdtransfer_jobgrade_limit as w
    on hera.jobgrade_limit_id = w.id
    group by hera.user_id"""


def _send_alert(msg, at_mobiles):
    """Send a text message to the DingTalk robot, @-mentioning at_mobiles."""
    DingtalkChatbot(DINGTALK_WEBHOOK).send_text(msg=msg, at_mobiles=at_mobiles)


def _fetch_assigners(db):
    """Return all rows of ASSIGNERS_SQL, always closing the cursor."""
    cursor = db.cursor()
    try:
        cursor.execute(ASSIGNERS_SQL)
        return cursor.fetchall()
    finally:
        cursor.close()


def _chunks(items, size):
    """Yield consecutive slices of items, each at most size long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _fetch_es_hits(es, adviser_ids):
    """Return the ES hits for one batch of adviser ids."""
    query = {
        "_source": ["adviser_id"] + list(COMPARED_FIELDS),
        # Without an explicit size Elasticsearch returns only 10 hits.
        # NOTE(review): assumes at most one ES document per adviser_id.
        "size": len(adviser_ids),
        "query": {"bool": {"filter": [{"terms": {"adviser_id": adviser_ids}}]}},
    }
    # Keyword arguments: the positional order of body/index differs between
    # elasticsearch-py releases.
    response = es.search(body=query, index=settings_local.ES_ZIDONGFENFA_INDEX)
    return response["hits"]["hits"]


def _find_mismatches(mysql_advisers, hits):
    """Return the adviser ids whose ES document differs from the MySQL row.

    Raises KeyError if a hit carries an adviser_id that is not a key of
    mysql_advisers.
    """
    mismatched = []
    for hit in hits:
        es_doc = dict(hit['_source'])
        adviser_id = es_doc.pop('adviser_id')
        mysql_row = mysql_advisers[adviser_id]
        if es_doc != mysql_row:
            print(" adviser_id: {}, mysql_result: {}, es_result: \n{}".format(
                adviser_id, mysql_row, es_doc))
            mismatched.append(adviser_id)
    return mismatched


def _run_check(es, db):
    """Compare MySQL advisers with ES and alert if too many of them differ.

    Raises RuntimeError when MySQL returns no advisers, so that the caller's
    failure alert fires instead of an incidental ZeroDivisionError.
    """
    assigners = _fetch_assigners(db)
    total = len(assigners)
    if not total:
        raise RuntimeError("no assigners returned from MySQL; nothing to compare")

    mysql_advisers = {
        row[0]: dict(zip(COMPARED_FIELDS, row[1:])) for row in assigners
    }
    adviser_ids = [row[0] for row in assigners]

    # NOTE(review): advisers present in MySQL but absent from ES are not
    # counted as differences (same as before) — confirm this is intended.
    error_users = []
    for batch in _chunks(adviser_ids, ES_BATCH_SIZE):
        error_users.extend(_find_mismatches(mysql_advisers, _fetch_es_hits(es, batch)))

    # float keeps this a true division even under Python 2.
    num = float(len(error_users))
    print(num)
    if num / total > MISMATCH_RATIO_THRESHOLD:
        msg = "数据同步监控,自动分发总共{}个用户,有{}个数据存在差异,具体用户id:{}".format(
            total, len(error_users), error_users)
        _send_alert(msg, [])


def main():
    """Run one consistency check between MySQL and Elasticsearch.

    Connects to both stores using settings_local, compares the adviser
    records and sends a DingTalk alert when more than
    MISMATCH_RATIO_THRESHOLD of them differ. Any exception raised during the
    comparison is printed and reported through a DingTalk failure alert
    rather than propagated. The MySQL connection is always closed.
    """
    es = Elasticsearch(settings_local.ES_CONFIG,
                       http_auth=settings_local.ES_HTTP_AUTH,
                       timeout=3600)
    zhengxing = settings_local.DATABASES["zhengxing"]
    zhengxing_db = pymysql.connect(
        host=zhengxing["HOST"],
        user=zhengxing["USER"],
        passwd=zhengxing["PASSWORD"],
        database=zhengxing["NAME"],
        port=int(zhengxing["PORT"]))
    try:
        _run_check(es, zhengxing_db)
    except Exception:
        # Keep the cause visible in the task log; the alert text is generic.
        traceback.print_exc()
        _send_alert("数据同步监控,数据diff出现问题", ["17794411132"])
    finally:
        zhengxing_db.close()


if __name__ == '__main__':
    main()