#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-
"""Monitor user-portrait tracking data and raise a DingTalk alert on anomalies.

The script queries the monitoring tables for the last ten minutes, compares
the expected tracking counts with the actual ones, and sends a DingTalk text
message (and writes an error log entry) when something does not match.

This file targets Python 2.7 (``urllib2``, ``reload(sys)``).
"""
import datetime
import json
import logging
import sys
import time
import urllib2

import pymysql

# Python 2 only: make UTF-8 the implicit str <-> unicode codec so the UTF-8
# byte-string literals below can be mixed with unicode values.
reload(sys)
sys.setdefaultencoding('utf8')

syncer_monitor_home = "/srv/apps/flink-monitor/libs"
date_str = time.strftime('%Y%m%d', time.localtime())
# The monitored time window is fixed once, when the module is loaded.
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ten_minites_ago_time_str = (
    datetime.datetime.now() - datetime.timedelta(minutes=10)
).strftime("%Y-%m-%d %H:%M:%S")

# Upper bound (seconds) for the DingTalk webhook call so that a stalled
# connection cannot block the monitor forever.
DING_TIMEOUT_SECONDS = 10


class MysqlOperator(object):
    """Small MySQL helper wrapping a single pymysql connection.

    Can be used as a context manager; the connection is closed on exit.
    """

    def __init__(self, host, port, user, password, db, charset='utf8'):
        """Open the connection with the given settings."""
        self.connect = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            db=db,
            charset=charset,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_connect()
        # Never swallow the exception raised inside the ``with`` body.
        return False

    def __execute_sql(self, sql):
        """Execute ``sql``, commit, and return every fetched row."""
        with self.connect.cursor() as cursor:
            cursor.execute(sql)
            data = cursor.fetchall()
            self.connect.commit()
            return data

    def select_data(self, sql):
        """Run a query and return all rows."""
        return self.__execute_sql(sql)

    def execute_sql(self, sql):
        """Run a statement and discard its result."""
        self.__execute_sql(sql)

    def close_connect(self):
        """Close the underlying connection."""
        self.connect.close()


# NOTE(review): connection settings and passwords are hard-coded below (and in
# the commented-out "online" variants); consider moving them to configuration.


def get_err_count(sql):
    """Return the first column of the first row produced by ``sql``.

    Used to fetch the number of log-parsing errors. The connection is closed
    even when the query raises.
    """
    # operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
    with MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor') as operator:
        return operator.select_data(sql)[0][0]


def get_rs_list(sql):
    """Return all rows produced by ``sql``.

    The connection is closed even when the query raises.
    """
    # operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'dw_ods')
    with MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor') as operator:
        return operator.select_data(sql)


def send_dingding(summary_msg):
    """Send ``summary_msg`` as a text message to the DingTalk robot webhook.

    The response body is printed and logged. Raises whatever ``urllib2``
    raises on network errors, HTTP errors, or timeout.
    """
    ding_talk = {
        "msgtype": "text",
        "text": {
            "content": summary_msg
        },
        "at": {
            # online
            # "atMobiles": ["13021286565"],
            # test
            "atMobiles": ["13051007691"],
            "isAtAll": False
        }
    }
    # The original URL literals ended with a stray space; it has been removed
    # so the access token is sent exactly as issued.
    # online
    # ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f'
    # test
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51'
    ding_content = json.dumps(ding_talk)
    ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
    req = urllib2.Request(url=ding_url, data=ding_content, headers=ding_header)
    res = urllib2.urlopen(req, timeout=DING_TIMEOUT_SECONDS)
    try:
        # Read the body: the repr of the response object says nothing about
        # whether the message was accepted.
        body = res.read()
    finally:
        res.close()
    print(body)
    logging.info(body)


def strip_str(str):
    """Return ``str`` with all spaces, newlines, tabs and carriage returns removed."""
    return str.strip().replace(' ', '').replace('\n', '').replace('\t', '').replace('\r', '').strip()


def _window_sql(columns, table):
    """Build a SELECT of ``columns`` from ``table`` limited to the monitored window."""
    return ("select " + columns + " from " + table
            + " where monitor_time >= '" + ten_minites_ago_time_str
            + "' and monitor_time <= '" + current_time_str + "'")


def check_is_ok():
    """Check the portrait tracking data of the last ten minutes and alert on anomalies.

    An alert is sent when the log-parsing error count is positive, or when an
    action's actual count differs from its expected count.
    """
    logging.basicConfig(
        level=logging.INFO,
        filename='/data/log/flink-monitor/flink-monitor.log.' + date_str,
        filemode='a',
        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
    )
    is_send = False
    error_msg = "用户画像打点异常预警(近十分钟):\n"

    mapping_sql = "select action,name from tbl_mapping_action_name"
    err_sql = _window_sql("count(1)", "tbl_monitor_portrait_err")
    shd_sql = _window_sql("action_shd,count_shd", "tbl_monitor_portrait_shd")
    suc_sql = _window_sql("action_suc,count_suc", "tbl_monitor_portrait_suc")

    err_count = get_err_count(err_sql)
    mapping_list = get_rs_list(mapping_sql)
    shd_list = get_rs_list(shd_sql)
    suc_list = get_rs_list(suc_sql)

    if err_count > 0:
        is_send = True
        error_msg += "\t日志解析异常的条数为:%s, 请核实!\n" % (err_count,)

    # action -> display name, and action -> expected count.
    mapping_dic = {row[0]: row[1] for row in mapping_list}
    shd_dic = {row[0]: row[1] for row in shd_list}

    # NOTE(review): actions that appear only in the expected-count table and
    # never in the actual-count table are not reported - confirm this is intended.
    for suc in suc_list:
        action, actual = suc[0], suc[1]
        expected = shd_dic.get(action)
        if expected != actual:
            is_send = True
            # Fall back to the raw action key when no display name is mapped,
            # instead of failing while building the alert text.
            name = mapping_dic.get(action)
            if name is None:
                name = action
            error_msg += "【%s】打点异常,应打点个数为:%s,实际打点个数为:%s, 请核实!\n" % (
                name, expected, actual)
        else:
            print("all is ok!")

    if is_send:
        logging.error(error_msg)
        send_dingding(error_msg)


# Script entry point.
if __name__ == '__main__':
    check_is_ok()