Commit 9f62ee57 authored by litaolemo's avatar litaolemo

update

parent 311953e8
......@@ -3,4 +3,6 @@
1. Deployed on BJ-GM-Prod-Cos-faiss001 under /srv/apps/
2. Switch user: sudo su - gmuser
3. Activate conda: source /root/anaconda3/bin/activate
4. Enter/leave the virtual environment: conda activate crawler_env / conda deactivate
5. Crawler process: python /srv/apps/crawler/crawler_sys/framework/update_data_in_target_releasers_multi_process_by_date_from_redis.py
6. Write crawl URLs to Redis: python /srv/apps/crawler/crawler_sys/framework/write_releasers_to_redis.py -p weibo -d 1 -proxies 5 (see the Redis hand-off sketch below)
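Steps 5 and 6 hand work off through a Redis list: step 6 enqueues releaser URLs and the multi-process updater in step 5 consumes them. A minimal sketch of that hand-off, assuming a hypothetical list key releaser_urls and JSON payload shape (the real key and payload layout live in the two framework scripts above):

import json
import redis

# the Redis instance the crawler scripts point at
rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19, decode_responses=True)

def push_release_task(platform, releaser_url, list_key="releaser_urls"):
    """Producer side (step 6): enqueue one releaser URL as a JSON task."""
    rds.rpush(list_key, json.dumps({"platform": platform, "releaserUrl": releaser_url}))

def pop_release_task(list_key="releaser_urls"):
    """Consumer side (step 5): pop one task, or return None when the queue is empty."""
    raw = rds.lpop(list_key)
    return json.loads(raw) if raw else None

if __name__ == "__main__":
    push_release_task("weibo", "https://weibo.com/u/6511173721")
    print(pop_release_task())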
# -*- coding:UTF-8 -*-
# @Time : 2020/7/28 8:53
# @File : cal_ni_and_put_to_backend.py
# @email : litao@igengmei.com
# @author : litao
import redis
import json
import datetime
import time
import sys
from maintenance.func_send_email_with_file import send_file_email
rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19)
class push_rule(object):
def __init__(self, repost_count_ni=None, comment_count_ni=None, favorite_count_ni=None, time_range=5):
"""
传入增量计算规则
如 5分钟点赞量增长200
faverite_count_ni = 200
time_range = 5
:param repost_count_ni: Int 转发增长值
:param comment_count_ni: Int 评论增长值
:param favorite_count_ni: Int 点赞增长值
:param time_range: Int 间隔分钟
"""
self._repost_count_ni = repost_count_ni
self._comment_count_ni = comment_count_ni
self._favorite_count_ni = favorite_count_ni
self._time_range = time_range
        try:
            self.repost_per_min = self._repost_count_ni / time_range
        except (TypeError, ZeroDivisionError):
            self.repost_per_min = -100
        try:
            self.comment_per_min = self._comment_count_ni / time_range
        except (TypeError, ZeroDivisionError):
            self.comment_per_min = -100
        try:
            self.favorite_per_min = self._favorite_count_ni / time_range
        except (TypeError, ZeroDivisionError):
            self.favorite_per_min = -100
def parse_data(self, fetch_time_last=None, repost_count_last=None, comment_count_last=None,
favorite_count_last=None, fetch_time=None, repost_count=None, comment_count=None,
favorite_count=None, parse_mode="and") -> bool:
"""
:param fetch_time_last:
:param repost_count_last:
:param comment_count_last:
:param favorite_count_last:
:param fetch_time:
:param repost_count:
:param comment_count:
        :param favorite_count:
        :param parse_mode: str, "and" or "or" - whether all thresholds must be met or any single one is enough
:return:
"""
if fetch_time_last and fetch_time:
time_diff = (fetch_time_last - fetch_time) / 60 / 1e3
else:
raise KeyError("time input error")
if isinstance(comment_count_last, int) and isinstance(comment_count, int):
comment_diff = comment_count_last - comment_count
else:
comment_diff = -100
if isinstance(favorite_count_last, int) and isinstance(favorite_count, int):
            favorite_diff = favorite_count_last - favorite_count
else:
            favorite_diff = -100
if isinstance(repost_count_last, int) and isinstance(repost_count, int):
repost_diff = repost_count_last - repost_count
else:
repost_diff = -100
if parse_mode == "and":
            if comment_diff / time_diff >= self.comment_per_min and favorite_diff / time_diff >= self.favorite_per_min and repost_diff / time_diff >= self.repost_per_min:
return True
else:
return False
elif parse_mode == "or":
            if comment_diff / time_diff >= self.comment_per_min or favorite_diff / time_diff >= self.favorite_per_min or repost_diff / time_diff >= self.repost_per_min:
return True
else:
return False
else:
return False
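# Worked example (hypothetical numbers): push_rule(favorite_count_ni=200, time_range=5)
# gives favorite_per_min = 40, so with parse_mode="and" the observed growth rate
# (favorite_count_last - favorite_count) / time_diff must reach at least 40 per minute
# before parse_data() returns True.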
def scan_from_redis(push_rule_class):
# len_id_list = rds.llen("doc_id")
while True:
doc_id = rds.lpop("doc_id")
if doc_id:
res = rds.llen(doc_id)
if res < 2:
continue
res_list = rds.lrange(doc_id, 0, 1)
fetch_time = 0
repost_count = 0
comment_count = 0
favorite_count = 0
            for count, item in enumerate(res_list):
                one_data = json.loads(item)
if count == 0:
fetch_time = one_data.get("fetch_time")
repost_count = one_data.get("repost_count")
comment_count = one_data.get("comment_count")
favorite_count = one_data.get("favorite_count")
continue
bool_res = push_rule_class.parse_data(fetch_time_last=fetch_time, repost_count_last=repost_count,
comment_count_last=comment_count,
favorite_count_last=favorite_count,
comment_count=one_data.get("comment_count"),
favorite_count=one_data.get("favorite_count"),
repost_count=one_data.get("repost_count"), parse_mode="and",
fetch_time=one_data.get("fetch_time"))
print(bool_res)
if bool_res:
pass
print(res_list)
else:
time.sleep(5)
def task_main():
    # instantiate the rule used to judge incremental growth
push_rule_class = push_rule(favorite_count_ni=0.000000001, time_range=5)
    # keep processing crawled data in a loop
scan_from_redis(push_rule_class)
task_main()
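# For context (not part of the original file): the loop above assumes a producer that,
# for every crawled post, pushes the post id onto the "doc_id" list and pushes JSON
# snapshots (fetch_time in milliseconds plus the three counters) onto a list keyed by
# that id. A minimal sketch of such a producer, with a made-up doc id and counts:
import json
import time
import redis

producer_rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19)

def push_snapshot(doc_id, repost_count, comment_count, favorite_count):
    """Store one crawl snapshot for doc_id and queue the id for the incremental check."""
    snapshot = {
        "fetch_time": int(time.time() * 1000),  # milliseconds, as parse_data expects
        "repost_count": repost_count,
        "comment_count": comment_count,
        "favorite_count": favorite_count,
    }
    # lpush keeps the newest snapshot at index 0, which scan_from_redis treats as "last"
    producer_rds.lpush(doc_id, json.dumps(snapshot))
    producer_rds.lpush("doc_id", doc_id)

if __name__ == "__main__":
    push_snapshot("weibo_4529838201", repost_count=10, comment_count=25, favorite_count=300)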
......@@ -208,6 +208,7 @@ class CrawlerDouban():
if video_time:
if start_time < video_time:
if video_time < end_time:
count_false = 0
yield res
else:
count_false += 1
......@@ -220,25 +221,7 @@ if __name__ == '__main__':
url = 'https://weibo.com/p/1644114654/home?from=page_100306&mod=TAB#place'
# releaserUrl = 'http://v.qq.com/vplus/cfa34d96d1b6609f1dccdea65b26b83d'
url_list = [
# "https://weibo.com/u/1764615662",
# "https://weibo.com/u/3662247177",
# "https://weibo.com/u/2378564111",
# "https://weibo.com/u/2983578965",
# "https://weibo.com/u/3938976579",
# "https://weibo.com/u/6511177474",
# "https://weibo.com/u/6343916471",
# "https://weibo.com/u/6511177474",
# "https://weibo.com/u/2921603920",
# "https://weibo.com/u/6470919752",
# "https://weibo.com/u/2653906910?refer_flag=1001030103_&is_hot=1",
# "https://weibo.com/u/3115996363?is_hot=1",
# "https://weibo.com/p/1005053212093237/home?from=page_100505&mod=TAB#place",
# "https://weibo.com/u/3926129482",
# "https://weibo.com/u/5509337969?is_hot=1",
# "https://weibo.com/u/5477320351",
# "https://weibo.com/p/1005055634795408/home?from=page_100505&mod=TAB#place",
"https://weibo.com/u/6511173721",
# "https://weibo.com/p/1005055471534537/home?from=page_100505&mod=TAB&is_hot=1#place",
"https://www.douban.com/people/new_tag"
]
# res = test.releaser_page(url, output_to_es_raw=True,
......@@ -247,7 +230,7 @@ if __name__ == '__main__':
# for r in res:
# print(r)
for u in url_list:
ttt = test.releaser_page_by_time(1590940800000, 1595468554268, u, output_to_es_register=True,
ttt = test.releaser_page_by_time(1595755100232, 1595906959333, u, output_to_es_register=True,
es_index='crawler-data-raw',
doc_type='doc', releaser_page_num_max=4000,allow=20)
for t in ttt:
......
......@@ -182,7 +182,7 @@ class Crawler_weibo():
doc_id_type="all-time-url")
yield res_dic
except Exception as e:
print(mblog)
print(json.dumps(mblog))
print("row formate error %s" % e)
continue
......
import datetime
import copy
import elasticsearch
from elasticsearch.helpers import scan
import json
# now - 2 days
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
class NameIdDict:
def __init__(self, fh_datetime, re_datetime):
self.re_datetime_st = re_datetime
self.re_datetime_et = re_datetime + datetime.timedelta(1)
self.fh_datetime_st = fh_datetime
        self.fh_datetime_et = fh_datetime + datetime.timedelta(1)
self.re_datetime = re_datetime
self.fh_datetime = fh_datetime
self.splits = 1
self.re_datetime_st_ts = int(self.re_datetime_st.timestamp()*1000)
self.re_datetime_et_ts = int(self.re_datetime_et.timestamp() * 1000)
self.fh_datetime_st_ts = int(self.fh_datetime_st.timestamp() * 1000)
self.fh_datetime_et_ts = int(self.fh_datetime_et.timestamp() * 1000)
self.name_id_dict = {}
self.change_list = []
self.find_name_id()
def func_list_to_dict(self, scan_re):
re_dict = {}
for one in scan_re:
line = one['_source']
try:
name = line['releaser']
_id = line['releaser_id_str']
except KeyError:
continue
else:
if _id not in re_dict:
re_dict[_id] = set()
re_dict[_id].add(name)
else:
re_dict[_id].add(name)
self.name_id_dict.update(re_dict)
def find_name_id(self):
id_list = self.find_target_releaser_id()
print(len(id_list), "id的长度")
step = 1000
if len(id_list) < 1000:
range_num = 1
step = len(id_list)
else:
range_num = int(len(id_list) / 1000) + 1
for i in range(0, range_num):
st = i * step
if i == range_num - 1:
et = None
else:
et = step * (i + 1)
search_body = {
"query": {
"bool": {
"filter": [
# {"range": {"release_time": {"gte": self.re_datetime_st_ts, "lt": self.re_datetime_et_ts}}},
{"range": {"fetch_time": {"gte": self.fh_datetime_st_ts, "lt": self.fh_datetime_et_ts}}},
{"terms": {"releaser_id_str": id_list[st:et]}}
]
}
}
}
scan_re = scan(client=es, index='short-video-production', doc_type='daily-url', query=search_body)
            # search_re = es.search(index='short-video-production', doc_type='daily-url', body=search_body)
            # print(search_re['hits']['total'], "total releaser docs")
self.func_list_to_dict(scan_re)
def func_find_change(self, class_name_dict):
change_list = []
tmp1 = copy.deepcopy(self.name_id_dict)
# print(tmp1, '旧的')
tmp2 = copy.deepcopy(class_name_dict.name_id_dict)
# print(tmp2, '新的')
for one in tmp1:
change_dict = {}
one_value = tmp1[one]
try:
two_value = tmp2[one]
except KeyError:
two_value = set()
all_value = one_value - two_value
if len(all_value) > 0:
change_dict['releaser_id'] = one
change_dict['releaser_name_old'] = ','.join(list(self.name_id_dict[one]))
try:
change_dict['releaser_name_new'] = list(class_name_dict.name_id_dict[one])[-1]
except Exception as e:
# change_dict['releaser_name_old'] = list(self.name_id_dict[one])[0]
change_dict['releaser_name_new'] = ""
# if change_dict['releaser_name_old'] == change_dict['releaser_name_new']:
# print(list(self.name_id_dict[one]), '旧的')
# print(list(class_name_dict.name_id_dict[one]), '新的')
change_list.append(change_dict)
# print(len(change_list))
# print(change_list)
self.change_list = change_list
return change_list
@staticmethod
def find_target_releaser_id():
id_list = []
body = {
"query": {
"bool": {
"filter": [
{"term": {"key_releaser.keyword": "True"}}
]
}
}
}
s_re = scan(client=es, index='target_releasers', query=body)
for one in s_re:
line = one['_source']
releaser_id = line['releaser_id_str']
id_list.append(releaser_id)
# print(len(id_list))
return id_list
def func_write_into_es(change_list, fhday):
timestamp = int(datetime.datetime.now().timestamp()*1000)
for one_c in change_list:
if one_c == '':
continue
tmp_dict = {}
releaser_id_str = one_c['releaser_id']
old_name = one_c['releaser_name_old']
new_name = one_c['releaser_name_new']
change_day_str = str(fhday)[0:10]
doc_id = releaser_id_str + '_' + change_day_str
tmp_dict.update({'timestamp': timestamp,
'old_name': old_name,
'new_name': new_name,
'data_day': fhday.day,
'data_month': fhday.month,
'data_year': fhday.year,
'releaser_id_str': releaser_id_str
})
bulk_head = '{"index": {"_id":"%s"}}' % doc_id
data_str = json.dumps(tmp_dict, ensure_ascii=False)
bulk_one_body = bulk_head + '\n' + data_str + '\n'
eror_dic = es.bulk(index='change_name', doc_type='doc',
body=bulk_one_body, request_timeout=200)
if eror_dic['errors'] is True:
print(eror_dic['items'])
print(bulk_one_body)
if __name__ == "__main__":
now_ = datetime.datetime.now()
now = datetime.datetime(now_.year, now_.month, now_.day)
# new
fh_dt = now - datetime.timedelta(1)
re_dt = now - datetime.timedelta(2)
t1 = NameIdDict(fh_dt, re_dt)
# old
fh_dt = now - datetime.timedelta(2)
re_dt = now - datetime.timedelta(3)
t2 = NameIdDict(fh_dt, re_dt)
    # compare the old day against the new day
change_list = t2.func_find_change(t1)
func_write_into_es(change_list=change_list, fhday=now_)
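# Toy illustration (not part of the original file) of the diff performed by
# NameIdDict.func_find_change: keys are releaser_id_str values, sets hold the names
# observed on that day. The ids and names below are made up.
old_day = {"1001": {"Alice"}, "1002": {"Bob"}}
new_day = {"1001": {"Alice"}, "1002": {"Bobby"}}

toy_change_list = []
for releaser_id, old_names in old_day.items():
    new_names = new_day.get(releaser_id, set())
    if old_names - new_names:  # a previously seen name no longer appears
        toy_change_list.append({
            "releaser_id": releaser_id,
            "releaser_name_old": ",".join(old_names),
            "releaser_name_new": next(iter(new_names), ""),
        })

print(toy_change_list)
# [{'releaser_id': '1002', 'releaser_name_old': 'Bob', 'releaser_name_new': 'Bobby'}]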
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 15 10:52:23 2019
@author: zhouyujiang
"""
import smtplib
import datetime
import sys
import argparse
import re
from email.message import EmailMessage
from func_send_keyword_in_file_ck_log import func_find_file_keyword
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--datestr', type=str, default=None,
help=('like 2019-3-15'))
args = parser.parse_args()
if args.datestr is None:
nowdaystr = str(datetime.datetime.now())[0:10]
else:
nowdaystr = args.datestr
ck_log_path = '/data0/var/log/maintenance/'
ck_log_file_name = 'getOTTdataFromFTP_log_' + nowdaystr
ck_log_file_name_extract = 'getOTTdataFromFTP_log_' + nowdaystr + '__extract_errs'
ck_file = ck_log_path+ck_log_file_name
ck_file_extract = ck_log_path+ck_log_file_name_extract
ck_log_list = [ck_file, ck_file_extract]
print('will check:', ck_log_list)
keyword_list = ['with zero length on HDFS for platform', 'failed']
re_dict_list = func_find_file_keyword(ck_log_list, keyword_list)
error_file_list = set()
unzip_file_list =set()
for one in re_dict_list:
path = one['path']
keyword = one['keyword']
line_list = one['filelines']
if line_list != []:
print('get error in ', path)
for line in line_list:
if 'zero length on HDFS for platform' in line:
                list_str = re.findall(r".*\[(.*)\].*", line)
err_messg_str = list_str[0]
if err_messg_str != '':
err_messg_list = err_messg_str.split(',')
for one_err in err_messg_list:
one_err = one_err.replace("'", '')
err_file = one_err.split('/')[-1]
error_file_list.add(err_file)
if 'failed' in line:
list_str = re.findall(".*Extract file(.*)failed:.*", line)
err_messg_str = list_str[0]
if err_messg_str != '':
err_file = err_messg_str.strip().split('/')[-1]
unzip_file_list.add(err_file)
all_body = ''
if len(error_file_list) != 0:
messg_body_file = ',\t'.join(error_file_list)
messg_body_1 = str(datetime.datetime.now())[0:10] + '发现{%d}个空文件'%len(error_file_list) + '具体如下:\t' + messg_body_file
all_body = all_body + messg_body_1
if len(unzip_file_list) != 0:
messg_body_failed = '\t'.join(unzip_file_list)
    messg_body_2 = str(datetime.datetime.now())[0:10] + '发现解压文件失败,\t' + messg_body_failed
    all_body = all_body + '\n\n' + messg_body_2
if all_body != '':
email_subj = '[TVpOTT 原始数据预警] %s'%str(datetime.datetime.now())[0:10]
email_msg=EmailMessage()
csm_mail_service='mail.csm.com.cn'
sender = 'zhouyujiang@csm.com.cn'
email_group = ['zhouyujiang@csm.com.cn']
email_msg.set_content(all_body)
email_msg['Subject']=email_subj
email_msg['From']=sender
email_msg['to']=email_group
server=smtplib.SMTP(host=csm_mail_service)
server.send_message(email_msg)
server.quit()
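# func_find_file_keyword is imported from func_send_keyword_in_file_ck_log and is not
# part of this commit. Judging only from how its result is consumed above (a list of
# dicts carrying 'path', 'keyword' and the matching 'filelines'), it behaves roughly
# like this sketch; this is an assumption, not the real implementation.
def func_find_file_keyword_sketch(file_list, keyword_list):
    results = []
    for path in file_list:
        for keyword in keyword_list:
            matched = []
            try:
                with open(path, encoding='utf-8') as log_f:
                    matched = [line for line in log_f if keyword in line]
            except FileNotFoundError:
                pass  # missing log file: report no matches for it
            results.append({'path': path, 'keyword': keyword, 'filelines': matched})
    return results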
# -*- coding: utf-8 -*-
"""
Created on Wed Jun 5 10:20:25 2019
@author: zhouyujiang
Send Infosys encrypted files for the customer service department
"""
import os
import datetime
import time
import shutil
import configparser
from func_send_email_with_file import send_file_email
def func_mk_send_file_name(date, folder):
datestr = str(date).replace('-', '')[2:8]
AUfile = folder + '\\' + 'AU' + datestr + '.ZIP'
PIfile = folder + '\\' + 'PI' + datestr + '.ZIP'
return [AUfile, PIfile]
def func_find_recent_folder(aims_path):
folder_dict = {}
folder_list = []
# for root, dirs, file in os.walk(aims_path):
# folder_list = dirs
# for i in folder_list:
folder_list = [i for i in os.listdir(aims_path) if not os.path.isdir(i)]
new_folder_List = sorted(folder_list, reverse=True)
# for one in folder_list:
# file_path = aims_path + '\\' + one
# update_time = os.path.getmtime(file_path)
# folder_dict.update({file_path:update_time})
# new_file = sorted(folder_dict.items(), key=lambda x:x[1], reverse=True)
return aims_path + '\\' + new_folder_List[0]
def func_rm_sendfile_to_dir(file_path=r'D:',
send_file_list=[]):
tmp_name = 'send_email_file'
tmp_path = file_path + '\\' + tmp_name
# os.mkdir(tmp_path)
    # make sure the staging directory exists, then clear leftovers from a previous run
    if not os.path.exists(tmp_path):
        os.mkdir(tmp_path)
    if os.listdir(tmp_path) != []:
        for j in os.listdir(tmp_path):
            os.remove(tmp_path + '\\' + j)
if send_file_list == []:
print('no file will rm, send_eamil_list = []')
else:
for one in send_file_list:
shutil.copy(one, tmp_path)
data_str = str(now)[0:10]
send_file_email(tmp_path, data_str, email_group, email_msg_body_str,
title_str, cc_group)
if __name__ == '__main__':
    # path of the config file
    config_path = r'Y:\send_email.ini'
    # config file path (for local testing)
    # config_path = r'D:\code\maintenance\config\send_email.ini'
    # temporary directory that holds the files to be emailed
    temp_file_path = r'D:'
    # path of the data to be sent
log_file = r'D:\\log\\send_email_log_' + str(datetime.datetime.now())[0:10]
f = open(log_file, 'w')
aims_path = r'Z:\\'
CONFIG = configparser.ConfigParser()
CONFIG.read(config_path, encoding='utf-8')
print(CONFIG, file=f)
send_time = CONFIG['send_time']['start_time']
send_time_hour = int(send_time[0:2])
send_time_minute = int(send_time[3:5])
email_group = CONFIG['send_to']['email_group'].split(',')
cc_group = CONFIG['Cc_to']['Cc_group'].split(',')
email_msg_body_str = CONFIG['send_body']['body']
print(send_time, file=f)
print(email_group, file=f)
print(cc_group, file=f)
now = datetime.datetime.now()
weekday = now.weekday()
title_str = '[自动发送邮件]'
while True:
if now.hour == send_time_hour and now.minute == send_time_minute:
print('开始发送邮件:', file=f)
if weekday in (1, 2, 3, 4):
email_msg_body_str = email_msg_body_str
send_day = now - datetime.timedelta(1)
recent_folder = func_find_recent_folder(aims_path)
send_list = func_mk_send_file_name(send_day, recent_folder)
func_rm_sendfile_to_dir(file_path=temp_file_path,
send_file_list=send_list)
print('邮件发送成功', datetime.datetime.now(), file=f)
if weekday in (0,):
email_msg_body_str = email_msg_body_str
send_list = []
recent_folder = func_find_recent_folder(aims_path)
for i in range(1, 4):
send_day = now - datetime.timedelta(i)
recent_folder = func_find_recent_folder(aims_path)
send_list += func_mk_send_file_name(send_day, recent_folder)
                func_rm_sendfile_to_dir(file_path=temp_file_path, send_file_list=send_list)
print('邮件发送成功', datetime.datetime.now(), file=f)
break
if (now.hour > send_time_hour) or (now.hour == send_time_hour and now.minute > send_time_minute):
print('超过发送时间', file=f)
break
else:
print('等待邮件发送时间发送:', file=f)
time.sleep(20)
now = datetime.datetime.now()
f.close()
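# The script only reads four options from send_email.ini. A sketch that produces a
# compatible file: section and option names are taken from the CONFIG[...] lookups
# above, the values are placeholders.
import configparser

sketch_config = configparser.ConfigParser()
sketch_config['send_time'] = {'start_time': '09:30'}  # parsed as HH:MM above
sketch_config['send_to'] = {'email_group': 'a@example.com,b@example.com'}  # comma separated
sketch_config['Cc_to'] = {'Cc_group': 'c@example.com'}
sketch_config['send_body'] = {'body': 'Attached are the daily AU/PI files.'}

with open('send_email.ini', 'w', encoding='utf-8') as cfg_f:
    sketch_config.write(cfg_f)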
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 8 15:12:14 2018
@author: hanye
"""
from elasticsearch import Elasticsearch
import datetime
import copy
import json
import logging
def build_maintainance_index(fetch_year, fetch_month, fetch_day,
freq_type='daily'):
# Be noted, currently, freq_type is re-assigned to be 'daily', so even if a value
# passed in which is different from 'daily' will be ignored and the freq_type
# will still be 'daily'. Jan 08 2018
    # create logger
logger=logging.getLogger('maintainance_build_manual.func')
logger.info('fetch_year %d, fetch_month %d, fetch_day %d' %(fetch_year, fetch_month, fetch_day))
# 0 init
logger.info('0 init')
hosts='192.168.17.11'
port=9200
es=Elasticsearch(hosts=hosts, port=port)
index_short_video='short-video-production'
doc_type_short_video_daily='daily-url'
index_maintainance='maintainance-short-video'
doc_type_maintainance_daily='daily'
# 1 set fetch_day for daily maintainance data stats
logger.info('1 set fetch_day for daily maintainance data stats')
fetch_time_start_T=datetime.datetime(fetch_year, fetch_month, fetch_day)
fetch_time_end_T=fetch_time_start_T+datetime.timedelta(days=1)
fetch_time_start_ts=int(fetch_time_start_T.timestamp()*1e3)
fetch_time_end_ts=int(fetch_time_end_T.timestamp()*1e3)
# 2 find how many platforms there
logger.info('2 find how many platforms there')
find_platform_Lst={
"query": {
"bool": {
"filter": [
{"range": {"fetch_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
]
}
},
"size": 0,
"aggs": {
"platforms": {
"terms": {
"field": "platform.keyword",
"size": 50
}
}
}
}
find_platform_resp=es.search(index=index_short_video,
doc_type=doc_type_short_video_daily,
body=find_platform_Lst,
request_timeout=100)
aggs_result=find_platform_resp['aggregations']['platforms']['buckets']
platform_dict={}
for line in aggs_result:
platform=line['key']
video_num=line['doc_count']
platform_dict[platform]=video_num
# 3 define fields and their stats
logger.info('3 define fields and their stats')
field_dict={
'play_count': {
'max': 'play_count_max',
'min': 'play_count_min',
'avg': 'play_count_avg',
'sum': 'play_count_sum',
},
'favorite_count':{
'max': 'favorite_count_max',
'min': 'favorite_count_min',
'avg': 'favorite_count_avg',
'sum': 'favorite_count_sum',
},
'comment_count':{
'max': 'comment_count_max',
'min': 'comment_count_min',
'avg': 'comment_count_avg',
'sum': 'comment_count_sum',
},
}
# 4 aggregations for each field's each stats metric
logger.info('4 aggregations for each field\'s each stats metric')
stats_Lst=[]
fetch_date_ts=int(datetime.datetime(fetch_year, fetch_month, fetch_day).timestamp()*1e3)
for platform in platform_dict:
logger.info('platform: %s' % (platform))
stats_dict_init={'fetch_year': fetch_year,
'fetch_month': fetch_month,
'fetch_day': fetch_day,
'fetch_date': fetch_date_ts,
'platform': platform,
}
freq_type='daily'
stats_dict_init['freq_type']=freq_type
stats_body_observed={
"query": {
"bool": {
"filter": [
{"range": {"fetch_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
{"term": {"platform.keyword": platform}}
]
}
},
"size": 0,
"aggs": {
"field_stats": {
"stats": {
"field": None
}
}
}
}
fetch_time_start_ts_enlarge=int(fetch_time_start_ts-24*3600*1e3)
stats_body_new_released={
"query": {
"bool": {
"filter": [
{"range": {"fetch_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
{"term": {"platform.keyword": platform}},
{"range": {"release_time": {"gte":fetch_time_start_ts_enlarge, "lt":fetch_time_end_ts}}},
]
}
},
"size": 0,
"aggs": {
"field_stats": {
"stats": {
"field": None
}
}
}
}
stats_type_dict={'observed': stats_body_observed,
'new_released': stats_body_new_released}
for stats_type in stats_type_dict:
logger.info('platform: %s, stats_type: %s' % (platform, stats_type))
stats_dict=copy.deepcopy(stats_dict_init)
stats_dict['stats_type']=stats_type
for field_name in field_dict:
logger.info('platform: %s, stats_type: %s, field: %s' % (platform, stats_type, field_name))
stats_body=stats_type_dict[stats_type]
stats_body['aggs']['field_stats']['stats']['field']=field_name
search_resp=es.search(index=index_short_video,
doc_type=doc_type_short_video_daily,
body=stats_body,
request_timeout=100)
video_num=search_resp['hits']['total']
stats_dict['video_num']=video_num
field_max=search_resp['aggregations']['field_stats']['max']
field_min=search_resp['aggregations']['field_stats']['min']
field_avg=search_resp['aggregations']['field_stats']['avg']
field_sum=search_resp['aggregations']['field_stats']['sum']
stats_dict.update({
field_dict[field_name]['sum']: field_sum,
field_dict[field_name]['max']: field_max,
field_dict[field_name]['min']: field_min,
field_dict[field_name]['avg']: field_avg,
})
timestamp=int(datetime.datetime.now().timestamp()*1e3)
stats_dict['timestamp']=timestamp
stats_Lst.append(stats_dict)
# 5 bulk write into maintainance index
bulk_body=''
for line in stats_Lst:
line_id=(line['platform']+'_'
+datetime.datetime.fromtimestamp(line['fetch_date']/1e3).isoformat()[:10]+'_'
+line['stats_type']+'_'
+line['freq_type'])
action_json='{"index": {"_id":"'+line_id+'"}}'
line_json=json.dumps(line, ensure_ascii=False)
line_body=(action_json+'\n'+line_json+'\n')
bulk_body+=line_body
if len(bulk_body)!=0:
es.bulk(body=bulk_body,
index=index_maintainance,
doc_type=doc_type_maintainance_daily,
request_timeout=200)
else:
logger.info('Got empty bulk_body at fetch_year %d, fetch_month %d, fetch_day %d' %(fetch_year, fetch_month, fetch_day))
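# Minimal driver sketch (not part of the original file): rebuild yesterday's stats.
# The cron wiring that normally calls this function is assumed, not shown here.
import datetime
import logging

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    build_maintainance_index(fetch_year=yesterday.year,
                             fetch_month=yesterday.month,
                             fetch_day=yesterday.day,
                             freq_type='daily')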
# 2019/9/10 check the log of the program that emails the customer service department
import datetime
import smtplib
from email.message import EmailMessage
from func_send_keyword_in_file_ck_log import func_find_file_keyword
now = datetime.datetime.now()
log_file = r'D:\\log\\send_email_log_' + str(now)[0:10]
weekday = now.weekday()
if weekday in (0, 1, 2, 3, 4):
keyword = '邮件发送成功'
re_dict = func_find_file_keyword(log_file, keyword)
csm_mail_service = 'mail.csm.com.cn'
email_msg_body = ''
email_msg = EmailMessage()
email_msg['From'] = 'zhouyujiang@csm.com.cn'
email_msg['to'] = ['zhouyujiang@csm.com.cn', 'huwenhui@csm.com.cn', 'lvyidan@csm.com.cn', 'nabuqi@csm.com.cn']
for one in re_dict:
if len(one['filelines']) != 0:
email_msg_body = '今日邮件已发送-' + str(now)[0:10]
email_subj = '[已发送]-邮件发送情况'
else:
email_msg_body = '今日邮件没有发送' + str(now)[0:10] + \
'请检查日志{log_}'.format(log_='send_email_log_' + str(now)[0:10])
email_subj = '[未发送]-邮件发送情况'
email_msg.set_content(email_msg_body)
email_msg['Subject'] = email_subj
server = smtplib.SMTP(host=csm_mail_service)
server.send_message(email_msg)
server.quit()
......@@ -25,34 +25,29 @@ from email.mime.text import MIMEText
#COMMASPACE = ', '
def send_file_email(file_path, data_str, email_group=['zhouyujiang@csm.com.cn'],
def send_file_email(file_path, data_str, email_group=[],
email_msg_body_str=None,
title_str=None,
cc_group=['zhouyujiang@csm.com.cn', 'hanye@csm.com.cn'], sender=None):
cc_group=["litao@igengmei.com"], sender=None):
directory = file_path
# Create the enclosing (outer) message
if email_msg_body_str == None:
email_msg_body = '''肖老师,李赞,问好:\n 附件是我们爬虫和外部采购短视频数据源获取的与任正非BBC采访有关的短视频数据,其中,
《任正非BBC采访相关短视频数据_{data_str}.csv》是视频明细
《任正非BBC采访相关短视频数据summary_{data_str}.csv》是视频结果汇总
在summary附件中,sum_net_inc_play_count是当日新增播放量的和,sum_playcount 是视频累积播放量和
注:梨视频(pearvideo)平台没有播放量
'''.format(data_str=data_str)
        email_msg_body = '''{data_str} '''.format(data_str=data_str)
else:
email_msg_body = email_msg_body_str
outer = MIMEMultipart()
if title_str == None:
title = '任正非BBC采访相关短视频数据-' + data_str
title = '-' + data_str
else:
title = title_str
outer['Subject'] = title
outer['To'] = ','.join(email_group)
outer['Cc'] = ','.join(cc_group)
if not sender:
outer['From'] = 'zhouyujiang@csm.com.cn'
        outer['From'] = 'litao@igengmei.com'
else:
outer['From'] = sender
csm_mail_service = 'mail.csm.com.cn'
mail_service = 'smtp.exmail.qq.com'
outer.attach(MIMEText(email_msg_body))
if directory:
......@@ -94,15 +89,16 @@ def send_file_email(file_path, data_str, email_group=['zhouyujiang@csm.com.cn'],
# Now send or store the message
# composed = outer.as_string()
server = smtplib.SMTP(host=csm_mail_service)
server = smtplib.SMTP_SSL(mail_service,465)
server.login("litao@igengmei.com","Lemo1995")
server.send_message(outer)
server.quit()
if __name__ == '__main__':
send_file_email(file_path=r'D:\code\test',
data_str='2019-02-21')
send_file_email("",'2019-02-21',
sender="litao@igengmei.com",email_group=["litao@igengmei.com"],email_msg_body_str="test",title_str="test",cc_group=["litao@igengmei.com"])
......
......@@ -4,7 +4,7 @@
import redis,time,json,datetime,sys
from maintenance.func_send_email_with_file import send_file_email
rds = redis.StrictRedis(host='192.168.17.60', port=6379, db=2,decode_responses=True)
rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=19,decode_responses=True)
def write_email_task_to_redis(task_name=None,file_path=None, data_str=None, email_group=[],
......
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 2 17:26:50 2018
@author: hanye
"""
import os
import smtplib
from email.message import EmailMessage
import datetime
import time
os.chdir('/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log')
files=os.listdir()
while True:
now=datetime.datetime.now()
if (now.year==2018 and now.month==2 and now.day in [3, 4] and now.hour in [2, 9, 11, 12, 18, 21]):
files_Lst=[]
for ff in files:
mt=os.path.getmtime(ff)
filed={'filename':ff, 'modify_ts':mt}
files_Lst.append(filed)
files_Lst.sort(key=lambda d: d['modify_ts'], reverse=True)
email_msg_body=''
for line in files_Lst[:100]:
filename=line['filename']
filemt=datetime.datetime.fromtimestamp(line['modify_ts']).isoformat()[:19]
line_str=filename+' '+filemt
email_msg_body+=line_str+'\n'
email_msg_suffix=('\n\n\n'
+'-'*80+'\n'
+'这是自动发送的邮件,可以不用回复。\n'
+'This is an automatically sent message. You do NOT need to reply.\n')
email_msg_body+=email_msg_suffix
csm_mail_service='mail.csm.com.cn'
sender='ccr_maintain@csm.com.cn'
receiver=['hanye@csm.com.cn', 'yeahan@outlook.com']
email_subj='check on log directory %s' % datetime.datetime.now().isoformat()[:19]
email_msg=EmailMessage()
email_msg.set_content(email_msg_body)
email_msg['Subject']=email_subj
email_msg['From']=sender
email_msg['to']=receiver
try:
server=smtplib.SMTP(host=csm_mail_service)
server.send_message(email_msg)
server.quit()
print('Successfully sent email to %s' % receiver,
datetime.datetime.now())
print('email_msg:\n', email_msg)
except:
print('Failed to connect email server.', datetime.datetime.now())
time.sleep(60*60)
if now>datetime.datetime(2018,2,4,23):
break
time.sleep(60)
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 2 17:26:50 2018
@author: hanye
"""
import os
import smtplib
from email.message import EmailMessage
import datetime
import time
os.chdir('/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log')
files=os.listdir()
files_Lst=[]
for ff in files:
mt=os.path.getmtime(ff)
filed={'filename':ff, 'modify_ts':mt}
files_Lst.append(filed)
files_Lst.sort(key=lambda d: d['modify_ts'], reverse=True)
email_msg_body=''
line_num=0
for line in files_Lst[:100]:
line_num+=1
filename=line['filename']
filemt=datetime.datetime.fromtimestamp(line['modify_ts']).isoformat()[:19]
line_str='%3d\t%s\t\t%s\n' % (line_num, filename, filemt)
email_msg_body+=line_str+'\n'
email_msg_suffix=('\n\n\n'
+'-'*80+'\n'
+'这是自动发送的邮件,可以不用回复。\n'
+'This is an automatically sent message. You do NOT need to reply.\n')
email_msg_body+=email_msg_suffix
csm_mail_service='mail.csm.com.cn'
sender='ccr_maintain@csm.com.cn'
receiver=['hanye@csm.com.cn', 'yeahan@outlook.com']
email_subj='check on log directory %s' % datetime.datetime.now().isoformat()[:19]
email_msg=EmailMessage()
email_msg.set_content(email_msg_body)
email_msg['Subject']=email_subj
email_msg['From']=sender
email_msg['to']=receiver
try:
server=smtplib.SMTP(host=csm_mail_service)
server.send_message(email_msg)
server.quit()
print('Successfully sent email to %s' % receiver,
datetime.datetime.now())
print('email_msg:\n', email_msg)
except:
print('Failed to connect email server.', datetime.datetime.now())
from multiprocessing import Process, Queue
import os, time, random
# code executed by the writer process:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# code executed by the reader process:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
    # the parent process creates the Queue and passes it to each child process:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # start the writer child process pw:
    pw.start()
    # start the reader child process pr:
    pr.start()
    # wait for pw to finish:
    pw.join()
    # read() loops forever, so pr can never be joined; it has to be terminated:
    pr.terminate()
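# Alternative sketch (not from the original file): instead of killing the reader with
# terminate(), the writer sends a sentinel so the reader can exit and be joined cleanly.
from multiprocessing import Process, Queue
import time, random

STOP = None  # sentinel marking the end of the stream

def write_with_sentinel(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
    q.put(STOP)  # tell the reader there is nothing more to read

def read_until_sentinel(q):
    while True:
        value = q.get(True)
        if value is STOP:
            break
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write_with_sentinel, args=(q,))
    pr = Process(target=read_until_sentinel, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()  # the sentinel lets the reader finish on its own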
# -*- coding: utf-8 -*-
"""
Created on Mon Apr 9 07:41:59 2018
Send the contents of a directory as a MIME message.
Unless the -o option is given, the email is sent by forwarding to your local
SMTP server, which then does the normal delivery process.
Attachments size should be checked BEFORE call this function, because
different SMTP servers apply different size limits.
@author: hanye
"""
import os, sys, datetime
import smtplib
# For guessing MIME type based on file name extension
import mimetypes
#from argparse import ArgumentParser
from email.message import EmailMessage
from email.policy import SMTP
def send_all_dir(subject, message, smtp_host,
sender, recipients,
directory='.', attachments=[], output=None,
f_log=sys.stdout):
msg = EmailMessage()
msg['Subject'] = subject
msg['To'] = ', '.join(recipients)
msg['From'] = sender
msg.preamble = 'Email message preamble.\n'
email_msg_body=message
msg.set_content(email_msg_body)
if len(attachments)==0:
attachments=os.listdir(directory)
for filename in attachments:
path = os.path.join(directory, filename)
if not os.path.isfile(path):
continue
# Guess the content type based on the file's extension. Encoding
# will be ignored, although we should check for simple things like
# gzip'd or compressed files.
ctype, encoding = mimetypes.guess_type(path)
if ctype is None or encoding is not None:
# No guess could be made, or the file is encoded (compressed), so
# use a generic bag-of-bits type.
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
with open(path, 'rb') as fp:
msg.add_attachment(fp.read(),
maintype=maintype,
subtype=subtype,
filename=filename)
# Now send or store the message
send_email_success=False
if output!=None:
with open(output, 'wb') as fp:
fp.write(msg.as_bytes(policy=SMTP))
else:
try:
send_s=datetime.datetime.now()
with smtplib.SMTP(host=smtp_host) as s:
s.send_message(msg)
send_e=datetime.datetime.now()
send_t_delta=send_e-send_s
print('Successfully sent email to %s from %s, takes %s,' % (','.join(recipients), sender, send_t_delta),
datetime.datetime.now(), file=f_log)
send_email_success=True
return send_email_success
except:
print('Failed to send email with attachments.', datetime.datetime.now(), file=f_log)
return send_email_success
if __name__ == '__main__':
pth=r'D:\CSM\Docs\Projects\短视频\code\maintainance\send_recev_email\test'
sender='hanye@csm.com.cn'
csm_mail_service='mail.csm.com.cn'
recr='hanye@csm.com.cn'
subject='这是测试数据邮件附件3'
message_body="""
您好,
这是测试邮件附件
祝好,
韩烨
"""
attachments=['email_test.py', 'videonumber_alert_daily_test.py']
send_all_dir(subject=subject, message=message_body,
directory=pth, attachments=attachments,
sender=sender, smtp_host=csm_mail_service, recipients=[recr])
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 20 16:08:17 2019
@author: zhouyujiang
Send an email with attachments
"""
import os
import smtplib
# For guessing MIME type based on file name extension
import mimetypes
from email import encoders
#from email.message import Message
#from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
#from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
#COMMASPACE = ', '
def send_file_email(file_path, data_str, email_group=['zhouyujiang@csm.com.cn'],
email_msg_body_str=None,
title_str=None,
cc_group=['zhouyujiang@csm.com.cn', 'hanye@csm.com.cn'],sender = None):
directory = file_path
# Create the enclosing (outer) message
if email_msg_body_str == None:
email_msg_body = '''肖老师,李赞,问好:\n 附件是我们爬虫和外部采购短视频数据源获取的与任正非BBC采访有关的短视频数据,其中,
《任正非BBC采访相关短视频数据_{data_str}.csv》是视频明细
《任正非BBC采访相关短视频数据summary_{data_str}.csv》是视频结果汇总
在summary附件中,sum_net_inc_play_count是当日新增播放量的和,sum_playcount 是视频累积播放量和
注:梨视频(pearvideo)平台没有播放量
'''.format(data_str=data_str)
else:
email_msg_body = email_msg_body_str
outer = MIMEMultipart()
if title_str == None:
title = '任正非BBC采访相关短视频数据-' + data_str
else:
title = title_str
outer['Subject'] = title
outer['To'] = ','.join(email_group)
outer['Cc'] = ','.join(cc_group)
if not sender:
outer['From'] = 'zhouyujiang@csm.com.cn'
else:
outer['From'] = sender
csm_mail_service = 'mail.csm.com.cn'
outer.attach(MIMEText(email_msg_body))
for filename in os.listdir(directory):
path = os.path.join(directory, filename)
if not os.path.isfile(path):
continue
# Guess the content type based on the file's extension. Encoding
# will be ignored, although we should check for simple things like
# gzip'd or compressed files.
ctype, encoding = mimetypes.guess_type(path)
if ctype is None or encoding is not None:
# No guess could be made, or the file is encoded (compressed), so
# use a generic bag-of-bits type.
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
# if maintype == 'text':
## with open(path,encoding='utf-8') as fp:
### Note: we should handle calculating the charset
## msg = MIMEText(fp.read(), _subtype=subtype)
# continue
if maintype == 'image':
# with open(path, 'rb') as fp:
# msg = MIMEImage(fp.read(), _subtype=subtype)
continue
elif maintype == 'audio':
# with open(path, 'rb') as fp:
# msg = MIMEAudio(fp.read(), _subtype=subtype)
continue
else:
with open(path, 'rb') as fp:
msg = MIMEBase(maintype, subtype,charset='gb18030')
msg.set_payload(fp.read())
# Encode the payload using Base64
encoders.encode_base64(msg)
# Set the filename parameter
msg.add_header('Content-Disposition', 'attachment', filename=filename)
outer.attach(msg)
# Now send or store the message
# composed = outer.as_string()
server = smtplib.SMTP(host=csm_mail_service)
server.send_message(outer)
server.quit()
if __name__ == '__main__':
send_file_email(file_path=r'D:\code\test',
data_str='2019-02-21',
cc_group=[],
sender='lizhiwei@csm.com.cn',
email_group=['lizhiwei@csm.com.cn'])
# encoding: utf-8
'''
@author: zhangjian
@time: 2019/1/8 11:08
'''
import sys
import smtplib
import datetime
import elasticsearch
from email.message import EmailMessage
def releaser_video_alert_weekly(f_log=None):
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
today = datetime.datetime.now()
# if f_log == None:
# path = '/home/zhouyujiang/email_log/'
# log_fn = 'releaser_video_num_alert_for_%s_log' % datetime.datetime.strftime(today, '%b-%Y')
# f_log = open(path + log_fn, 'a')
# # pass
# else:
# f_log = sys.stdout
#
# print('*' * 80, file=f_log)
# print('log timestamp ', datetime.datetime.now(), file=f_log)
# print('Checking task for fetch_date', today.isoformat()[:10], file=f_log)
    # work out the week number and the time window
week_canlendar = datetime.datetime.now().isocalendar()
# week_canlendar = datetime.datetime(year=2018, month=12, day=25).isocalendar()
year = week_canlendar[0]
this_week = week_canlendar[1] - 1
last_week = this_week - 1
today = datetime.datetime(datetime.datetime.now().year, datetime.datetime.now().month, datetime.datetime.now().day)
# today = datetime.datetime(year=2018, month=12, day=25)
first_day_in_week = today - datetime.timedelta(days=week_canlendar[2] + 7)
fisrt_day_ts = int(first_day_in_week.timestamp() * 1e3)
last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
last_day_ts = int(last_day_in_week.timestamp() * 1e3)
this_week_index = 'short-video-weekly'
this_week_doc = 'daily-url-' + str(year) + '_w' + format(this_week, '>02d') + '_s1'
last_week_index = 'releaser-weekly-short-video'
last_week_doc = 'doc'
result = {'below': [], 'above': []}
# error_count=0
count = 0
with open('/home/zhouyujiang/pycode/alert_releaser.csv', 'r', encoding='gb18030') as f:
header_Lst = f.readline().strip().split(',')
for line in f:
count += 1
one_line = {}
line_Lst = line.strip().split(',')
line_dict = dict(zip(header_Lst, line_Lst))
releaser = line_dict['releaser']
platform = line_dict['platform']
            # video_num released this week
search_body1 = {
"query": {
"bool": {
"filter": [
{"term": {"platform.keyword": platform}},
{"term": {"releaser.keyword": releaser}},
{"range": {"release_time": {"gte": fisrt_day_ts, "lt": last_day_ts}}},
{"range": {"duration": {"lte": 600}}},
]
}
}
}
resp1 = es.search(index=this_week_index, doc_type=this_week_doc, body=search_body1)
this_week_videonum = resp1['hits']['total']
            # video_num released last week
search_body2 = {
"query": {
"bool": {
"filter": [
{"term": {"platform.keyword": platform}},
{"term": {"releaser.keyword": releaser}},
{"term": {"data_week_year": year}},
{"term": {"data_week_num": last_week}},
{"term": {"stats_type": "new_released"}}
]
}
}
}
resp2 = es.search(index=last_week_index, doc_type=last_week_doc, body=search_body2)
try:
last_week_videonum = resp2['hits']['hits'][0]['_source']['video_num']
except:
print('last_week_releaser_error:', releaser, platform, resp2['hits'], file=f_log)
last_week_videonum = 0
            # percentage: this week / last week
videonum_per = round(this_week_videonum / last_week_videonum * 100, 1) if last_week_videonum != 0 else -1
one_line.update({"releaser": releaser,
"platform": platform,
str(last_week) + "周_videonum": last_week_videonum,
str(this_week) + '周_videonum': this_week_videonum,
"百分比": videonum_per,
})
if videonum_per > 50 or videonum_per == -1:
result['above'].append(one_line)
else:
result['below'].append(one_line)
result['above'] = sorted(result['above'], key=lambda x: x['百分比'])
result['below'] = sorted(result['below'], key=lambda x: x['百分比'])
email_group = ['zhouyujiang@csm.com.cn',
'hanye@csm.com.cn',
'zhangtianli@csm.com.cn',
'zhangminghui@csm.com.cn',
'litao@csm.com.cn'
]
email_msg_suffix = ('\n\n\n'
+ '-' * 80 + '\n'
+ '这是自动发送的邮件,可以不用回复。\n'
+ 'This is an automatically sent message. You do NOT need to reply.\n')
# # send the alert email
csm_mail_service = 'mail.csm.com.cn'
sender = 'zhouyujiang@csm.com.cn'
email_subj = 'w' + str(this_week) + '_w' + str(last_week) + '_releaser_video_num预警 %s' % (today.isoformat()[:10])
email_msg_body = ''
email_msg_body += str(this_week) + '周和' + str(last_week) + '周 发布者video_num对比\n\n'
email_msg_body += '低于/高于/总共 预警值(50%%):%s/%s/%d\n\n' % (len(result['below']), len(result['above']), count)
for key in result.keys():
if key == 'below':
email_msg_body += '低于预警值:\n'
email_msg_body += '\t发布者 平台 上周 本周 百分比\n'
for one in result[key]:
for name in one.keys():
if name == 'releaser':
email_msg_body += '\t%s' % (one[name])
c = (len(one[name].encode('utf-8')) - len(one[name])) / 2 + len(one[name])
while c < 30:
email_msg_body += ' '
c += 1
elif name == 'platform':
email_msg_body += '%s' % (one[name])
c = (len(one[name].encode('utf-8')) - len(one[name])) / 2 + len(one[name])
while c < 15:
email_msg_body += ' '
c += 1
elif 'videonum' in name:
email_msg_body += '%s' % (one[name])
c = len(str(one[name]))
while c < 8:
email_msg_body += ' '
c += 1
elif name == '百分比':
email_msg_body += '%s%%' % (one[name])
email_msg_body += '\n'
else:
email_msg_body += '高于预警值:\n'
email_msg_body += '\t发布者 平台 上周 本周 百分比\n'
for one in result[key]:
for name in one.keys():
# if name == '百分比':
# email_msg_body += '\t\t%s:%s%%' % (name, one[name])
# else:
# email_msg_body += '\t\t%s:%s' % (name, one[name])
if name == 'releaser':
email_msg_body += '\t%s' % (one[name])
c = (len(one[name].encode('utf-8')) - len(one[name])) / 2 + len(one[name])
while c < 30:
email_msg_body += ' '
c += 1
elif name == 'platform':
email_msg_body += '%s' % (one[name])
c = (len(one[name].encode('utf-8')) - len(one[name])) / 2 + len(one[name])
while c < 15:
email_msg_body += ' '
c += 1
elif 'videonum' in name:
email_msg_body += '%s' % (one[name])
c = len(str(one[name]))
while c < 8:
email_msg_body += ' '
c += 1
elif name == '百分比':
email_msg_body += '%s%%' % (one[name])
email_msg_body += '\n'
if email_msg_body != '':
email_msg_body += email_msg_suffix
print('email_msg_body:\n', email_msg_body, file=f_log)
email_msg = EmailMessage()
email_msg.set_content(email_msg_body)
email_msg['Subject'] = email_subj
email_msg['From'] = sender
email_msg['to'] = email_group
try:
server = smtplib.SMTP(host=csm_mail_service)
server.send_message(email_msg)
server.quit()
print('Successfully sent email to %s' % (email_group),
datetime.datetime.now(), file=f_log)
print('email_msg:\n', email_msg, file=f_log)
except:
print('Failed to connect email server.', datetime.datetime.now(), file=f_log)
print('\n\n', file=f_log)
    if f_log is not None:
        f_log.close()
if __name__ == '__main__':
releaser_video_alert_weekly()
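# The weekly check reads /home/zhouyujiang/pycode/alert_releaser.csv (gb18030, comma
# separated). From the parsing above it only needs a header row containing at least
# the 'releaser' and 'platform' columns; a compatible file would look roughly like
# the made-up rows below:
#
#   releaser,platform
#   sample_releaser_1,toutiao
#   sample_releaser_2,haokan_video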
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 31 13:14:14 2018
@author: hanye
"""
import smtplib, sys, email.utils, email_config_MSoutlook
mailserver=email_config_MSoutlook.smtpservername
#smtp_port=email_config_MSoutlook.smtp_port
mailserver='mail.csm.com.cn'
username=r'csm\hanye'
passwd='hy_csm_17'
From='ccr_maintainance@csm.com.cn'
To='hanye@csm.com.cn'
Tos=['hanye@csm.com.cn', 'yeahan@outlook.com']
Subj='Test maintainance warning'
Date=email.utils.formatdate()
text=('From: %s\nTo: %s\nDate: %s\nSubject: %s\n\n' % (From, To, Date, Subj))
print('Type message text, end with line=[Ctrl+d (Unix), Ctrl+z (Windows)]')
mail_body='This is a test.\nYou\'re receving this is because there might be something wrong with...'
print('Connecting...')
server=smtplib.SMTP(host=mailserver) # note here should NOT use SMTP_SSL
#server.starttls() # must NOT use this with csm mail service
#server.login(username, passwd) # must NOT use this to login server, no login needed for standard smtp
failed=server.sendmail(From, Tos, text+mail_body+email_config_MSoutlook.sender_signature)
server.quit()
if failed:
print('Failed recipients:', failed)
else:
print('No errors.')
print('Bye')
import elasticsearch
import smtplib
from email.message import EmailMessage
import datetime
import sys
def task_alert(days_from_running, f_log=None):
email_group = {
'task-stats': [
'zhouyujiang@csm.com.cn',
'hanye@csm.com.cn',
"litao@csm.com.cn",
"gengdi@csm.com.cn"
]
}
email_msg_suffix = ('\n\n\n'
+ '-' * 80 + '\n'
+ '这是自动发送的邮件,可以不用回复。\n'
+ 'This is an automatically sent message. You do NOT need to reply.\n')
es = elasticsearch.Elasticsearch(hosts='192.168.17.13', port=9200)
index_task = 'task-stats'
# doc_type_task = ''
p = 86400000
today = datetime.datetime.now() - datetime.timedelta(days=days_from_running)
t = datetime.datetime(today.year, today.month, today.day)
print(t)
stamp = int(t.timestamp() * 1000)
print(stamp)
if f_log == None:
        path = '/home/zhouyujiang/zyj_log/'
log_fn = 'email_alert_for_%s_log' % datetime.datetime.strftime(today, '%b-%Y')
f_log = open(path + log_fn, 'a')
# pass
else:
f_log = sys.stdout
print('*' * 80, file=f_log)
print('log timestamp ', datetime.datetime.now(), file=f_log)
print('Checking task for fetch_date', today.isoformat()[:10], file=f_log)
alert_msg = {}
for is_done in [ False,True]:
alert_msg[is_done] = {}
search_body = {
"query": {
"bool": {
"filter": [
{"term": {"is_done": is_done}},
{"range": {"start_time": {"gte": stamp - p, "lt": stamp}}}
]
}
}
}
search_resp = es.search(index=index_task,
body=search_body,
request_timeout=100,
size=1000)
task_num = search_resp['hits']['total']
fetch_date_str = today.isoformat()[:10]
for i in search_resp['hits']['hits']:
index_name = i['_id']
# print(index_name)
try:
alert_msg[is_done][index_name] = [i['_source']['task_description'], ]
except:
alert_msg[is_done][index_name] = ['', ]
start_time_int = int(i['_source']['start_time'] / 1000)
start_time_H = datetime.datetime.fromtimestamp(start_time_int).isoformat(sep=' ')
alert_msg[is_done][index_name].append(start_time_H)
try:
end_time_int = int(i['_source']['end_time'] / 1000)
end_time_H = datetime.datetime.fromtimestamp(end_time_int).isoformat(sep=' ')
alert_msg[is_done][index_name].append(end_time_H)
except:
alert_msg[is_done][index_name].append('')
# print("***********************************")
# for j in alert_msg[True].keys():
# print(j)
# print("***********************************")
# for k in alert_msg[False].keys():
# print(k)
# # send the alert email
csm_mail_service = 'mail.csm.com.cn'
sender = 'zhouyujiang@csm.com.cn'
stats_type_dict = {
True: '完成的任务',
False: '未完成的任务'
}
email_subj = ' task-stats预警 %s' % (today.isoformat()[:10])
email_msg_body = ''
email_msg_body += '未完成任务/完成任务:%s/%s\n\n' %(len(alert_msg[False].keys()),len(alert_msg[True].keys()))
for is_done in alert_msg:
email_msg_body += ('%s :\n' % (stats_type_dict[is_done]))
for id in alert_msg[is_done].keys():
email_msg_body += ('\t_id :%s\n\ttask_description :%s\n\tstart_time :%s\n\tend_time :%s\n' % (
id, alert_msg[is_done][id][0], alert_msg[is_done][id][1], alert_msg[is_done][id][2]))
email_msg_body += ('\n')
email_msg_body += ('\n')
email_msg_body += '\nchecking data source index name: %s\n\n\n' % (index_task)
if email_msg_body != '':
email_msg_body += email_msg_suffix
print('email_msg_body:\n', email_msg_body, file=f_log)
email_msg = EmailMessage()
email_msg.set_content(email_msg_body)
email_msg['Subject'] = email_subj
email_msg['From'] = sender
email_msg['to'] = email_group[index_task]
try:
server = smtplib.SMTP(host=csm_mail_service)
server.send_message(email_msg)
server.quit()
print('Successfully sent email to %s for %s' % (email_group[index_task], index_task),
datetime.datetime.now(), file=f_log)
print('email_msg:\n', email_msg, file=f_log)
except:
print('Failed to connect email server.', datetime.datetime.now(), file=f_log)
print('\n\n', file=f_log)
f_log.close()
if __name__ == '__main__':
task_alert(0)
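# task_alert only relies on a handful of fields per document in the task-stats index
# (_id, is_done, start_time, end_time, task_description). The sketch below shows the
# kind of record a job might register when it starts and finishes; it is illustrative
# only, and the doc_type value is an assumption.
import datetime
import elasticsearch

sketch_es = elasticsearch.Elasticsearch(hosts='192.168.17.13', port=9200)

def register_task(task_id, description):
    now_ms = int(datetime.datetime.now().timestamp() * 1000)
    sketch_es.index(index='task-stats', doc_type='doc', id=task_id,
                    body={'task_description': description,
                          'start_time': now_ms,
                          'is_done': False})

def finish_task(task_id):
    now_ms = int(datetime.datetime.now().timestamp() * 1000)
    sketch_es.update(index='task-stats', doc_type='doc', id=task_id,
                     body={'doc': {'end_time': now_ms, 'is_done': True}})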