1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# coding:utf-8
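"""
Auto-distribution data monitoring.

Scheduled by Airflow; the check runs once every 10 minutes.
Verifies that the data in MySQL is consistent with the data in Elasticsearch.
"""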
import pymysql
import datetime
from dingtalkchatbot.chatbot import DingtalkChatbot
import settings_local
from elasticsearch import Elasticsearch
# Fields compared between MySQL and Elasticsearch. The order matches the
# columns selected by _ASSIGNERS_SQL after the leading adviser_id column.
_COMPARED_FIELDS = (
    "is_leader",
    "is_accept_newuser",
    "is_deleted",
    "job_grade",
    "clue_num_upper_bound",
)

# Number of adviser ids sent to Elasticsearch per search request.
_ES_BATCH_SIZE = 100

# An alert is sent when the share of mismatching advisers exceeds this ratio.
_MISMATCH_ALERT_RATIO = 0.02

# NOTE(review): the access token is a credential committed to source control;
# consider moving it into settings_local.
_DINGTALK_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(
    "1475fb9e6beff63126fef5464a378b6b77fa8655933a407d8159a2a2b1b8c869")

_ASSIGNERS_SQL = """
    select
        hera.user_id as adviser_id,
        hera.is_leader as is_leader,
        hera.is_receive as is_accept_newuser,
        hera.is_delete as is_deleted,
        w.job_grade as job_grade,
        w.limit_count as clue_num_upper_bound
    from hera_bdtransfer_auto_assignor as hera
    left join api_bdtransfer_jobgrade_limit as w on hera.jobgrade_limit_id = w.id
    group by hera.user_id"""


def _send_alert(msg, at_mobiles):
    """Post ``msg`` as a text message to the DingTalk robot webhook."""
    DingtalkChatbot(_DINGTALK_WEBHOOK).send_text(msg=msg, at_mobiles=at_mobiles)


def _load_mysql_advisers(cursor):
    """Return ``{adviser_id: {field: value}}`` built from the assignor query."""
    cursor.execute(_ASSIGNERS_SQL)
    return {
        row[0]: dict(zip(_COMPARED_FIELDS, row[1:]))
        for row in cursor.fetchall()
    }


def _iter_es_hits(es, adviser_ids):
    """Yield Elasticsearch hits for ``adviser_ids``, queried in batches.

    Each request asks for as many hits as ids in the batch; without an
    explicit ``size`` Elasticsearch returns only the first 10 hits.
    NOTE(review): assumes at most one document per adviser_id in the index --
    confirm against the index mapping.
    """
    for start in range(0, len(adviser_ids), _ES_BATCH_SIZE):
        batch = adviser_ids[start:start + _ES_BATCH_SIZE]
        query = {
            "size": len(batch),
            "_source": ["adviser_id"] + list(_COMPARED_FIELDS),
            "query": {"bool": {
                "filter": [
                    {"terms": {"adviser_id": batch}}]}}}
        # Keyword arguments keep the call independent of the positional
        # order of ``body``/``index``, which differs between client versions.
        result = es.search(body=query, index=settings_local.ES_ZIDONGFENFA_INDEX)
        for hit in result["hits"]["hits"]:
            yield hit


def main():
    """Compare auto-assignor records in MySQL with their Elasticsearch copies.

    Loads every assignor row from MySQL, fetches the documents with the same
    adviser ids from Elasticsearch, and prints each record whose compared
    fields differ. When more than 2% of the MySQL advisers differ, a DingTalk
    text message listing the affected ids is sent. Any exception raised while
    diffing is printed with its traceback and reported through a second
    DingTalk message.

    Failures while creating the Elasticsearch client or connecting to MySQL
    are not caught and propagate to the caller.
    """
    import traceback

    es = Elasticsearch(settings_local.ES_CONFIG, http_auth=settings_local.ES_HTTP_AUTH, timeout=3600)
    zhengxing = settings_local.DATABASES["zhengxing"]
    zhengxing_db = pymysql.connect(
        host=zhengxing["HOST"],
        user=zhengxing["USER"],
        passwd=zhengxing["PASSWORD"],
        database=zhengxing["NAME"],
        port=int(zhengxing["PORT"]))
    try:
        zhengxing_cursor = zhengxing_db.cursor()
        try:
            try:
                mysql_advisers = _load_mysql_advisers(zhengxing_cursor)
            finally:
                zhengxing_cursor.close()

            total = len(mysql_advisers)
            if not total:
                # Nothing to compare; also avoids dividing by zero below.
                print("no advisers found in mysql, nothing to compare")
                return

            error_users = []
            for hit in _iter_es_hits(es, list(mysql_advisers)):
                es_fields = dict(hit['_source'])
                adviser_id = es_fields.pop('adviser_id')
                mysql_fields = mysql_advisers[adviser_id]
                if es_fields != mysql_fields:
                    print("""
adviser_id: {},
mysql_result: {},
es_result: {}""".format(
                        adviser_id,
                        mysql_fields,
                        es_fields
                    ))
                    error_users.append(adviser_id)

            # Kept as a float so the ratio below is a true division even on
            # interpreters where int / int truncates.
            num = float(len(error_users))
            print(num)
            if num / total > _MISMATCH_ALERT_RATIO:
                msg = "数据同步监控,自动分发总共{}个用户,有{}个数据存在差异,具体用户id:{}".format(total, len(error_users), error_users)
                _send_alert(msg, [])
        except Exception:
            # Record the cause before alerting; the alert text itself carries
            # no detail about what went wrong.
            traceback.print_exc()
            _send_alert("数据同步监控,数据diff出现问题", ["17794411132"])
    finally:
        zhengxing_db.close()
# Run the consistency check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()