statistics.py 4.75 KB
#!/usr/local/bin/python2.7
# -*- coding:utf-8 -*-

import pymysql
import logging
import json
import urllib2
import time
import datetime
import sys

# Python 2 only: site.py removes sys.setdefaultencoding at start-up, so the
# module is reloaded to get it back. The process-wide default encoding is then
# switched to UTF-8 so implicit str <-> unicode conversions do not fall back
# to ASCII.
reload(sys)
sys.setdefaultencoding('utf8')
# Not referenced anywhere in the visible part of this file.
syncer_monitor_home = "/srv/apps/flink-monitor/libs"
# Local date (YYYYMMDD); check_is_ok() uses it as the log file name suffix.
date_str = time.strftime('%Y%m%d', time.localtime())
# Upper and lower bounds of the monitoring window used in the SQL filters of
# check_is_ok(). Both are evaluated once, at import time.
# NOTE(review): the two bounds come from two separate now() calls, so the
# window is only approximately ten minutes wide.
current_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ten_minites_ago_time_str = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")


# MySQL helper class
class MysqlOperator(object):
    """Thin wrapper around a single ``pymysql`` connection.

    The connection is opened by the constructor and stays open until
    ``close_connect()`` is called. Instances can also be used in a ``with``
    statement, which closes the connection when the block exits.
    """

    def __init__(self, host, port, user, password, db, charset='utf8'):
        """Open the connection; all arguments are passed to ``pymysql.connect``."""
        self.connect = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            db=db,
            charset=charset,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_connect()
        # Never swallow an exception raised inside the ``with`` block.
        return False

    def __execute_sql(self, sql, args=None):
        """Execute ``sql``, fetch all rows, commit, and return the rows.

        ``args`` is handed to ``cursor.execute`` unchanged. Leaving it as
        ``None`` keeps the original single-argument behaviour; passing a
        tuple/list/dict lets the driver escape the values instead of the
        caller concatenating them into the statement.
        """
        with self.connect.cursor() as cursor:
            cursor.execute(sql, args)
            data = cursor.fetchall()
        self.connect.commit()
        return data

    def select_data(self, sql, args=None):
        """Run a query and return every fetched row."""
        return self.__execute_sql(sql, args)

    def execute_sql(self, sql, args=None):
        """Run a statement and discard whatever rows it produced."""
        self.__execute_sql(sql, args)

    def close_connect(self):
        """Close the underlying connection."""
        self.connect.close()


# Get the number of log-parsing errors
def get_err_count(sql):
    """Run ``sql`` and return the first column of the first result row.

    A new connection is opened for every call and is always closed, even
    when the query fails.

    Raises:
        IndexError: if the query returns no rows.
    """
    # NOTE(review): credentials are hard-coded in source; they should be
    # loaded from configuration or the environment instead.
    # Alternative connection settings (presumably the online environment):
    # operator = MysqlOperator('172.16.30.143', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing')
    operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
    try:
        return operator.select_data(sql)[0][0]
    finally:
        # Release the connection on both the success and the error path.
        operator.close_connect()


# Get the result set of a query
def get_rs_list(sql):
    """Run ``sql`` and return all fetched rows.

    A new connection is opened for every call and is always closed, even
    when the query fails.
    """
    # NOTE(review): credentials are hard-coded in source; they should be
    # loaded from configuration or the environment instead.
    # Alternative connection settings (presumably the online environment):
    # operator = MysqlOperator('172.16.40.170', 4000, 'data_user', 'YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', 'dw_ods')
    operator = MysqlOperator('test002', 3306, 'root', '5OqYM^zLwotJ3oSo', 'flink_monitor')
    try:
        return operator.select_data(sql)
    finally:
        # Release the connection on both the success and the error path.
        operator.close_connect()


