# -*- coding:utf-8 -*-
# @Time : 2019/4/15 17:46
# @Author : litao
"""Alert on renamed releasers and invalid releaser URLs.

The script scans the ``target_releasers`` index, compares every releaser
name with the most recent name seen in ``short-video-all-time-url`` and
collects the ones that changed.  It also collects the releasers flagged
with ``is_valid == "false"``.  Both result sets are written as CSV files
and the output folder is handed to ``func_send_email_with_file``.
"""
import argparse
import datetime
import json
import os

import elasticsearch
import pandas as pd
from elasticsearch.helpers import scan

import func_send_email_with_file

parser = argparse.ArgumentParser(description='Specify a platform name.')
parser.add_argument('-m', '--mail', default=None, type=str,
                    help=('send to all or just us. like -m all'))
args = parser.parse_args()

# NOTE(review): credentials are hard-coded in the source; they should be
# moved to a config file or environment variables and rotated.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

read_index = "target_releasers"
read_type = "doc"
write_index = "short-video-all-time-url"
write_type = "all-time-url"

# Platforms that are checked by both alerts.
PLATFORM_LIST = [
    "haokan", "miaopai", "toutiao", "抖音", "腾讯视频", "腾讯新闻",
    "kwai", "new_tudou", "网易新闻"
]

# Folder the CSV files are written to unless it cannot be created/written.
DEFAULT_OUTPUT_DIR = ("/home/hanye/project_data/Python/Projects/"
                      "proj-short-videos/maintenance/send_recev_email/"
                      "发布者改名预警")

# Only videos released within this window (31 days, in ms) are considered
# when looking up the current releaser name.
RECENT_RELEASE_WINDOW_MS = 2678400000


def _format_ms_timestamp(ms_timestamp):
    """Format a millisecond epoch value as ``YYYY-mm-dd HH:MM:SS``.

    Falsy values (``None``, ``""``, ``0``) are returned unchanged so the
    CSV cell stays empty instead of raising.
    """
    if not ms_timestamp:
        return ms_timestamp
    return datetime.datetime.fromtimestamp(
        ms_timestamp / 1000).strftime("%Y-%m-%d %H:%M:%S")


def _write_alert_csv(data, columns, file_name, preferred_dir, fallback_dir):
    """Write *data* as a gb18030 CSV and return the folder that was used.

    *preferred_dir* is tried first (it is created when missing); when it
    cannot be created or written, *fallback_dir* is tried.  The last
    ``OSError`` is re-raised when neither folder works.
    """
    last_error = None
    for directory in (preferred_dir, fallback_dir):
        if not directory:
            continue
        try:
            os.makedirs(directory, exist_ok=True)
            data.to_csv(os.path.join(directory, file_name),
                        encoding='gb18030', columns=columns, mode="w")
        except OSError as err:
            print("fail to write %s in folder %s: %s"
                  % (file_name, directory, err))
            last_error = err
        else:
            return directory
    if last_error is None:
        raise ValueError("no output folder given for %s" % file_name)
    raise last_error


def es_read(read_index, read_type, write_index, write_type, search_body):
    """Return the releasers whose name differs from the latest fetched name.

    Every document matched by *search_body* in *read_index* is compared
    with the newest (by ``fetch_time``) document of the same
    ``releaser_id_str`` in *write_index* that was released within the
    last 31 days.  Documents without a ``releaser_id`` are skipped.

    Returns a list of dicts with the keys ``platform``, ``releaser``,
    ``releaserUrl``, ``key_releaser``, ``releaser_id_str``, ``new_name``
    and ``change_time``.
    """
    search_re = scan(index=read_index, doc_type=read_type, query=search_body,
                     request_timeout=500, scroll='5m', client=es)
    earliest_release = (int(datetime.datetime.now().timestamp() * 1e3)
                        - RECENT_RELEASE_WINDOW_MS)
    change_name_list = []
    for res in search_re:
        source = res["_source"]
        platform = source["platform"]
        releaser = source["releaser"]
        releaserUrl = source["releaserUrl"]
        releaser_id = source.get("releaser_id")
        key_releaser = source.get("key_releaser")
        releaser_id_str = source.get("releaser_id_str")

        # The id must be validated before it is used in the ``in`` test
        # below; a missing id would otherwise raise TypeError.
        if not releaser_id:
            continue
        releaser_id = str(releaser_id)
        if platform not in releaser_id:
            releaser_id = platform + "_" + releaser_id

        search_body_all_time = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"releaser_id_str": releaser_id}},
                        {"range": {"release_time": {"gte": earliest_release}}}
                    ]
                }
            },
            "sort": [
                {"fetch_time": {"order": "desc"}}
            ],
            "size": 1
        }
        single_res = es.search(index=write_index, doc_type=write_type,
                               body=search_body_all_time)
        latest_hits = single_res["hits"]["hits"]
        if not latest_hits:
            continue
        new_releaser = latest_hits[0]["_source"]["releaser"]
        if new_releaser == releaser:
            continue

        # The newest video still published under the old name gives an
        # approximation of when the rename happened.
        search_old_name_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"term": {"releaser.keyword": releaser}}
                    ]
                }
            },
            "sort": [
                {"fetch_time": {"order": "desc"}}
            ],
            "size": 1
        }
        old_releaser = es.search(index=write_index, doc_type=write_type,
                                 body=search_old_name_body)
        old_hits = old_releaser["hits"]["hits"]
        if old_hits:
            change_name_time = old_hits[0]["_source"]["release_time"]
        else:
            change_name_time = ""
        print(platform, releaser, new_releaser)
        change_name_list.append(
            {
                "platform": platform,
                "releaser": releaser,
                "releaserUrl": releaserUrl,
                "key_releaser": key_releaser,
                "releaser_id_str": releaser_id_str,
                "new_name": new_releaser,
                "change_time": _format_ms_timestamp(change_name_time)
            }
        )
    return change_name_list


