"""Daily volume alert for crawler data stored in Elasticsearch.

For one fetch day the script counts the documents fetched per platform,
compares each count with a hard-coded minimum, and e-mails an alert listing
the platforms that fell short. The e-mail also carries a per-platform count
of releaser-fans documents fetched during the preceding day.
"""
import datetime
import smtplib
import sys
from email.message import EmailMessage

import elasticsearch

# NOTE(review): this first client is replaced by the assignment further down
# and is never used for a request.
es = elasticsearch.Elasticsearch(hosts='192.168.17.13', port=9200)
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
# NOTE(review): credentials are hard-coded in source control; consider loading
# them from the environment or a protected configuration file.
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)


def _hit_total(search_resp):
    """Return the hit count of a search response as a number.

    ``hits.total`` is a plain integer in older Elasticsearch responses and an
    object of the form ``{"value": n, "relation": ...}`` from 7.0 onwards;
    both shapes are accepted.
    """
    total = search_resp['hits']['total']
    if isinstance(total, dict):
        # NOTE(review): on 7.x+ this value may be a lower bound capped at
        # 10,000 unless the request enables track_total_hits -- confirm
        # against the cluster version before relying on it.
        return total['value']
    return total


def _video_search_body(platform, stats_type, stamp, ms_per_day):
    """Build the query counting one platform's documents for one fetch day.

    Args:
        platform: value matched against ``platform.keyword``.
        stats_type: ``'observed'`` counts every document fetched that day;
            any other value additionally requires ``release_time`` to fall
            within the day before ``stamp``.
        stamp: start of the fetch day, in epoch milliseconds.
        ms_per_day: length of the fetch window, in milliseconds.
    """
    filters = [
        {"term": {"platform.keyword": platform}},
        {"range": {"fetch_time": {"gte": stamp, "lt": stamp + ms_per_day}}},
    ]
    if stats_type != 'observed':
        filters.append(
            {"range": {"release_time": {"gte": stamp - ms_per_day, "lt": stamp}}}
        )
    return {"query": {"bool": {"filter": filters}}}


def _send_alert_email(mail_host, sender, recipients, subject, body, idx, f_log):
    """Send one alert e-mail and log the outcome to ``f_log``.

    Delivery is best-effort: any failure is logged together with its cause
    and is not propagated to the caller.
    """
    email_msg = EmailMessage()
    email_msg.set_content(body)
    email_msg['Subject'] = subject
    email_msg['From'] = sender
    # Address headers are given as one comma-separated string.
    email_msg['to'] = ', '.join(recipients)
    try:
        # The context manager issues QUIT and closes the socket even when
        # sending fails part-way through.
        with smtplib.SMTP(host=mail_host) as server:
            server.send_message(email_msg)
    except Exception as err:
        print('Failed to connect email server.', datetime.datetime.now(),
              repr(err), file=f_log)
    else:
        print('Successfully sent email to %s for %s' % (recipients, idx),
              datetime.datetime.now(), file=f_log)
        print('email_msg:\n', email_msg, file=f_log)


def releaser_fans_alert(st, et):
    """Summarise how many releaser-fans documents each platform fetched.

    Args:
        st: inclusive lower bound applied to ``fetch_time`` (the caller in
            this file passes epoch milliseconds).
        et: exclusive upper bound applied to ``fetch_time``.

    Returns:
        A string with one newline-terminated line per platform giving the
        number of matching documents in the ``releaser_fans`` index.
    """
    platform_list = ("toutiao", "new_tudou", "腾讯视频", "haokan", "网易新闻",
                     "腾讯新闻", "kwai", "抖音", "weibo")
    lines = []
    for platform in platform_list:
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"range": {"fetch_time": {"gte": st, "lt": et}}},
                    ]
                }
            }
        }
        search_resp = es.search(index="releaser_fans",
                                doc_type="doc",
                                body=search_body,
                                request_timeout=100)
        lines.append('平台 %s 昨日抓取粉丝量条目数为 %s\n'
                     % (platform, _hit_total(search_resp)))
    return ''.join(lines)