# Send a DingTalk message
def send_dingding(summary_msg, timeout=10):
    """Post ``summary_msg`` as a text message to the DingTalk robot webhook.

    Args:
        summary_msg: text placed in the message's ``content`` field.
        timeout: seconds to wait for the webhook before giving up, so a
            stalled connection cannot hang the monitoring job forever.

    Raises:
        urllib2.URLError: on connection failures, timeouts, or HTTP errors.
    """
    ding_talk = {
        "msgtype": "text",
        "text": {
            "content": summary_msg
        },
        "at": {
            # online
            # "atMobiles": ["13021286565"],
            # test
            "atMobiles": ["13051007691"],
            "isAtAll": False
        }
    }
    # NOTE(review): the webhook access tokens are secrets and should not be
    # kept in source control.
    # online
    # ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=68d7d6e9aaf81ebbf0f5228a3eadf769f1af0a7b0cb3dcb8fb8885dc5d93054f '
    # test
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token=f7706a17b6de3ab6318806d1ed3d31fc68642bee99693280ee9a1591ab978c51 '

    ding_content = json.dumps(ding_talk)
    ding_header = {'Content-Type': 'application/json;charset=UTF-8'}
    req = urllib2.Request(url=ding_url, data=ding_content, headers=ding_header)
    res = urllib2.urlopen(req, timeout=timeout)
    try:
        # Read the reply so the log shows what the webhook answered rather
        # than the repr of the response object.
        res_body = res.read()
    finally:
        res.close()
    print(res_body)
    logging.info(res_body)


# String normalisation
def strip_str(str):
    """Return ``str`` with every space, newline, tab and carriage return
    removed, and any other leading/trailing whitespace trimmed."""
    cleaned = str
    for blank in (' ', '\n', '\t', '\r'):
        cleaned = cleaned.replace(blank, '')
    # Trimming after the removals also drops edge whitespace such as \x0b
    # that the replacements above do not touch.
    return cleaned.strip()


# Build the alert text from the raw query results
def _build_alert_message(err_count, mapping_list, shd_list, suc_list):
    """Compare expected and actual tracking counts and build the alert text.

    Args:
        err_count: number of rows found in the error table.
        mapping_list: rows of ``(action, name)``.
        shd_list: rows of ``(action, expected_count)``.
        suc_list: rows of ``(action, actual_count)``.

    Returns:
        A ``(is_send, error_msg)`` tuple; ``is_send`` is True when at least
        one problem was appended to ``error_msg``.
    """
    is_send = False
    error_msg = "用户画像打点异常预警(近十分钟):\n"

    if err_count > 0:
        is_send = True
        error_msg += "\t日志解析异常的条数为:" + str(err_count) + ", 请核实!\n"

    mapping_dic = {row[0]: row[1] for row in mapping_list}
    shd_dic = {row[0]: row[1] for row in shd_list}

    # NOTE(review): only actions present in suc_list are compared; an action
    # that has an expected count but no actual-count row is not reported.
    for suc in suc_list:
        action, actual = suc[0], suc[1]
        # An action without an expected-count row is treated as "0 expected"
        # so the mismatch is reported instead of crashing with KeyError.
        expected = shd_dic.get(action, 0)
        if expected == actual:
            continue
        # Fall back to the raw action code when no display name is mapped.
        name = mapping_dic.get(action)
        if name is None:
            name = str(action)
        is_send = True
        error_msg += "【" + name + "】打点异常,应打点个数为:" + str(expected) + ",实际打点个数为:" + str(
            actual) + ", 请核实!\n"

    return is_send, error_msg


# Check whether user-portrait tracking looks healthy
def check_is_ok():
    """Query the monitoring tables for the time window and alert on problems.

    Configures file logging, loads the error count, the action-name mapping
    and the expected/actual counts for the window bounded by
    ``ten_minites_ago_time_str`` and ``current_time_str``, then logs the
    report and sends it to DingTalk when anything is wrong.
    """
    logging.basicConfig(level=logging.INFO,
                        filename='/data/log/flink-monitor/flink-monitor.log.' + date_str,
                        filemode='a',
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
                        )

    # The bounds are timestamps generated by this module, not external input.
    time_window = " where monitor_time >= '" + ten_minites_ago_time_str + "' and monitor_time <= '" + current_time_str + "'"
    mapping_sql = "select action,name from tbl_mapping_action_name"
    err_sql = "select count(1) from tbl_monitor_portrait_err" + time_window
    shd_sql = "select action_shd,count_shd from tbl_monitor_portrait_shd" + time_window
    suc_sql = "select action_suc,count_suc from tbl_monitor_portrait_suc" + time_window

    err_count = get_err_count(err_sql)
    mapping_list = get_rs_list(mapping_sql)
    shd_list = get_rs_list(shd_sql)
    suc_list = get_rs_list(suc_sql)

    is_send, error_msg = _build_alert_message(err_count, mapping_list, shd_list, suc_list)

    if is_send:
        logging.error(error_msg)
        send_dingding(error_msg)


# Main entry point: run the check once when executed as a script
if __name__ == '__main__':
    check_is_ok()