# -*- coding:utf-8 -*-
# @Time : 2019/10/28 15:48
# @Author : litao
import elasticsearch
import datetime
import pandas as pd
from elasticsearch.helpers import scan
from func_cal_doc_id import cal_doc_id
from write_data_into_es.func_get_releaser_id import *
import copy, os, time, json
import zipfile
import redis
from maintenance.send_email_with_file_auto_task import write_email_task_to_redis

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

from redis.sentinel import Sentinel

sentinel = Sentinel([('192.168.17.65', 26379),
                     ('192.168.17.66', 26379),
                     ('192.168.17.67', 26379)],
                    socket_timeout=0.5)
# look up the master node
master = sentinel.discover_master('ida_redis_master')
# look up the slave nodes
slave = sentinel.discover_slaves('ida_redis_master')
# connect to the database
rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=2, decode_responses=True)

# es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
# rds = redis.StrictRedis(host='192.168.17.60', port=6379, db=2, decode_responses=True)


def get_project_releaser(project_name):
    """Fetch all releaser records tagged with the given project name."""
    res_list = []
    search_body = {
        "query": {
            "match": {
                "project_tags.keyword": project_name
            }
        }
    }
    res_scan = scan(client=es, query=search_body, index="target_releasers")
    for res in res_scan:
        res_list.append(res["_source"])
    return res_list


def zipDir(dirpath, outFullName):
    """
    Compress the given directory into a zip archive, then delete the source
    files and the (now empty) directory tree.
    :param dirpath: path of the directory to compress
    :param outFullName: output path of the archive, e.g. /some/path/xxxx.zip
    :return: the final archive path
    """
    remove_list = []
    if ".zip" not in outFullName:
        outFullName += ".zip"
    file_path = os.path.split(outFullName)
    is_path = os.path.exists(file_path[0])
    if not is_path:
        os.mkdir(file_path[0])
        time.sleep(0.5)
    zip = zipfile.ZipFile(outFullName, "w", zipfile.ZIP_DEFLATED)
    for path, dirnames, filenames in os.walk(dirpath):
        # strip the root prefix so only paths below dirpath go into the archive
        fpath = path.replace(dirpath, '')
        for filename in filenames:
            zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
            remove_list.append(os.path.join(path, filename))
    zip.close()
    for p in remove_list:
        os.remove(p)
    time.sleep(0.5)
    os.removedirs(dirpath)
    return outFullName


def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    now = datetime.datetime.now()
    now_canlendar = now.isocalendar()
    if not cycle_num:
        week_canlendar = now_canlendar
    else:
        week_canlendar = (now.year, cycle_num + 1, 0)
    year = week_canlendar[0]
    this_week = week_canlendar[1] - 1
    if this_week == 0:
        last_year = year - 1
        this_week = 1
    else:
        last_year = year
    if this_week == 1:
        last_week = 52
    else:
        last_week = this_week - 1
    today = datetime.datetime(datetime.datetime.now().year, datetime.datetime.now().month,
                              datetime.datetime.now().day)
    # today = datetime.datetime(year=2018, month=12, day=25)
    first_day_in_week = today - datetime.timedelta(
        days=now_canlendar[2] + 7 * (now_canlendar[1] - week_canlendar[1] + 1))
    first_day_ts = int(first_day_in_week.timestamp() * 1e3)
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)
    this_week_index = 'short-video-weekly'
    this_week_doc = 'daily-url-' + str(year) + '_w' + format(this_week, '>02d') + '_s1'
    last_week_index = 'releaser-weekly-short-video'
    last_week_doc = 'doc'
    if compare_type == "new_released":
        this_week_index = last_week_index
        this_week_doc = last_week_doc
    return (this_week_index, this_week_doc, last_week_index, last_week_doc,
            first_day_ts, last_day_ts, this_week, last_week, last_year)
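
# A minimal sketch of how week_num's nine-element tuple is typically unpacked
# (the same call create_index makes). The cycle_num value is hypothetical and
# nothing here is persisted; first_day_ts/last_day_ts are millisecond bounds.
def _example_week_num():
    (this_week_index, this_week_doc, last_week_index, last_week_doc,
     first_day_ts, last_day_ts, this_week, last_week, last_year) = week_num(cycle_num=40)
    print(this_week_index, this_week_doc, this_week, last_week)
    print(datetime.datetime.fromtimestamp(first_day_ts / 1e3),
          datetime.datetime.fromtimestamp(last_day_ts / 1e3))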

def month_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    now = datetime.datetime.now()
    if not year:
        # infer the year: a requested month later than the current one must
        # belong to last year
        if not cycle_num or cycle_num <= now.month:
            year = now.year
        else:
            year = now.year - 1
    if not cycle_num:
        this_mon = now.month - 1 if now.month != 1 else 12
        last_mon = this_mon - 1 if this_mon > 1 else this_mon - 1 + 12
        if last_mon == 12:
            last_year = year - 1
        else:
            last_year = year
    else:
        this_mon = cycle_num
        last_mon = cycle_num - 1 if this_mon > 1 else this_mon - 1 + 12
        if last_mon == 12:
            last_year = year - 1
        else:
            last_year = year
    first_day_ts = int(datetime.datetime(year=year, month=this_mon, day=1).timestamp() * 1e3)
    if this_mon == 12:
        next_year = year + 1
        next_month = 1
    else:
        next_year = year
        next_month = this_mon + 1
    last_day_ts = int(datetime.datetime(year=next_year, month=next_month, day=1).timestamp() * 1e3)
    this_mon_index = "short-video-production-%s" % year
    this_mon_doc = "daily-url-%s" % (
            datetime.datetime(year=next_year, month=next_month, day=1) + datetime.timedelta(days=-1)).strftime(
        "%Y-%m-%d")
    last_mon_index = "releaser"
    last_mon_doc = "releasers"
    if compare_type == "new_released":
        this_mon_index = last_mon_index
        this_mon_doc = last_mon_doc
    return (this_mon_index, this_mon_doc, last_mon_index, last_mon_doc,
            first_day_ts, last_day_ts, this_mon, last_mon, last_year)


def quarter_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    now = datetime.datetime.now()
    if not year:
        year = now.year
    if not cycle_num:
        this_quarter = int(now.month / 3) + 1
    else:
        this_quarter = cycle_num
    last_quarter = this_quarter - 1 if this_quarter > 1 else 4
    if last_quarter == 4:
        last_year = year - 1
    else:
        last_year = year
    first_day_ts = int(datetime.datetime(year=year, month=(this_quarter - 1) * 3 + 1, day=1).timestamp() * 1e3)
    # the quarter ends on the first day of the following quarter; roll over to
    # January of the next year after Q4
    if this_quarter == 4:
        last_day_ts = int(datetime.datetime(year=year + 1, month=1, day=1).timestamp() * 1e3)
    else:
        last_day_ts = int(datetime.datetime(year=year, month=this_quarter * 3 + 1, day=1).timestamp() * 1e3)
    this_quarter_index = "short-video-quarter-%s" % year
    this_quarter_doc = "daily-url-2019-Q%s" % this_quarter
    last_quarter_index = "releaser"
    last_quarter_doc = "releasers-%s-Q%s" % (last_year, last_quarter)
    if compare_type == "new_released":
        this_quarter_index = last_quarter_index
        this_quarter_doc = last_quarter_doc
    return (this_quarter_index, this_quarter_doc, last_quarter_index, last_quarter_doc,
            first_day_ts, last_day_ts, this_quarter, last_quarter, last_year)
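
# A quick sanity check of the month boundaries month_num produces; it mirrors
# the commented month_num(...) calls in __main__ below. Purely illustrative.
def _example_month_num():
    res = month_num(year=2019, cycle_num=12)
    _, _, _, _, first_day_ts, last_day_ts, this_mon, last_mon, last_year = res
    # December 2019 runs from 2019-12-01 up to (but excluding) 2020-01-01
    print(datetime.datetime.fromtimestamp(first_day_ts / 1e3),
          datetime.datetime.fromtimestamp(last_day_ts / 1e3))
    print(this_mon, last_mon, last_year)  # 12, 11, 2019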

def create_index(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video",
                 data_type="detail", **kwargs):
    """
    :param year: year, may be empty
    :param cycle: cycle, week/month/quarter/year/("all-time" is used for custom-range weixin exports)
    :param cycle_num: cycle number, 1/2/3/4/5/6
    :param compare_type: empty by default, optional
    :param project_type: data platform, short_video/weibo/weixin/all-time
    :param data_type: data type, detail/summary
    :param kwargs: custom index and doc type, see the parameters read below
    :return:
    """
    first_day_ts = ""
    last_day_ts = ""
    this_cycle = ""
    try:
        if kwargs:
            this_cycle_index = kwargs.get("this_cycle_index")
            this_cycle_doc = kwargs.get("this_cycle_doc")
            last_cycle_index = kwargs.get("last_cycle_index")
            last_cycle_doc = kwargs.get("last_cycle_doc")
            first_day_ts = kwargs.get("first_day_ts")
            last_day_ts = kwargs.get("last_day_ts")
        else:
            this_cycle_index = None
    except:
        this_cycle_index = None
    one_line = {}
    if not this_cycle_index:
        if cycle == "week":
            (this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts,
             last_day_ts, this_cycle, last_cycle, last_year) = week_num(year, cycle, cycle_num, compare_type)
        elif cycle == "month":
            (this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts,
             last_day_ts, this_cycle, last_cycle, last_year) = month_num(year, cycle, cycle_num, compare_type)
        elif cycle == "quarter":
            (this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts,
             last_day_ts, this_cycle, last_cycle, last_year) = quarter_num(year, cycle, cycle_num, compare_type)
        elif cycle == "year":
            pass
    # print(this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts,
    #       this_cycle, last_cycle, last_year)
    if data_type == "detail":
        if project_type == "short_video":
            return this_cycle_index, this_cycle_doc, first_day_ts, last_day_ts, this_cycle
        elif project_type == "weibo":
            return "ronghe_weibo_monthly", "doc", first_day_ts, last_day_ts, this_cycle
        elif project_type == "weixin":
            return "ronghe_weixin", "doc", first_day_ts, last_day_ts, "ronghe_weixin"
        elif project_type == "all-time":
            return "short-video-all-time-url", "all-time-url", None, None, None
        elif project_type == "target_releasers":
            return "target_releasers", "doc", None, None, None
        elif project_type == "purchased_releasers":
            return "purchased_releasers", "doc", None, None, None
    else:
        if project_type == "short_video":
            return last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts, this_cycle
        elif project_type == "weibo":
            return "ronghe_weibo_releaser", "doc", first_day_ts, last_day_ts, this_cycle
        elif project_type == "weixin":
            if cycle == "all-time":
                return "ronghe_weixin", "doc", first_day_ts, last_day_ts, "ronghe_weixin"
            return "ronghe_weixin_releaser", "doc", first_day_ts, last_day_ts, this_cycle
        elif project_type == "all-time":
            return "short-video-all-time-url", "all-time-url", None, None, None
        elif project_type == "purchased_releasers":
            return "purchased_releasers", "doc", None, None, None
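
# A hedged sketch of resolving the read index/doc for one export; this is the
# same call es_to_csv makes per data_type. The concrete names depend on which
# indices actually exist in this cluster.
def _example_create_index():
    index, doc, start_ts, end_ts, this_cycle = create_index(
        year=2019, cycle="month", cycle_num=9,
        project_type="short_video", data_type="detail")
    # e.g. "short-video-production-2019", "daily-url-2019-09-30", plus the
    # millisecond bounds of September 2019
    print(index, doc, start_ts, end_ts, this_cycle)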

def create_search_body(year=None, cycle="week", cycle_num=None, project_type="short_video", data_type="detail",
                       start_ts=None, end_ts=None, releaser=None, platform=None, releaser_id_str=None, **kwargs):
    if data_type == "detail":
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"releaser_id_str": releaser_id_str}},
                        {"range": {"release_time": {"gte": start_ts, "lt": end_ts}}},
                        {"range": {"duration": {"lte": 600}}}
                    ]
                }
            }
        }
        if project_type == "weibo":
            # drop the releaser_id_str term and the duration range, then match
            # on UID instead
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].pop(1)
            search_body["query"]["bool"]["filter"].append({"term": {"UID.keyword": releaser_id_str}})
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
        elif project_type == "target_releasers":
            search_body = kwargs.get("extra_dic").get("search_body")
        elif project_type == "weixin":
            search_body["query"]["bool"]["filter"].pop(2)
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str.keyword": releaser_id_str}})
        # elif cycle == "month" and project_type == "short_video":
        #     search_body["query"]["bool"]["filter"].pop(0)
        #     search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str.keyword": releaser_id_str}})
    else:
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"releaser_id_str.keyword": releaser_id_str}}
                    ]
                }
            }
        }
        if project_type == "weibo":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"UID.keyword": releaser_id_str}})
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
        elif project_type == "short_video":
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
            elif cycle == "week":
                search_body["query"]["bool"]["filter"].append({"term": {"data_week_num": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_week_year": year}})
        elif project_type == "weixin":
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
            elif cycle == "all-time":
                search_body["query"]["bool"]["filter"].append(
                    {"range": {"release_time": {"gte": start_ts, "lt": end_ts}}})
                search_body["aggs"] = {
                    "sum_play": {"sum": {"field": "play_count"}},
                    "sum_favorite": {"sum": {"field": "favorite_count"}},
                    "sum_comment": {"sum": {"field": "comment_count"}},
                    "sum_repost": {"sum": {"field": "repost_count"}}
                }
        elif project_type == "all-time":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str": releaser_id_str}})
            search_body["query"]["bool"]["filter"].append({"range": {"duration": {"lte": 600}}})
            search_body["query"]["bool"]["filter"].append(
                {"range": {"release_time": {"gte": start_ts, "lt": end_ts}}})
            search_body["aggs"] = {
                "sum_play": {"sum": {"field": "play_count"}},
                "sum_favorite": {"sum": {"field": "favorite_count"}},
                "sum_comment": {"sum": {"field": "comment_count"}},
                "sum_repost": {"sum": {"field": "repost_count"}}
            }
        elif project_type == "purchased_releasers":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str": releaser_id_str}})
    print(search_body)
    return search_body
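
# For orientation: the filter create_search_body assembles for a short_video
# detail export is a plain bool query like the sketch below. The id and
# timestamps are hypothetical; the real body is printed at run time.
_example_detail_body = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"releaser_id_str": "toutiao_123456"}},
                {"range": {"release_time": {"gte": 1567267200000, "lt": 1569859200000}}},
                {"range": {"duration": {"lte": 600}}}
            ]
        }
    }
}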

def detail_to_dic(search_res, data_type="short_video"):
    if data_type == "target_releasers":
        for one_scan in search_res:
            project_tags = one_scan["_source"].get("project_tags")
            department_tags = one_scan["_source"].get("department_tags")
            project_type = ""
            department_type = ""
            if project_tags:
                for i in project_tags:
                    if not project_type:
                        project_type += i
                    else:
                        project_type += "/" + i
            if department_tags:
                for department in department_tags:
                    if not department_type:
                        department_type += department
                    else:
                        department_type += "/" + department
            try:
                re_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("post_time") / 1000)
                time_str = re_time.strftime('%Y-%m-%d %H:%M:%S')
            except:
                time_str = ""
            line_dict = ({"id": one_scan['_id'],
                          'releaser': one_scan['_source'].get("releaser"),
                          'platform': one_scan['_source'].get("platform"),
                          "post_time": time_str,
                          "releaserUrl": one_scan['_source'].get("releaserUrl"),
                          "releaser_id_str": one_scan['_source'].get("releaser_id_str"),
                          "project_type": project_type,
                          "department_tags": department_type
                          })
            if not line_dict:
                print(line_dict)
            yield line_dict
    else:
        if data_type != "weibo":
            for one_scan in search_res:
                try:
                    re_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("release_time") / 1000)
                    time_str = re_time.strftime('%Y-%m-%d %H:%M:%S')
                    fc_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("fetch_time") / 1000)
                    fc_str = fc_time.strftime('%Y-%m-%d %H:%M:%S')
                except:
                    time_str = ""
                    fc_str = ""
                try:
                    title = one_scan['_source'].get("title")
                    title = title.replace("\n", "")
                    title = title.replace("\r", "")
                except:
                    title = ""
                line_dict = ({"id": one_scan['_id'],
                              'releaser': one_scan['_source'].get("releaser"),
                              'title': title,
                              'platform': one_scan['_source'].get("platform"),
                              'url': one_scan['_source'].get("url"),
                              "release_time": time_str,
                              "releaserUrl": one_scan['_source'].get("releaserUrl"),
                              "releaser_id_str": one_scan['_source'].get("releaser_id_str"),
                              "fetch_time": fc_str,
                              "play_count": one_scan['_source'].get("play_count"),
                              "favorite_count": one_scan['_source'].get("favorite_count"),
                              "comment_count": one_scan['_source'].get("comment_count"),
                              "repost_count": one_scan['_source'].get("repost_count"),
                              "duration": one_scan['_source'].get("duration"),
                              "top": one_scan['_source'].get("top")
                              })
                yield line_dict
        else:
            for one_scan in search_res:
                try:
                    re_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("release_time") / 1000)
                    time_str = re_time.strftime('%Y-%m-%d %H:%M:%S')
                    fc_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("fetch_time") / 1000)
                    fc_str = fc_time.strftime('%Y-%m-%d %H:%M:%S')
                except:
                    time_str = ""
                    fc_str = ""
                try:
                    title = one_scan['_source'].get("wb_text").replace("\r", "").replace("\n", "")
                except:
                    title = ""
                line_dict = ({
                    'ID': one_scan['_source'].get("wb_bowen_id"),
                    "UID": str(one_scan['_source']['UID']),
                    '微博昵称': one_scan['_source'].get("wb_name"),
                    '微博内容': title,
                    'platform': one_scan['_source'].get("platform"),
                    '博文地址': one_scan['_source'].get("wb_bowen_address"),
                    "发布日期": time_str,
                    "抓取时间": fc_str,
                    # "play_count": one_scan['_source'].get("play_count"),
                    "点赞量": one_scan['_source'].get("favorite_count"),
                    "评论量": one_scan['_source'].get("comment_count"),
                    "转发量": one_scan['_source'].get("repost_count"),
                    # "duration": one_scan['_source'].get("duration"),
                    "配图连接": one_scan['_source'].get("video_address"),
                    "博文配图": one_scan['_source'].get("wb_pic"),
                    "博文长度": one_scan['_source'].get("wb_bowen_length"),
                    "短链标题": one_scan['_source'].get("wb_type_text"),
                    "短链播放量-视频": one_scan['_source'].get("wb_video_number"),
                    "短链地址-视频": one_scan['_source'].get("video_address"),
                    "短链地址": one_scan['_source'].get("wb_type_address"),
                    "是否原创": one_scan['_source'].get("wb_father_UID"),
                    "原创账户": one_scan['_source'].get("wb_father_bowen_id"),
                    "博文类型": one_scan['_source'].get("wb_bowen_type"),
                })
                yield line_dict


def summary_to_dic(search_res, fans_count=None, fans_date=None, releaser_id_str=None, platform=None,
                   start_ts=None, end_ts=None, **kwargs):
    extra_dic = kwargs.get("extra_dic")
    if search_res:
        line_dict = {}
        video_num = search_res['hits']['total']
        play_count_sum = search_res['aggregations']['sum_play']['value']
        favorite_count_sum = search_res['aggregations']['sum_favorite']['value']
        comment_count_sum = search_res['aggregations']['sum_comment']['value']
        repost_count_sum = search_res['aggregations']['sum_repost']['value']
        month_str = ""
        if start_ts:
            re_time = datetime.datetime.fromtimestamp(start_ts / 1000)
            month_str += re_time.strftime('%Y-%m-%d') + "至"
        if end_ts:
            re_time = datetime.datetime.fromtimestamp(end_ts / 1000)
            month_str += re_time.strftime('%Y-%m-%d')
        line_dict.update({'releaser': "",
                          'platform': platform,
                          'releaserUrl': "",
                          'video_num': video_num,
                          'play_count_sum': play_count_sum,
                          'favorite_count_sum': favorite_count_sum,
                          'comment_count_sum': comment_count_sum,
                          'repost_count_sum': repost_count_sum,
                          "data_month": month_str,
                          "this_fans": fans_count,
                          "production_org_category": "",
                          "tv_station": "",
                          "releaser_id_str": releaser_id_str,
                          "fans_date": fans_date
                          })
        line_dict.update(extra_dic)
        return line_dict
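
# Sketch of how detail_to_dic is consumed: it is a generator over scan() hits,
# so rows stream one dict at a time. The releaser id and timestamps below are
# hypothetical.
def _example_detail_rows():
    body = create_search_body(project_type="short_video", data_type="detail",
                              releaser_id_str="toutiao_123456",
                              start_ts=1567267200000, end_ts=1569859200000)
    hits = scan(es, query=body, index="short-video-weekly", doc_type="doc")
    for row in detail_to_dic(hits, data_type="short_video"):
        print(row["releaser"], row["play_count"])
        break  # just demonstrate the first row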

def purchased_releaser_to_dic(search_res, releaser_is_str=None, line_dict=None, releaser=None, platform=None,
                              releaserUrl=None, **kwargs):
    count_has = 0
    line_dict_new = copy.deepcopy(line_dict)
    for res in search_res:
        count_has = 1
        departments = res["_source"].get("departments")
        department_str = ""
        if departments:
            for department in departments:
                if not department_str:
                    department_str += department
                else:
                    department_str += "/" + department
        try:
            re_time = datetime.datetime.fromtimestamp(res['_source'].get("end_purchase_time") / 1000)
            time_str_end = re_time.strftime('%Y-%m-%d %H:%M:%S')
        except:
            time_str_end = ""
        try:
            re_time = datetime.datetime.fromtimestamp(res['_source'].get("start_purchase_time") / 1000)
            time_str_start = re_time.strftime('%Y-%m-%d %H:%M:%S')
        except:
            time_str_start = ""
        line_dict_new = {
            'releaser': releaser,
            'platform': platform,
            "start_purchase_time": time_str_start,
            "end_purchase_time": time_str_end,
            "releaserUrl": releaserUrl,
            "releaser_id_str": releaser_is_str,
            "is_purchased": 1,
            "departments": department_str,
            "is_purchased_str": res["_source"].get("is_purchased_str")
        }
        return line_dict_new
    if not count_has:
        line_dict_new = {
            'releaser': releaser,
            'platform': platform,
            "start_purchase_time": "",
            "end_purchase_time": "",
            "releaserUrl": releaserUrl,
            "releaser_id_str": releaser_is_str,
            "is_purchased": "",
            "departments": "",
            "is_purchased_str": ""
        }
        return line_dict_new


def releaser_to_dic(search_res, releaser_is_str=None, line_dict=None, fans=None, fc_str=None, **kwargs):
    count_has = 0
    line_dict_list = []
    for res in search_res:
        line_dict_new = copy.deepcopy(line_dict)
        count_has = 1
        print(res)
        video_num = res['_source']['video_num']
        play_count_sum = res['_source']['play_count_sum']
        favorite_count_sum = res['_source']['favorite_count_sum']
        comment_count_sum = res['_source']['comment_count_sum']
        repost_count_sum = res['_source'].get('repost_count_sum')
        ranking_by_play_count_sum = res['_source'].get('ranking_by_play_count_sum')
        stats_type = res['_source']['stats_type']
        _id = res['_id']
        line_dict_new.update({
            'video_num': video_num,
            'play_count_sum': play_count_sum,
            'favorite_count_sum': favorite_count_sum,
            'comment_count_sum': comment_count_sum,
            "stats_type": stats_type,
            "repost_count_sum": repost_count_sum,
            "ranking_by_play_count_sum": ranking_by_play_count_sum,
            "_id": _id,
            "releaser_followers_count": fans,
            "this_fans": fc_str,
            "releaser_id_str": releaser_is_str
        })
        yield line_dict_new
    if not count_has:
        # no stats found: emit empty "new_released" and "observed" rows so the
        # releaser still shows up in the summary CSV
        new_dic = {
            'video_num': 0,
            'play_count_sum': 0,
            'favorite_count_sum': 0,
            'comment_count_sum': 0,
            "stats_type": "new_released",
            "repost_count_sum": 0,
            "releaser_id_str": releaser_is_str,
        }
        new_dic.update(line_dict)
        ob_dic = {
            'video_num': 0,
            'play_count_sum': 0,
            'favorite_count_sum': 0,
            'comment_count_sum': 0,
            "stats_type": "observed",
            "repost_count_sum": 0,
            "releaser_id_str": releaser_is_str,
            "releaser_followers_count": fans,
            "this_fans": fc_str,
        }
        ob_dic.update(line_dict)
        line_dict_list.append(new_dic)
        line_dict_list.append(ob_dic)
        for dic in line_dict_list:
            yield dic


def get_follwer_num(releaser_id_str=None, ts=None):
    search_short_body_fans = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"releaser_id_str.keyword": releaser_id_str}},
                    {"range": {"fetch_time": {"gte": ts}}}
                ]
            }
        },
        "sort": [
            {
                "timestamp": {
                    "order": "asc"
                }
            }
        ]
    }
    try:
        fans_count = es.search(index="releaser_fans", doc_type="doc", body=search_short_body_fans)
        fans = fans_count["hits"]["hits"][0]["_source"]["releaser_followers_count"]
        fans_date = datetime.datetime.fromtimestamp(
            fans_count["hits"]["hits"][0]["_source"]["fetch_time"] / 1000)
        fc_str = fans_date.strftime('%Y-%m-%d %H:%M:%S')
    except:
        fans = None
        fc_str = None
    return fans, fc_str
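
# Minimal sketch of the follower lookup: the earliest releaser_fans document
# fetched on or after the given millisecond timestamp wins. The id and the
# printed values are hypothetical.
def _example_follower_lookup():
    fans, fans_date_str = get_follwer_num(releaser_id_str="toutiao_123456",
                                          ts=1572537600000)
    print(fans, fans_date_str)  # e.g. 1048576 '2019-11-01 08:12:30', or (None, None)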

def func_es_to_csv(file_path, read_index, read_doc, year=None, cycle="week", cycle_num=None,
                   project_type="short_video", data_type="detail", start_ts=None, end_ts=None,
                   save_file_path="", file_task_name=None, this_cycle=None, **kwargs):
    all_dic = {
        "toutiao": [],
        "抖音": [],
        "腾讯视频": [],
        "腾讯新闻": [],
        "haokan": [],
        "miaopai": [],
        "网易新闻": [],
        "new_tudou": [],
        "pearvideo": [],
        "kwai": [],
        "weibo": [],
        "weixin": [],
        "tencent_combine": [],
        "purchased_releasers": [],
    }
    all_plant = []
    if not cycle_num:
        cycle_num = this_cycle
    if file_path:
        try:
            f = open(file_path, 'r', encoding="gb18030")
            head = f.readline()
            head_list = head.strip().split(',')
        except:
            # file_path may already be a list of row dicts instead of a path
            f = file_path
        for line, i in enumerate(f):
            if type(file_path) != list:
                try:
                    line_list = i.strip().split(',')
                    line_dict = dict(zip(head_list, line_list))
                except:
                    line_dict = f
            else:
                line_dict = i
            releaser_id_str = line_dict.get("releaser_id_str")
            platform = line_dict.get("platform")
            releaserUrl = line_dict.get("releaserUrl")
            releaser = line_dict.get("releaser")
            if not releaser_id_str:
                releaser_id_str = line_dict.get("releaser_id")
            if not releaser_id_str:
                releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
                if releaser_id:
                    if platform != "weixin" and platform != "weibo":
                        releaser_id_str = platform + '_' + releaser_id
                    else:
                        releaser_id_str = releaser_id
                else:
                    releaser_id_str = ""
            search_body = create_search_body(year=year, cycle=cycle, cycle_num=cycle_num,
                                             project_type=project_type, data_type=data_type,
                                             start_ts=start_ts, end_ts=end_ts, platform=platform,
                                             releaser=releaser, releaser_id_str=releaser_id_str, **kwargs)
            print(search_body)
            search_res = scan(es, query=search_body, index=read_index, doc_type=read_doc, preserve_order=True)
            # file_name = "%s_%s_%s_%s_%s" % (project_type, year, cycle, cycle_num, data_type)
            if data_type == "detail":
                for data in detail_to_dic(search_res, data_type=project_type):
                    all_plant.append(data)
                    all_dic[line_dict["platform"]].append(data)
            else:
                if project_type == "all-time" or cycle == "all-time":
                    fans, fc_str = get_follwer_num(releaser_id_str=releaser_id_str, ts=end_ts)
                    search_res = es.search(body=search_body, index=read_index, doc_type=read_doc)
                    res_dic = summary_to_dic(search_res, fans_count=fans, fans_date=fc_str,
                                             releaser=releaser, releaser_id_str=releaser_id_str,
                                             releaserUrl=releaserUrl, platform=platform,
                                             extra_dic=line_dict, start_ts=start_ts, end_ts=end_ts)
                    all_plant.append(res_dic)
                elif project_type == "purchased_releasers":
                    res_dic = purchased_releaser_to_dic(search_res, releaser=releaser, platform=platform,
                                                        releaserUrl=releaserUrl,
                                                        releaser_is_str=releaser_id_str, line_dict=line_dict)
                    all_plant.append(res_dic)
                else:
                    fans, fc_str = get_follwer_num(releaser_id_str=releaser_id_str, ts=end_ts)
                    for dic in releaser_to_dic(search_res, line_dict=line_dict, fans=fans, fc_str=fc_str,
                                               releaser_is_str=releaser_id_str):
                        all_plant.append(dic)
        if data_type == "detail":
            if project_type != "weibo":
                columns = ["id", 'releaser', 'platform', 'title', "releaserUrl", 'url', 'releaser_id_str',
                           "release_time", "fetch_time", "play_count", "favorite_count", "comment_count",
                           "repost_count", "duration", "top"]
            else:
                columns = ['ID', "UID", '微博昵称', '微博内容', 'platform', '博文地址', "发布日期", "抓取时间",
                           "点赞量", "评论量", "转发量", "配图连接", "博文配图", "博文长度", "短链标题",
                           "短链播放量-视频", "短链地址-视频", "短链地址", "是否原创", "原创账户", "博文类型", ]
        else:
            if project_type == "purchased_releasers":
                columns = ['releaser', 'platform', "start_purchase_time", "end_purchase_time", "releaserUrl",
                           "releaser_id_str", "is_purchased", "departments", "is_purchased_str"]
            else:
                columns = ['releaser', "ranking_by_play_count_sum", "total_releaser_num_on_platform", 'platform',
                           "video_num", "play_count_sum", "play_count_sum_total_on_platform",
                           "video_num_sum_total_on_platform", "favorite_count_sum", "comment_count_sum",
                           "repost_count_sum", "releaser_followers_count", "this_fans", "last_fans",
                           "fans_change", "data_year", "data_month", "stats_type", "production_org_category",
                           "releaserUrl", "releaser_id_str", "fans_date", "top"]
        # append any extra columns carried in from the input file
        for d in line_dict:
            if d not in columns:
                columns.append(d)
        if not all_plant:
            return save_file_path + file_task_name
        data = pd.DataFrame(all_plant)
        s = datetime.datetime.now()
        ss = str(s)[0:19].replace(' ', '-').replace(':', '-')
        try:
            os.mkdir(save_file_path + file_task_name)
        except Exception as e:
            print("save error 716", e)
        save_path = '%s%s/%s_%s_%s_%s.csv' % (
            save_file_path, file_task_name, file_task_name, project_type, data_type, ss)
        print(save_path)
        data.to_csv(save_path, encoding='gb18030', columns=columns)
        if data_type == "detail" and project_type == "short_video":
            # also write one CSV per platform
            for d in all_dic:
                if all_dic[d]:
                    data = pd.DataFrame(all_dic[d])
                    s = datetime.datetime.now()
                    ss = str(s)[0:19].replace(' ', '-').replace(':', '-')
                    data.to_csv('%s/%s_%s_%s.csv' % (save_file_path + file_task_name, file_task_name, d, ss),
                                encoding='gb18030', columns=columns)
                    time.sleep(0.5)
        return save_file_path + file_task_name
    else:
        # no input file: run the custom search_body passed in through kwargs
        search_body = kwargs.get("search_body")
        search_res = scan(es, query=search_body, index=read_index, doc_type=read_doc)
        file_name = "%s_%s" % (project_type, data_type)
        if data_type == "detail":
            for data in detail_to_dic(search_res, data_type=project_type):
                if data:
                    all_plant.append(data)
        data = pd.DataFrame(all_plant)
        s = datetime.datetime.now()
        ss = str(s)[0:19].replace(' ', '-').replace(':', '-')
        try:
            os.mkdir(save_file_path + file_task_name)
        except Exception as e:
            print(e)
        save_path = '%s%s/%s_%s.csv' % (save_file_path, file_task_name, file_name, ss)
        print(save_path)
        data.to_csv(save_path, encoding='gb18030')
        return save_file_path + file_task_name
"favorite_count_sum", "comment_count_sum", "repost_count_sum", "releaser_followers_count", "this_fans", "last_fans", "fans_change", "data_year", "data_month", "stats_type", "production_org_category", "releaserUrl", "releaser_id_str", "fans_date", "top" ] for d in line_dict: if d not in columns: columns.append(d) if not all_plant: return save_file_path + file_task_name data = pd.DataFrame(all_plant) s = datetime.datetime.now() ss = str(s)[0:19].replace(' ', '-').replace(':', '-') try: os.mkdir(save_file_path + file_task_name) except Exception as e: print("save error 716", e) pass save_path = '%s%s/%s_%s_%s_%s.csv' % ( save_file_path, file_task_name, file_task_name, project_type, data_type, ss) print(save_path) data.to_csv(save_path, encoding='gb18030', columns=columns) if data_type == "detail" and project_type == "short_video": for d in all_dic: if all_dic[d]: data = pd.DataFrame(all_dic[d]) s = datetime.datetime.now() ss = str(s)[0:19].replace(' ', '-').replace(':', '-') data.to_csv('%s/%s_%s_%s.csv' % (save_file_path + file_task_name, file_task_name, d, ss), encoding='gb18030', columns=columns) time.sleep(0.5) return save_file_path + file_task_name else: search_body = kwargs.get("search_body") search_res = scan(es, query=search_body, index=read_index, doc_type=read_doc) file_name = "%s_%s" % (project_type, data_type) if data_type == "detail": for data in detail_to_dic(search_res, data_type=project_type): if data: all_plant.append(data) data = pd.DataFrame(all_plant) s = datetime.datetime.now() ss = str(s)[0:19].replace(' ', '-').replace(':', '-') try: os.mkdir(save_file_path + file_task_name) except Exception as e: print(e) save_path = '%s%s/%s_%s.csv' % (save_file_path, file_task_name, file_name, ss) print(save_path) data.to_csv(save_path, encoding='gb18030') return save_file_path + file_task_name def timestamp_range(start_timestamp, end_timestamp): time_stamp_list = [] start_date = datetime.datetime.fromtimestamp(start_timestamp / 1e3) start_year = start_date.year start_month = start_date.month end_date = datetime.datetime.fromtimestamp(end_timestamp / 1e3) if start_date.day == 1 and end_date.day == 1: while start_timestamp <= end_timestamp: time_stamp_list.append(start_timestamp) if start_month == 12: start_month = 1 start_year += 1 else: start_month += 1 start_date = datetime.datetime(year=start_year, month=start_month, day=1) start_timestamp = int(start_date.timestamp() * 1e3) return time_stamp_list else: return [start_timestamp, end_timestamp] def es_to_csv(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video", data_type_list=["detail"], file_path=None, file_task_name=None, save_file_path=None, start_timestamp=None, end_timestamp=None, split_month=False, project_name=None, **kwargs): """ :param year: int 年份 可为空 :param cycle: str 周期 week/month/quarter/year/(all-time 微信使用自定义时间)/ :param cycle_num: int 周期 1/2/3/4/5/6 :param compare_type:默认为空 可不填 :param project_type:str 数据平台 short_video/weibo/weixin/all-time/target_relasers/purchased_releasers(data_type_list需为summary) :param data_type_list: list 数据类型 ["detail","summary"] :param file_path: str 传入文件的路径 :param file_task_name: str 传入的任务名 :param split_month: bool 是否进行分月 只能在all-time参数下使用 :param project_name: Str 如果传入的是项目名,则不使用文件 :param kwargs:dict search_body = {} 自定义请求体 :return: """ project_dic = { "weixin": [], "weibo": [], "short_video": [], "all-time":[], "target_releasers":[], "purchased_releasers":[] } # if project_name: file_path = get_project_releaser(project_name) # file_path = r"D:\wxfile\WeChat 

def es_to_csv(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video",
              data_type_list=["detail"], file_path=None, file_task_name=None, save_file_path=None,
              start_timestamp=None, end_timestamp=None, split_month=False, project_name=None, **kwargs):
    """
    :param year: int, year, may be empty
    :param cycle: str, cycle, week/month/quarter/year/(all-time: weixin with a custom time range)
    :param cycle_num: int, cycle number, 1/2/3/4/5/6
    :param compare_type: empty by default, optional
    :param project_type: str, data platform, short_video/weibo/weixin/all-time/target_releasers/
                         purchased_releasers (purchased_releasers requires data_type_list to be summary)
    :param data_type_list: list, data types, ["detail", "summary"]
    :param file_path: str, path of the input file
    :param file_task_name: str, name of the submitted task
    :param split_month: bool, whether to split by month; only usable with all-time
    :param project_name: str, if a project name is passed in, the input file is not used
    :param kwargs: dict, search_body = {} custom query body
    :return:
    """
    project_dic = {
        "weixin": [],
        "weibo": [],
        "short_video": [],
        "all-time": [],
        "target_releasers": [],
        "purchased_releasers": []
    }
    if project_name:
        file_path = get_project_releaser(project_name)
    # file_path = r"D:\wxfile\WeChat Files\litaolemo\FileStorage\File\2020-01\微博模板 (1)84781418.csv"
    if not save_file_path:
        save_file_path = "/opt/code/dataManagementSys/dataManagementSys/dataManagementSys/static/zipfile/"
        # save_file_path = r"G:\releaserSystem\dataManagementSys\dataManagementSys\static\zipfile\\"
    if not year:
        year = datetime.datetime.now().year
    try:
        f = open(file_path, 'r', encoding="gb18030")
        head = f.readline()
        head_list = head.strip().split(',')
    except:
        f = file_path
    if file_path:
        # bucket the input rows by platform / project type
        for line, i in enumerate(f):
            if type(file_path) != list:
                try:
                    line_list = i.strip().split(',')
                    line_dict = dict(zip(head_list, line_list))
                except:
                    line_dict = f
            else:
                line_dict = i
            if line_dict["platform"] == "weixin":
                project_dic["weixin"].append(line_dict)
            elif line_dict["platform"] == "weibo":
                project_dic["weibo"].append(line_dict)
            if project_type == "purchased_releasers":
                project_dic["purchased_releasers"].append(line_dict)
            else:
                if project_type == "short_video":
                    project_dic["short_video"].append(line_dict)
                else:
                    project_dic["all-time"].append(line_dict)
    else:
        if project_type == "target_releasers":
            project_dic["target_releasers"].append(None)
    res_path = None
    for platform_type in project_dic:
        if not project_dic[platform_type]:
            continue
        if platform_type == "weixin" or platform_type == "weibo":
            project_type_temp = platform_type
        else:
            project_type_temp = project_type
            if project_type != platform_type:
                continue
        for data_type in data_type_list:
            index, doc, start_ts, end_ts, this_cycle = create_index(year=year, cycle=cycle, cycle_num=cycle_num,
                                                                    compare_type=compare_type,
                                                                    project_type=project_type_temp,
                                                                    data_type=data_type,
                                                                    first_day_ts=start_timestamp,
                                                                    last_day_ts=end_timestamp, **kwargs)
            print(index, doc, start_ts, end_ts, this_cycle)
            try:
                if not split_month:
                    if start_timestamp and end_timestamp:
                        start_ts = start_timestamp
                        end_ts = end_timestamp
                    try:
                        if platform_type == "target_releasers":
                            project_dic[platform_type] = []
                        res_path = func_es_to_csv(project_dic[platform_type], index, doc, year=year, cycle=cycle,
                                                  cycle_num=cycle_num, project_type=project_type_temp,
                                                  data_type=data_type, start_ts=start_ts, end_ts=end_ts,
                                                  save_file_path=save_file_path, file_task_name=file_task_name,
                                                  this_cycle=this_cycle, **kwargs)
                    except Exception as e:
                        print("zip file error ", e)
                else:
                    # export one CSV per month boundary pair
                    month_timestamp_list = timestamp_range(start_timestamp, end_timestamp)
                    for s, start_ts in enumerate(month_timestamp_list):
                        if s < len(month_timestamp_list) - 1:
                            try:
                                res_path = func_es_to_csv(project_dic[platform_type], index, doc, year=year,
                                                          cycle=cycle, cycle_num=cycle_num,
                                                          project_type=project_type_temp, data_type=data_type,
                                                          start_ts=start_ts,
                                                          end_ts=month_timestamp_list[s + 1],
                                                          save_file_path=save_file_path,
                                                          file_task_name=file_task_name,
                                                          this_cycle=this_cycle, **kwargs)
                                time.sleep(2)
                            except Exception as e:
                                print("zip file error ", e)
            except Exception as e:
                print("line 769 error", e)
    # zipDir(res_path, save_file_path + "down_task/" + file_task_name)
    # raise FileNotFoundError("file didn't find")
    time.sleep(3)
    return zipDir(res_path, save_file_path + "down_task/" + file_task_name)


def write_csv_task_to_redis(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video",
                            data_type_list=["detail"], file_path=None, file_task_name=None, save_file_path=None,
                            start_timestamp=None, end_timestamp=None, split_month=False, email_group=[],
                            project_name=None, **kwargs):
    """
    :param year: int, year, may be empty
    :param cycle: str, cycle, week/month/quarter/year
    :param cycle_num: int, cycle number, 1/2/3/4/5/6
    :param compare_type: empty by default, optional
    :param project_type: str, data platform,
                         short_video/weibo/weixin/all-time/target_releasers
    :param data_type_list: list, data types, ["detail", "summary"]
    :param file_path: str, path of the input file
    :param file_task_name: str, name of the submitted task
    :param split_month: bool, whether to split by month; only usable with all-time
    :param project_name: str, project tag to export
    :param kwargs: dict, search_body = {} custom query body
    :return:
    """
    now = int(datetime.datetime.now().timestamp() * 1e3)
    mapping_dic = {
        "year": year,
        "cycle": cycle,
        "cycle_num": cycle_num,
        "compare_type": compare_type,
        "project_type": project_type,
        "data_type_list": data_type_list,
        "file_path": file_path,
        "file_task_name": file_task_name,
        "save_file_path": save_file_path,
        "start_timestamp": start_timestamp,
        "end_timestamp": end_timestamp,
        "split_month": split_month,
        "email_group": email_group,
        "project_name": project_name,
        "kwargs": kwargs
    }
    # every field is JSON-encoded so the whole task round-trips through redis
    for k in mapping_dic:
        mapping_dic[k] = json.dumps(mapping_dic[k])
    file_name_msg = rds.hmset(file_task_name + "%s_csv" % now, mapping_dic)
    csv_task_msg = rds.rpush("csv_task", file_task_name + "%s_csv" % str(now))
    res = rds.hgetall(file_task_name + "%s_csv" % now)
    if res:
        print(res)
        print("file_name_msg", file_name_msg)
        print("csv_task_msg", csv_task_msg)
        return True
    else:
        return False


def get_task_name_from_redis():
    res = rds.llen("csv_task")
    if res != 0:
        data_dic = {}
        one_project_name = rds.lpop("csv_task")
        time.sleep(3)
        project_data = rds.hgetall(one_project_name)
        rds.delete(one_project_name)
        for k in project_data:
            data_dic[k] = json.loads(project_data[k])
        task_name = data_dic.get("file_task_name")
        print(data_dic)
        try:
            res_path = es_to_csv(year=data_dic.get("year"), cycle=data_dic.get("cycle"),
                                 cycle_num=data_dic.get("cycle_num"),
                                 compare_type=data_dic.get("compare_type"),
                                 project_type=data_dic.get("project_type"),
                                 data_type_list=data_dic.get("data_type_list"),
                                 file_path=data_dic.get("file_path"),
                                 file_task_name=data_dic.get("file_task_name"),
                                 save_file_path=data_dic.get("save_file_path"),
                                 start_timestamp=data_dic.get("start_timestamp"),
                                 end_timestamp=data_dic.get("end_timestamp"),
                                 split_month=data_dic.get("split_month"),
                                 email_group=data_dic.get("email_group"),
                                 project_name=data_dic.get("project_name"),
                                 **data_dic.get("kwargs"))
        except Exception as e:
            print("save error ", e)
            mapping_dic = {
                "task_name": task_name,
                "file_path": None,
                "data_str": "导出任务 %s 失败" % task_name,
                "email_group": data_dic.get("email_group"),
                "email_msg_body_str": "问好:\n 导出任务 %s 失败\n 库中没有对应数据" % task_name,
                "title_str": "导出任务 %s 失败" % task_name,
                "cc_group": ["litao@csm.com.cn", "zhouyujiang@csm.com.cn", "gengdi@csm.com.cn"],
                "sender": data_dic.get("email_group")[0]
            }
            delete_task_form_redis(task_name, mapping_dic, "", hasdata=False)
            return None
        mapping_dic = {
            "task_name": task_name,
            "file_path": None,
            "data_str": "导出任务 %s 已完成" % task_name,
            "email_group": data_dic.get("email_group"),
            "email_msg_body_str": "问好:\n 导出任务 %s 已完成" % task_name,
            "title_str": "导出任务 %s 已完成" % task_name,
            "cc_group": ["litao@csm.com.cn", "liushuangdan@csm.com.cn", "gengdi@csm.com.cn"],
            "sender": data_dic.get("email_group")[0]
        }
        print(task_name, mapping_dic, res_path)
        delete_task_form_redis(task_name, mapping_dic, res_path, hasdata=True)
        return res_path
    else:
        return None


def delete_task_form_redis(one_project_name, email_dic, csv_path, hasdata=True):
    print(one_project_name)
    rds.delete(one_project_name)
    dump_str = json.dumps(
        {"timestamp": int(datetime.datetime.now().timestamp() * 1e3), "csv_path": csv_path, "has_data": hasdata})
    rds.hset("csv_task_task_dowm", one_project_name, dump_str)
    if email_dic:
        write_email_task_to_redis(**email_dic)
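
# End-to-end sketch of the queue protocol (see also the usage notes in
# __main__): a producer enqueues with write_csv_task_to_redis, a worker pops
# and runs the export via get_task_name_from_redis, and the caller polls
# get_csv_task_down for finished archives. Paths and task names below are
# hypothetical.
def _example_task_roundtrip():
    write_csv_task_to_redis(year=2019, cycle="month", cycle_num=9,
                            project_type="short_video", data_type_list=["detail"],
                            file_path="/tmp/releasers.csv", file_task_name="demo_task",
                            save_file_path="./", email_group=["litao@csm.com.cn"])
    get_task_name_from_redis()   # normally executed by the do_task() worker loop
    print(get_csv_task_down())   # e.g. {'demo_task..._csv': '{"timestamp": ..., "csv_path": ...}'}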
rds.hgetall("csv_task_task_dowm") if res: for key in res: rds.hdel("csv_task_task_dowm", key) rds.delete(key) return res else: return None def do_task(): now = datetime.datetime.now() while True: try: if rds.llen("csv_task") != 0: get_task_name_from_redis() else: now = datetime.datetime.now() print(now.strftime("%Y-%m-%d %H:%M:%S")) time.sleep(5) except Exception as e: print(e) continue # while True: # do_task() if __name__ == "__main__": # es_to_csv(year=2019, cycle="month", project_type="purchased_releasers", data_type_list=["summary"], # file_path=r"D:\work_file\发布者账号\一次性需求附件\CCR2月以来采购账号.csv", save_file_path=r"./", file_task_name="156", # ) # a = {'kwargs': {}, 'split_month': False, 'project_name': None, 'email_group': ['gengdi@csm.com.cn'], 'start_timestamp': None, 'project_type': 'purchased_releasers', 'compare_type': None, 'save_file_path': '/opt/code/dataManagementSys/dataManagementSys/dataManagementSys/static/zipfile/down_task/', 'cycle_num': None, 'file_path': '/opt/code/dataManagementSys/dataManagementSys/dataManagementSys/static/purchaseCsv/', 'year': None, 'data_type_list': ['summary'], 'file_task_name': 'is_purchase_gengdi2020412', 'cycle': 'week', 'end_timestamp': None} # # write_csv_task_to_redis(**a) # write_csv_task_to_redis(year=2019, cycle="month",cycle_num=9,project_type="target_releasers",data_type_list=["detail"],save_file_path=r"E:\test",file_task_name="1789",start_timestamp=1567267200000,end_timestamp=1572537600000,email_group=["litao@csm.com.cn",],search_body={ # "query": { # "bool": { # "filter": [ # {"term": {"releaser.keyword": "BTV财经"}} # ] # } # },"sort": [ # { # "timestamp": { # "order": "desc" # } # } # ] # }) # get_task_name_from_redis() # time.sleep(2) # print(get_csv_task_down()) ### 写入redis任务 调用 write_csv_task_to_redis() ### 查询完成状态 调用 get_csv_task_down() # print(month_num(year=2020,cycle_num=1)) # print(month_num(year=2019,cycle_num=12)) # do_task() # get_csv_task_down()r # dic = {'start_timestamp': 1577808000000, 'cycle': 'week', 'compare_type': None, 'project_type': 'all-time', 'year': None, # 'file_path': r'D:\wxfile\WeChat Files\litaolemo\FileStorage\File\2020-04\brief2-影视综账号抖音快手.csv', # 'end_timestamp': 1585670400000, # 'save_file_path': './', # 'data_type_list': ['summary', 'detail'], 'file_task_name': '306_天津brief二影视综短视频一至三月', 'email_group': [], # 'cycle_num': None, 'project_name': None, 'split_month': True, 'kwargs': {}} # es_to_csv(**dic) do_task() # res= es_to_csv(email_group=[],project_type='weixin', # compare_type=None, save_file_path="./", cycle_num=2, # year=2020,project_name="中山", # data_type_list=['summary','detail'], file_task_name= '中山', cycle='all-time',start_timestamp=1577808000000,end_timestamp=1580486400000) # dic = {'kwargs': {'search_body': {'query': {'bool': {'filter': [], 'should': [{'match': {'project_tags.keyword': '城市媒体融合'}}], 'minimum_should_match': 1}}}}, 'split_month': False, 'project_name': None, 'email_group': ['litao@csm.com.cn'], 'start_timestamp': None, 'project_type': 'target_releasers', 'compare_type': None, 'save_file_path': './', 'cycle_num': None, 'file_path': None, 'year': None, 'data_type_list': ['detail'], 'file_task_name': 'litao202048', 'cycle': 'week', 'end_timestamp': None} # es_to_csv(**dic) # res = timestamp_range(1577808000000,1585670400000) # print(res)