def send_email(csv_path):
    """Mail the alert files found under *csv_path*.

    The short recipient list is used by default; when the ``-m/--mail``
    command line option is given the extended list is used instead.
    """
    email_group = [
        'litao@csm.com.cn', 'zhouyujiang@csm.com.cn', "gengdi@csm.com.cn"
    ]
    if args.mail:
        email_group = [
            'zhouyujiang@csm.com.cn', 'litao@csm.com.cn', 'hanye@csm.com.cn',
            'zhangtianli@csm.com.cn', 'zhangminghui@csm.com.cn',
            "luojia@csm.com.cn", "gengdi@csm.com.cn",
            "jiangyanhong@csm.com.cn", "liuning@csm.com.cn"
        ]
    today = datetime.datetime.now()
    email_msg_suffix = ('\n\n\n' + '-' * 80 + '\n'
                        + '这是自动发送的邮件,可以不用回复。\n'
                        + 'This is an automatically sent message. You do NOT need to reply.\n')

    sender = 'litao@csm.com.cn'
    email_subj = '发布者改名 URL错误预警 %s' % (today.isoformat()[:10])
    email_msg_body = '问好:\n 附件为'
    email_msg_body += '发布者改名预警数据和URL错误数据\n\n'
    email_msg_body += "如有任何问题联系"
    email_msg_body += email_msg_suffix
    print('email_msg_body:\n', email_msg_body)

    today_str = today.strftime("%Y-%m-%d")
    func_send_email_with_file.send_file_email(
        file_path=csv_path, email_msg_body_str=email_msg_body,
        email_group=email_group, cc_group=[], title_str=email_subj,
        data_str=today_str, sender=sender)
    print('\n\n')


def change_name_alert():
    """Write the renamed-releaser CSV and return the output folder.

    The folder is returned even when no rename was detected so the caller
    can always pass it on.
    """
    all_list = []
    for target_form in PLATFORM_LIST:
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": target_form}}
                    ],
                    "must": [
                        {"exists": {"field": "releaser_id"}}
                    ]
                }
            }
        }
        all_list += es_read(read_index=read_index, read_type=read_type,
                            write_index=write_index, write_type=write_type,
                            search_body=search_body)

    this_path = DEFAULT_OUTPUT_DIR
    if all_list:
        data = pd.DataFrame(all_list)
        columns = ['releaser', 'platform', "releaserUrl", "releaser_id_str",
                   "new_name", "change_time", "key_releaser"]
        this_path = _write_alert_csv(
            data, columns, '发布者改名预警.csv',
            preferred_dir=DEFAULT_OUTPUT_DIR,
            fallback_dir=os.path.join(os.getcwd(), '发布者改名预警'))
    print(this_path)
    return this_path


def wrong_url_alert(this_path):
    """Write the invalid-releaserUrl CSV and return the folder used.

    The file is written to *this_path*; when that folder cannot be used
    it is written to ``<cwd>/releaserUrl错误预警`` instead.  *this_path*
    is returned unchanged when there is nothing to report.
    """
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"terms": {"platform.keyword": PLATFORM_LIST}},
                    {"term": {"is_valid": "false"}}
                ]
            }
        }
    }
    search_re = scan(index=read_index, doc_type=read_type, query=search_body,
                     request_timeout=500, scroll='5m', client=es)
    all_list = []
    for res in search_re:
        source = res["_source"]
        all_list.append(
            {
                'releaser': source["releaser"],
                'platform': source["platform"],
                "releaserUrl": source["releaserUrl"],
                "key_releaser": source.get("key_releaser"),
                "post_by": source.get("post_by"),
                "change_time": _format_ms_timestamp(source.get("timestamp"))
            }
        )

    if all_list:
        data = pd.DataFrame(all_list)
        columns = ['releaser', 'platform', "releaserUrl", "key_releaser",
                   "post_by", "change_time"]
        this_path = _write_alert_csv(
            data, columns, '错误URL预警.csv',
            preferred_dir=this_path,
            fallback_dir=os.path.join(os.getcwd(), 'releaserUrl错误预警'))
    print(this_path)
    return this_path


if __name__ == "__main__":
    default_path = change_name_alert()
    default_path = wrong_url_alert(default_path)
    send_email(default_path)