def videonumber_alert_for_ccr(days_from_running, f_log=None):
    """Check per-platform video counts for one fetch day and e-mail alerts.

    Args:
        days_from_running: how many days before now the checked fetch day
            lies; ``0`` checks the current day.
        f_log: where log output goes. ``None`` appends to the monthly log
            file under the project log directory. A file-like object (one
            with a ``write`` method) is written to and left open for the
            caller. Any other value sends log output to ``sys.stdout``.
    """
    video_num_criteria = {
        'short-video': {
            'observed': {
                'toutiao': 15000000,
                'new_tudou': 1500000,
                '腾讯视频': 4000000,
                # iqiyi (300) and youku (2000) are currently disabled.
                'haokan': 1200000,
                "网易新闻": 7000000,
                '腾讯新闻': 300000,
                "kwai": 150000,
                "抖音": 150000,
            },
            'new_released': {
                'toutiao': 30000,
                'new_tudou': 30000,
                '腾讯视频': 30000,
                # iqiyi (100) and youku (100) are currently disabled.
                'haokan': 17000,
                '腾讯新闻': 6000,
                "网易新闻": 5000,
                "kwai": 5000,
                "抖音": 1500,
            },
        }
    }
    email_group = {
        'short-video': [
            'zhouyujiang@csm.com.cn',
            'hanye@csm.com.cn',
            "litao@csm.com.cn",
            "gengdi@csm.com.cn",
        ]
    }
    email_msg_suffix = ('\n\n\n' + '-' * 80 + '\n'
                        + '这是自动发送的邮件,可以不用回复。\n'
                        + 'This is an automatically sent message. You do NOT need to reply.\n')
    index_short_video = 'crawler-data-raw'
    doc_type_short_video = 'doc'
    idx_dict = {
        'short-video': {
            'index': index_short_video,
            'doc_type': doc_type_short_video,
        }
    }
    csm_mail_service = 'mail.csm.com.cn'
    sender = 'zhouyujiang@csm.com.cn'
    stats_type_dict = {
        'observed': '总视频条目数',
        'new_released': '新增视频条目数',
    }

    ms_per_day = 86400000
    today = datetime.datetime.now() - datetime.timedelta(days=days_from_running)
    day_start = datetime.datetime(today.year, today.month, today.day)
    # Local midnight of the fetch day, in epoch milliseconds.
    stamp = int(day_start.timestamp() * 1000)
    fetch_date_str = today.isoformat()[:10]

    owns_log = f_log is None
    if owns_log:
        path = '/home/hanye/project_data/Python/Projects/proj-short-videos/maintenance/log/'
        log_fn = 'email_alert_for_craw_%s_log' % datetime.datetime.strftime(today, '%b-%Y')
        # The log contains Chinese text, so do not depend on the locale's
        # default encoding.
        f_log = open(path + log_fn, 'a', encoding='utf-8')
    elif not hasattr(f_log, 'write'):
        # Non-file values keep their historical meaning: log to stdout.
        f_log = sys.stdout

    try:
        print('*' * 80, file=f_log)
        print('log timestamp ', datetime.datetime.now(), file=f_log)
        print('Checking video number for fetch_date', fetch_date_str, file=f_log)

        alert_msg = {}
        for idx in video_num_criteria:
            alert_msg[idx] = {}
            chk_source = idx_dict[idx]['index']
            # Only the 'observed' totals are evaluated; the 'new_released'
            # thresholds are defined above but not checked.
            for stats_type in ['observed']:
                alert_msg[idx][stats_type] = []
                for platform, criteria in video_num_criteria[idx][stats_type].items():
                    search_body = _video_search_body(platform, stats_type,
                                                     stamp, ms_per_day)
                    search_resp = es.search(index=idx_dict[idx]['index'],
                                            doc_type=idx_dict[idx]['doc_type'],
                                            body=search_body,
                                            request_timeout=100)
                    video_num = _hit_total(search_resp)
                    if video_num < criteria:
                        alert_msg[idx][stats_type].append({
                            'fetch_date': fetch_date_str,
                            'platform': platform,
                            'video_num': video_num,
                            'alert_criteria': criteria,
                            'short_perct': (criteria - video_num) / criteria * 100,
                            'index': idx,
                            'chk_source': chk_source,
                        })
                    else:
                        print('%s %s OK' % (idx, platform), file=f_log)

        # Fans documents are counted over the day preceding the fetch day.
        fans_text = releaser_fans_alert(stamp - ms_per_day, stamp)

        # send the alert email
        for idx in alert_msg:
            email_subj = '[爬虫数据日常预警] 数据条目预警 %s %s' % (idx, fetch_date_str)
            email_msg_body = ''
            for stats_type, alerts in alert_msg[idx].items():
                if not alerts:
                    continue
                email_msg_body += '%s %s 预警:\n' % (idx, stats_type_dict[stats_type])
                for ml in alerts:
                    email_msg_body += '平台 %s 抓取 %s 视频条目数:%d,低于预警值(%d) %.2f%% \n' % (
                        ml['platform'], ml['fetch_date'], ml['video_num'],
                        ml['alert_criteria'], ml['short_perct'])
                email_msg_body += "\n\n%s\n" % fans_text
                email_msg_body += ('\nchecking data source index name: %s\n\n\n'
                                   % alerts[0]['chk_source'])

            if email_msg_body != '':
                email_msg_body += email_msg_suffix
                print('email_msg_body:\n', email_msg_body, file=f_log)
                _send_alert_email(csm_mail_service, sender, email_group[idx],
                                  email_subj, email_msg_body, idx, f_log)
            else:
                print('All platforms is ok for %s on fetch_date %s, %s' % (
                    idx, fetch_date_str, datetime.datetime.now()), file=f_log)
                print('Alert criteria:\n', video_num_criteria[idx], file=f_log)
            print('\n\n', file=f_log)
    finally:
        # Close only a file opened here; never close stdout or a stream
        # owned by the caller.
        if owns_log:
            f_log.close()


if __name__ == '__main__':
    videonumber_alert_for_ccr(0)