Commit 7021165e authored by 王瑞环's avatar 王瑞环

添加自动分发任务监控

parent 9ee3064c
# coding:utf-8
"""
自动分发数据监控
airflow定时监控,每隔10分钟监控一次
mysql 里面的数据和es里面的数据是否一致
"""
import pymysql
import datetime
from dingtalkchatbot.chatbot import DingtalkChatbot
import settings_local
from elasticsearch import Elasticsearch
def main():
es = Elasticsearch(settings_local.ES_CONFIG, http_auth=settings_local.ES_HTTP_AUTH, timeout=3600)
zhengxing = settings_local.DATABASES["zhengxing"]
zhengxing_db = pymysql.connect(
host=zhengxing["HOST"],
user=zhengxing["USER"],
passwd=zhengxing["PASSWORD"],
database=zhengxing["NAME"],
port=int(zhengxing["PORT"]))
zhengxing_cursor = zhengxing_db.cursor()
limit = 100
assigners_sql = """
select
hera.user_id as adviser_id,
hera.is_leader as is_leader,
hera.is_receive as is_accept_newuser,
hera.is_delete as is_deleted,
w.job_grade as job_grade,
w.limit_count as clue_num_upper_bound
from hera_bdtransfer_auto_assignor as hera
left join api_bdtransfer_jobgrade_limit as w on hera.jobgrade_limit_id = w.id
group by hera.user_id"""
try:
zhengxing_cursor.execute(assigners_sql)
assigners = zhengxing_cursor.fetchall()
adviser_ids = [item[0] for item in assigners]
total = len(adviser_ids)
mysql_advisers = {}
for item in assigners:
mysql_advisers[item[0]] = {}
mysql_advisers[item[0]]['is_leader'] = item[1]
mysql_advisers[item[0]]['is_accept_newuser'] = item[2]
mysql_advisers[item[0]]['is_deleted'] = item[3]
mysql_advisers[item[0]]['job_grade'] = item[4]
mysql_advisers[item[0]]['clue_num_upper_bound'] = item[5]
# 批量获取es
query = {
"_source": ["adviser_id", "is_leader", "is_accept_newuser", "is_deleted", "job_grade", "clue_num_upper_bound"],
"query": {"bool": {
"filter": [
{"terms": {"adviser_id": adviser_ids}}]}}}
es_result = es.search(query, settings_local.ES_ZIDONGFENFA_INDEX)
num = 0.0
error_users = []
for item in es_result["hits"]["hits"]:
adviser_id = item['_source'].pop('adviser_id')
es_result = item['_source']
mysql_result = mysql_advisers[adviser_id]
if (es_result==mysql_result):
pass
else:
print("""
adviser_id: {},
mysql_result: {},
es_result: {}""".format(
adviser_id,
mysql_result,
es_result
))
num = num + 1
error_users.append(adviser_id)
print(num)
if num / total > 0.02:
msg = "数据同步监控,自动分发总共{}个用户,有{}个数据存在差异,具体用户id:{}".format(total, len(error_users), error_users)
webhook = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(
"1475fb9e6beff63126fef5464a378b6b77fa8655933a407d8159a2a2b1b8c869")
xiaoding = DingtalkChatbot(webhook)
at_mobiles = []
xiaoding.send_text(msg=msg, at_mobiles=at_mobiles)
except Exception as e:
msg = "数据同步监控,数据diff出现问题"
webhook = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(
"1475fb9e6beff63126fef5464a378b6b77fa8655933a407d8159a2a2b1b8c869")
xiaoding = DingtalkChatbot(webhook)
at_mobiles = ["17794411132"]
# at_mobiles = []
xiaoding.send_text(msg=msg, at_mobiles=at_mobiles)
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -12,6 +12,18 @@ DATABASES = {
"charset": "utf8mb4",
},
},
'zhengxing': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'zhengxing',
'USER': 'work',
'PASSWORD': 'Gengmei1',
'HOST': 'bj-cdb-6slgqwlc.sql.tencentcdb.com',
'PORT': '62120',
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
},
}
ES_CONFIG = [
......@@ -23,3 +35,5 @@ ES_CONFIG = [
ES_HTTP_AUTH = ('elastic', 'gm_test')
ES_INDEX = 'zhuanzhen'
ES_ZIDONGFENFA_INDEX = 'gm_test-adviser-write'
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment