1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
import pymysql
import logging
import json
import urllib2
import time
import datetime
import sys
# Python 2-only idiom: reload(sys) makes sys.setdefaultencoding reachable again
# so that UTF-8 becomes the default codec for implicit str/unicode conversions.
reload(sys)
sys.setdefaultencoding('utf8')
# Not referenced anywhere else in the visible part of this file.
syncer_monitor_home = "/srv/apps/flink-monitor/libs"
# Local date (YYYYMMDD) taken at import time; used as the log file name suffix.
date_str = time.strftime('%Y%m%d', time.localtime())
# Upper and lower bounds of the monitoring window, captured once at import time.
# NOTE(review): the two bounds come from separate now() calls, so the window can
# differ from exactly ten minutes by the time elapsed between the two calls.
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ten_minites_ago_time_str = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
# MySQL helper class
class MysqlOperator(object):
    """Thin wrapper around one pymysql connection.

    The connection is opened by the constructor and stays open until
    close_connect() is called, so callers are responsible for closing it.
    """

    def __init__(self, host, port, user, password, db, charset='utf8'):
        """Open the connection with the given settings.

        All arguments are forwarded unchanged to pymysql.connect().
        """
        self.connect = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            db=db,
            charset=charset,
        )

    def __execute_sql(self, sql, params=None):
        """Execute one statement, commit, and return all fetched rows.

        params, when given, is handed to cursor.execute() so the driver binds
        the values instead of the caller concatenating them into the SQL text.
        """
        with self.connect.cursor() as cursor:
            cursor.execute(sql, params)
            data = cursor.fetchall()
            # Commit unconditionally: the same path serves reads and writes.
            self.connect.commit()
        return data

    def select_data(self, sql, params=None):
        """Run a query and return whatever cursor.fetchall() produced.

        params is optional and defaults to None, which keeps the original
        single-argument call working exactly as before.
        """
        return self.__execute_sql(sql, params)

    def execute_sql(self, sql, params=None):
        """Run a statement for its side effects; the fetched rows are discarded."""
        self.__execute_sql(sql, params)

    def close_connect(self):
        """Close the underlying connection."""
        self.connect.close()
# Get the number of log-parsing errors
def get_err_count(sql):
    """Run a single-value query and return the first column of its first row.

    Args:
        sql: complete SQL text, expected to yield at least one row
            (the caller in this file passes a ``select count(1)`` query).

    Returns:
        The value at row 0, column 0 of the result set.

    Raises:
        IndexError: if the query returns no rows.
    """
    # Alternative connection settings kept from the original source:
    # operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or the environment.
    operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
    try:
        return operator.select_data(sql)[0][0]
    finally:
        # Always release the connection, even when the query or the
        # indexing above raises.
        operator.close_connect()
# Get the full result set of a query
def get_rs_list(sql):
    """Run a query on a fresh connection and return every row.

    Args:
        sql: complete SQL text to execute.

    Returns:
        The rows exactly as returned by MysqlOperator.select_data().
    """
    # Alternative connection settings kept from the original source:
    # operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'dw_ods')
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or the environment.
    operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
    try:
        return operator.select_data(sql)
    finally:
        # Always release the connection, even when the query raises.
        operator.close_connect()
# Send a DingTalk message
def send_dingding(summary_msg, timeout=10):
    """Post a plain-text message to the DingTalk robot webhook.

    Args:
        summary_msg: text placed in the message's "content" field.
        timeout: seconds to wait for the HTTP call before giving up, so an
            unreachable endpoint cannot hang the monitor indefinitely.

    Raises:
        urllib2.URLError: propagated unchanged when the request fails.
    """
    ding_talk = {
        "msgtype": "text",
        "text": {
            "content": summary_msg
        },
        "at": {
            # online
            # "atMobiles": ["13021286565"],
            # test
            "atMobiles": ["13051007691"],
            "isAtAll": False
        }
    }
    # online
    # ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f '
    # test
    # NOTE(review): the literal ends with a space; presumably tolerated by
    # urllib2 - confirm and drop it.
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51 '
    ding_content = json.dumps(ding_talk)
    ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
    req = urllib2.Request(url=ding_url, data=ding_content, headers=ding_header)
    res = urllib2.urlopen(req, timeout=timeout)
    try:
        # Read the reply so the log shows what the server answered, not just
        # the repr of the response object.
        status = res.getcode()
        body = res.read()
    finally:
        res.close()
    print(body)
    logging.info("DingTalk response: HTTP %s %s", status, body)
# String clean-up
def strip_str(str):
    """Return the text with all spaces, newlines, tabs and carriage returns removed.

    The value is trimmed first, the four characters are then deleted wherever
    they occur, and the result is trimmed once more.
    """
    # The parameter name shadows the builtin; it is kept because it is part
    # of the signature visible to callers.
    cleaned = str.strip()
    for unwanted in (' ', '\n', '\t', '\r'):
        cleaned = cleaned.replace(unwanted, '')
    return cleaned.strip()
# Build the alert text from rows that have already been fetched
def _build_portrait_alert(err_count, mapping_list, shd_list, suc_list):
    """Return the alert message, or None when nothing needs to be reported.

    Args:
        err_count: number of log-parsing errors in the window.
        mapping_list: rows of (action, name).
        shd_list: rows of (action, expected count).
        suc_list: rows of (action, actual count).
    """
    problems = []
    if err_count > 0:
        problems.append("\t日志解析异常的条数为:" + str(err_count) + ", 请核实!\n")

    mapping_dic = {row[0]: row[1] for row in mapping_list}
    shd_dic = {row[0]: row[1] for row in shd_list}

    for suc in suc_list:
        action, actual = suc[0], suc[1]
        # An action without an expected-count row used to raise KeyError and
        # abort the whole check. NOTE(review): it is treated here as
        # "expected 0" - confirm that matches the table semantics.
        expected = shd_dic.get(action, 0)
        if expected == actual:
            continue
        # Fall back to the raw action key when no display name is mapped.
        name = mapping_dic.get(action)
        if name is None:
            name = str(action)
        problems.append(
            "【" + name + "】打点异常,应打点个数为:" + str(expected)
            + ",实际打点个数为:" + str(actual) + ", 请核实!\n"
        )

    if not problems:
        return None
    return "用户画像打点异常预警(近十分钟):\n" + "".join(problems)


# Check whether user-portrait event tracking is healthy
def check_is_ok():
    """Compare expected and actual tracking counts for the last ten minutes.

    Reads the error count, the action-name mapping, and the expected and
    actual per-action counts from the database. When a parsing error or a
    count mismatch is found, the alert is logged and sent through
    send_dingding().

    NOTE(review): only actions present in the actual-count rows are compared;
    an action that appears only in the expected-count rows is never reported.
    """
    logging.basicConfig(level=logging.INFO,
                        filename='/data/log/flink-monitor/flink-monitor.log.' + date_str,
                        filemode='a',
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
                        )
    # The same time-window predicate is shared by the three monitoring tables.
    window = (" where monitor_time >= '" + ten_minites_ago_time_str
              + "' and monitor_time <= '" + current_time_str + "'")
    mapping_sql = "select action,name from tbl_mapping_action_name"
    err_sql = "select count(1) from tbl_monitor_portrait_err" + window
    shd_sql = "select action_shd,count_shd from tbl_monitor_portrait_shd" + window
    suc_sql = "select action_suc,count_suc from tbl_monitor_portrait_suc" + window

    err_count = get_err_count(err_sql)
    mapping_list = get_rs_list(mapping_sql)
    shd_list = get_rs_list(shd_sql)
    suc_list = get_rs_list(suc_sql)

    error_msg = _build_portrait_alert(err_count, mapping_list, shd_list, suc_list)
    if error_msg is not None:
        logging.error(error_msg)
        send_dingding(error_msg)
# Main entry point: run the check only when this file is executed as a script.
if __name__ == '__main__':
    check_is_ok()