# -*- coding:utf-8 -*-
# @Time : 2019/9/20 17:03
# @Author : litao
"""Aggregate per-releaser monthly statistics and write them to the
``releaser`` Elasticsearch index.

For every releaser listed in an input CSV two summary documents are built for
the requested month:

* ``new_released`` -- sums over videos whose ``release_time`` falls inside the
  month (restricted to ``duration`` <= 600);
* ``observed`` -- sums of the ``monthly_net_inc_*`` fields (restricted to
  ``duration`` <= 600).

A document that already exists is only rewritten when the stored numbers no
longer match the freshly computed ones.
"""
# Import order is significant: the star import below may export names, so the
# modules imported before it keep the original shadowing behaviour, and the
# ones imported after it can never be shadowed by it.
import elasticsearch
import datetime
import hashlib
from elasticsearch.helpers import scan
from write_data_into_es.func_get_releaser_id import *
import json
import copy
import csv
import os

# NOTE(review): credentials are committed to source control. They can be
# overridden through ES_HOST / ES_PORT / ES_USER / ES_PASSWORD; the literals are
# kept only as backward-compatible defaults and should be removed.
hosts = os.environ.get('ES_HOST', '192.168.17.11')
port = int(os.environ.get('ES_PORT', '80'))
user = os.environ.get('ES_USER', 'zhouyujiang')
passwd = os.environ.get('ES_PASSWORD', '8tM9JDN2LVxM')
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

# Destination index / mapping type of the aggregated releaser documents.
write_index = "releaser"
write_type = "releasers"

# Templates for the two document kinds; deep-copied for every releaser.
new_released_dic = {
    "favorite_count_sum": None,
    "video_num": None,
    "play_count_sum": None,
    "ranking_by_play_count_sum": None,
    "data_year": None,
    "releaser_id_str": None,
    "leader_top5_percent": None,
    "leader_top5_num": None,
    "timestamp": int(datetime.datetime.now().timestamp() * 1e3),
    "releaser": None,
    "releaserUrl": None,
    "comment_count_sum": None,
    "platform": None,
    "data_month": None,
    "stats_type": "new_released",
    "repost_count_sum": None,
    "releaser_followers_count": None,
}
observed_dic = {
    "comment_count_sum": None,
    "releaser_followers_count": None,
    "new_comment_count_sum": None,
    "data_year": None,
    "play_count_sum": None,
    "favorite_count_sum": None,
    "new_video_num": None,
    "data_month": None,
    "releaser": None,
    "new_favorite_count_sum": None,
    "timestamp": int(datetime.datetime.now().timestamp() * 1e3),
    "platform": None,
    "releaser_id_str": None,
    "stats_type": "observed",
    "ranking_by_play_count_sum": None,
    "repost_count_sum": None,
    "video_num": None,
    "last_fans": None,
    "releaserUrl": None,
    "fans_change": None,
    "new_play_count_sum": None
}


def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    """Resolve the indices and the time window of a weekly cycle.

    Args:
        year: ignored -- the year is always taken from the current date.
        cycle: unused; kept so all ``*_num`` helpers share one signature.
        cycle_num: ISO week number to report on; when falsy, the week before
            the current ISO week is used.
        compare_type: when ``"new_released"``, the "this week" index/doc are
            replaced by the "last week" ones.

    Returns:
        ``(this_week_index, this_week_doc, last_week_index, last_week_doc,
        fisrt_day_ts, last_day_ts, this_week, last_week, last_year)``; the two
        timestamps are epoch milliseconds (local time) bounding a 7-day window.

    NOTE(review): the window starts ``iso_weekday`` days before a Monday, i.e.
    on a Sunday, and during ISO week 1 the default ``this_week`` is 0 -- both
    kept as-is; confirm they are intended.
    """
    now = datetime.datetime.now()
    now_canlendar = now.isocalendar()
    if not cycle_num:
        week_canlendar = now_canlendar
    else:
        week_canlendar = (now.year, cycle_num + 1, 0)
    year = week_canlendar[0]
    this_week = week_canlendar[1] - 1
    if this_week == 1:
        last_year = year - 1
        # The week before ISO week 1 is the last ISO week (52 or 53) of the
        # previous year; 28 December always lies in that final week.
        last_week = datetime.date(last_year, 12, 28).isocalendar()[1]
    else:
        last_year = year
        last_week = this_week - 1
    # Derive "today" from the same `now` so the date cannot shift mid-call.
    today = datetime.datetime(now.year, now.month, now.day)
    first_day_in_week = today - datetime.timedelta(
        days=now_canlendar[2] + 7 * (now_canlendar[1] - week_canlendar[1] + 1))
    fisrt_day_ts = int(first_day_in_week.timestamp() * 1e3)
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)
    this_week_index = 'short-video-weekly'
    this_week_doc = 'daily-url-' + str(year) + '_w' + format(this_week, '>02d') + '_s1'
    last_week_index = 'releaser-weekly-short-video'
    last_week_doc = 'doc'
    if compare_type == "new_released":
        this_week_index = last_week_index
        this_week_doc = last_week_doc
    return (this_week_index, this_week_doc, last_week_index, last_week_doc,
            fisrt_day_ts, last_day_ts, this_week, last_week, last_year)


def month_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    """Resolve the indices and the time window of a monthly cycle.

    Args:
        year: calendar year of the target month (required; ``None`` raises).
        cycle: unused; kept so all ``*_num`` helpers share one signature.
        cycle_num: month number 1-12; when falsy, the previous calendar month.
        compare_type: when ``"new_released"``, the "this month" index/doc are
            replaced by the "last month" ones.

    Returns:
        ``(this_mon_index, this_mon_doc, last_mon_index, last_mon_doc,
        first_day_ts, last_day_ts, this_mon, last_mon, last_year)``; the
        timestamps are epoch milliseconds (local time) of the first day of the
        month and of the first day of the following month.

    NOTE(review): with a falsy ``cycle_num`` in January the previous month is
    computed as 0 and ``datetime`` raises ``ValueError``.
    """
    now = datetime.datetime.now()
    this_mon = cycle_num if cycle_num else now.month - 1
    last_mon = this_mon - 1 if this_mon > 1 else this_mon - 1 + 12
    last_year = year - 1 if last_mon == 12 else year
    if this_mon == 12:
        next_year, next_month = year + 1, 1
    else:
        next_year, next_month = year, this_mon + 1
    first_day = datetime.datetime(year=year, month=this_mon, day=1)
    next_first_day = datetime.datetime(year=next_year, month=next_month, day=1)
    first_day_ts = int(first_day.timestamp() * 1e3)
    last_day_ts = int(next_first_day.timestamp() * 1e3)
    this_mon_index = "short-video-production-%s" % year
    # The production doc type is named after the last day of the month.
    this_mon_doc = "daily-url-%s" % (
        next_first_day + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
    last_mon_index = "releaser"
    last_mon_doc = "releasers"
    if compare_type == "new_released":
        this_mon_index = last_mon_index
        this_mon_doc = last_mon_doc
    return (this_mon_index, this_mon_doc, last_mon_index, last_mon_doc,
            first_day_ts, last_day_ts, this_mon, last_mon, last_year)


def quarter_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    """Resolve the indices and the time window of a quarterly cycle.

    Args:
        year: calendar year of the target quarter (required).
        cycle: unused; kept so all ``*_num`` helpers share one signature.
        cycle_num: quarter number 1-4; when falsy, the quarter containing the
            current month.
        compare_type: when ``"new_released"``, the "this quarter" index/doc
            are replaced by the "last quarter" ones.

    Returns:
        ``(this_quarter_index, this_quarter_doc, last_quarter_index,
        last_quarter_doc, first_day_ts, last_day_ts, this_quarter,
        last_quarter, last_year)``; timestamps are epoch milliseconds (local
        time) of the first day of the quarter and of the following quarter.
    """
    now = datetime.datetime.now()
    if not cycle_num:
        # Jan-Mar -> 1, Apr-Jun -> 2, Jul-Sep -> 3, Oct-Dec -> 4.
        this_quarter = (now.month - 1) // 3 + 1
    else:
        this_quarter = cycle_num
    # Compare this_quarter (not cycle_num, which may be None) to find the
    # previous quarter.
    last_quarter = this_quarter - 1 if this_quarter > 1 else 4
    last_year = year - 1 if last_quarter == 4 else year
    first_day_ts = int(datetime.datetime(
        year=year, month=(this_quarter - 1) * 3 + 1, day=1).timestamp() * 1e3)
    # Q4 ends on 1 January of the next year (month 13 does not exist).
    if this_quarter == 4:
        next_year, next_month = year + 1, 1
    else:
        next_year, next_month = year, this_quarter * 3 + 1
    last_day_ts = int(datetime.datetime(
        year=next_year, month=next_month, day=1).timestamp() * 1e3)
    this_quarter_index = "short-video-quarter-%s" % year
    this_quarter_doc = "daily-url-%s-Q%s" % (year, this_quarter)
    last_quarter_index = "releaser"
    last_quarter_doc = "releasers-%s-Q%s" % (last_year, last_quarter)
    if compare_type == "new_released":
        this_quarter_index = last_quarter_index
        this_quarter_doc = last_quarter_doc
    return (this_quarter_index, this_quarter_doc, last_quarter_index,
            last_quarter_doc, first_day_ts, last_day_ts, this_quarter,
            last_quarter, last_year)


def create_body(platform, releaser, releaserUrl, year=None, cycle="week", cycle_num=None,
                compare_type=None, project_type=None, **kwargs):
    """Resolve the cycle parameters for one releaser.

    Cycle values may be passed in through ``kwargs`` (``this_cycle_index``,
    ``this_cycle_doc``, ``last_cycle_index``, ``last_cycle_doc``,
    ``fisrt_day_ts``, ``last_day_ts``, ``releaser_id_str``); when
    ``this_cycle_index`` is missing they are computed by ``week_num`` /
    ``month_num`` / ``quarter_num`` according to ``cycle``. The results are
    stored in module-level globals.

    NOTE(review): this function looks unfinished -- it builds no document and
    always returns ``None``.
    """
    global this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc
    global fisrt_day_ts, last_day_ts, this_cycle, last_cycle, last_year
    this_cycle_index = kwargs.get("this_cycle_index")
    this_cycle_doc = kwargs.get("this_cycle_doc")
    last_cycle_index = kwargs.get("last_cycle_index")
    last_cycle_doc = kwargs.get("last_cycle_doc")
    fisrt_day_ts = kwargs.get("fisrt_day_ts")
    last_day_ts = kwargs.get("last_day_ts")
    releaser_id_str = kwargs.get("releaser_id_str")
    if not releaser_id_str:
        # Result currently unused; the lookup is kept as in the original flow.
        releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
        releaser_id_str = platform + '_' + releaser_id if releaser_id else ""
    if not this_cycle_index:
        # "year" (and any unknown cycle) is not implemented and leaves the
        # globals untouched.
        period_resolver = {"week": week_num,
                           "month": month_num,
                           "quarter": quarter_num}.get(cycle)
        if period_resolver is not None:
            (this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc,
             fisrt_day_ts, last_day_ts, this_cycle, last_cycle,
             last_year) = period_resolver(year, cycle, cycle_num, compare_type)


def get_releaser_doc_id(releaser, releaser_id_str, platform, year, month, stats_type):
    """Build the ``_id`` of a releaser summary document.

    Format: ``"<md5>_<platform>_<year>_<MM><stats_type>"`` (no separator
    between the zero-padded month and ``stats_type``). For periods up to and
    including July 2019 the MD5 is taken over the releaser name; for later
    periods over ``releaser_id_str``.
    """
    # Compare the (year, month) period as a whole: a plain per-field check
    # would treat e.g. 2018-09 as "after July 2019".
    if (year, month) <= (2019, 7):
        id_source = releaser
    else:
        id_source = releaser_id_str
    releaser_name_md5 = hashlib.md5(id_source.encode('utf-8')).hexdigest()
    return '%s_%s_%s_%s%s' % (releaser_name_md5, platform, year,
                              str(month).zfill(2), stats_type)


def if_exists_in_releasers(last_cycle_index, last_cycle_doc, doc_id):
    """Placeholder: not implemented (returns ``None``) and not called here."""
    pass


def _get_existing(index, doc_type, doc_id):
    """Return the stored document ``doc_id``, or ``None`` if it does not exist."""
    # Keyword arguments keep the call correct across elasticsearch-py
    # versions whose positional order of doc_type/id differs.
    if es.exists(index=index, doc_type=doc_type, id=doc_id):
        return es.get(index=index, doc_type=doc_type, id=doc_id)
    return None


def _sum_aggs(field_template):
    """Build sum aggregations for play/favorite/comment/repost counters.

    ``field_template`` is a ``%s`` pattern filled with the counter name,
    e.g. ``"%s_count"`` -> ``play_count``.
    """
    return {
        "sum_play": {"sum": {"field": field_template % "play"}},
        "sum_favorite": {"sum": {"field": field_template % "favorite"}},
        "sum_comment": {"sum": {"field": field_template % "comment"}},
        "sum_repost": {"sum": {"field": field_template % "repost"}},
    }


def _search_sums(index, doc_type, body):
    """Run a ``_sum_aggs`` search.

    Returns ``(hits_total, play_sum, favorite_sum, comment_sum, repost_sum)``.
    """
    res = es.search(index=index, doc_type=doc_type, body=body)
    aggs = res['aggregations']
    return (res['hits']['total'],
            aggs['sum_play']['value'],
            aggs['sum_favorite']['value'],
            aggs['sum_comment']['value'],
            aggs['sum_repost']['value'])


def _rank_for(year, month, platform, stats_type, play_count_sum):
    """Return the play-count ranking for a new document.

    Looks up the stored document of the same period/platform/stats_type with
    a strictly larger ``play_count_sum`` and the largest ranking number, and
    returns that ranking + 1; returns 1 when there is no such document.
    """
    search_rank_body = {
        "query": {"bool": {"filter": [
            {"term": {"data_year": year}},
            {"term": {"data_month": month}},
            {"term": {"platform.keyword": platform}},
            {"term": {"stats_type.keyword": stats_type}},
            {"range": {"play_count_sum": {"gt": play_count_sum}}},
        ]}},
        "sort": [{"ranking_by_play_count_sum": {"order": "desc"}}],
        "size": 1,
    }
    rank_res = es.search(index=write_index, doc_type=write_type, body=search_rank_body)
    try:
        return int(rank_res['hits']['hits'][0]["_source"]["ranking_by_play_count_sum"]) + 1
    except (IndexError, KeyError, TypeError, ValueError):
        return 1


def _followers_since(releaser_id_str, since_ts):
    """Return ``(followers_count, fetch_time_str)`` of the earliest
    ``releaser_fans`` sample with ``fetch_time >= since_ts``.

    Best effort: any failure (search error, no sample, missing field) yields
    ``(0, "")``.
    """
    followers_short_body = {
        "query": {"bool": {"filter": [
            {"term": {"releaser_id_str.keyword": releaser_id_str}},
            {"range": {"fetch_time": {"gte": since_ts}}},
        ]}},
        "sort": [{"fetch_time": {"order": "asc"}}],
        "size": 1,
    }
    try:
        follers_res = es.search(index="releaser_fans", doc_type="doc", body=followers_short_body)
        sample = follers_res['hits']['hits'][0]["_source"]
        count = sample["releaser_followers_count"]
        fetched_at = datetime.datetime.fromtimestamp(
            sample["fetch_time"] / 1e3).strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        return 0, ""
    return count, fetched_at


def write_releasers_to_es(releaserUrl, platform, releaser, year, month):
    """Compute and write the monthly ``new_released`` and ``observed``
    documents of one releaser into ``write_index``/``write_type``.

    Args:
        releaserUrl: releaser URL, resolved to an id via ``get_releaser_id``.
        platform: platform name; Douyin uses the favorite sum as play sum.
        releaser: releaser name (used for document ids up to July 2019).
        year: calendar year of the target month.
        month: target month number (1-12).

    Existing documents are left untouched when still up to date: the
    ``new_released`` one when video count and play sum are unchanged, the
    ``observed`` one when the stored values are >= the new ones. Prints the
    bulk response on errors and then the number of documents sent.
    """
    releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
    (this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc,
     first_day_ts, last_day_ts, *_) = month_num(year, "month", month)
    releaser_id_str = platform + "_" + releaser_id if releaser_id else ""

    _id_new = get_releaser_doc_id(releaser, releaser_id_str, platform, year, month, "new_released")
    _id_ob = get_releaser_doc_id(releaser, releaser_id_str, platform, year, month, "observed")
    existing_new = _get_existing(last_cycle_index, last_cycle_doc, _id_new)
    existing_ob = _get_existing(last_cycle_index, last_cycle_doc, _id_ob)

    releaser_filters = [
        {"term": {"platform.keyword": platform}},
        {"term": {"releaser_id_str": releaser_id_str}},
    ]
    search_body_new = {
        "query": {"bool": {"filter": releaser_filters + [
            {"range": {"release_time": {"gte": first_day_ts, "lt": last_day_ts}}},
            {"range": {"duration": {"lte": 600}}},
        ]}},
        "aggs": _sum_aggs("%s_count"),
        "size": 0,
    }
    search_body_ob = {
        "query": {"bool": {
            "filter": releaser_filters + [{"range": {"duration": {"lte": 600}}}],
            "must": [{"exists": {"field": "monthly_net_inc_play_count"}}],
        }},
        "aggs": _sum_aggs("monthly_net_inc_%s_count"),
        "size": 0,
    }
    (video_num, play_count_sum, favorite_count_sum, comment_count_sum,
     repost_count_sum) = _search_sums(this_cycle_index, this_cycle_doc, search_body_new)
    (video_num_ob, play_count_sum_ob, favorite_count_sum_ob, comment_count_sum_ob,
     repost_count_sum_ob) = _search_sums(this_cycle_index, this_cycle_doc, search_body_ob)

    if platform == "抖音":
        # Douyin documents store the favorite sum as play sum (presumably no
        # usable play count -- confirm). Substitute BEFORE the freshness check
        # so it compares like with like against the stored documents.
        play_count_sum = favorite_count_sum
        play_count_sum_ob = favorite_count_sum_ob

    up_to_date_new = (existing_new is not None
                      and existing_new["_source"]["video_num"] == video_num
                      and existing_new["_source"]["play_count_sum"] == play_count_sum)
    up_to_date_ob = (existing_ob is not None
                     and existing_ob["_source"]["video_num"] >= video_num_ob
                     and existing_ob["_source"]["play_count_sum"] >= play_count_sum_ob)

    bulk_body_new = copy.deepcopy(new_released_dic)
    bulk_body_ob = copy.deepcopy(observed_dic)
    last_fans = 0
    for stats_type in ("new_released", "observed"):
        is_new = stats_type == "new_released"
        # "observed" needs last_fans from the "new_released" pass, so that
        # pass is only skipped when the observed document is skipped too.
        if is_new and up_to_date_new and up_to_date_ob:
            continue
        if not is_new and up_to_date_ob:
            continue
        target = bulk_body_new if is_new else bulk_body_ob
        target["ranking_by_play_count_sum"] = _rank_for(
            year, month, platform, stats_type,
            play_count_sum if is_new else play_count_sum_ob)
        # Followers: first sample from the start of the month for
        # new_released, from the start of the next month for observed.
        followers, followers_time = _followers_since(
            releaser_id_str, first_day_ts if is_new else last_day_ts)
        target["releaser_followers_count"] = followers
        target["releaser_followers_count_time"] = followers_time
        if is_new:
            last_fans = followers
        else:
            target["last_fans"] = last_fans
            # A drop in followers is reported as 0.
            target["fans_change"] = max(followers - last_fans, 0)

    now_ms = int(datetime.datetime.now().timestamp() * 1e3)
    shared_fields = {
        "data_year": year,
        "releaser_id_str": releaser_id_str,
        "timestamp": now_ms,
        "releaser": releaser,
        "releaserUrl": releaserUrl,
        "platform": platform,
        "data_month": month,
    }
    bulk_lines = []
    if not up_to_date_new:
        bulk_body_new.update(shared_fields)
        bulk_body_new.update({
            "favorite_count_sum": favorite_count_sum,
            "video_num": video_num,
            "play_count_sum": play_count_sum,
            "comment_count_sum": comment_count_sum,
            "repost_count_sum": repost_count_sum,
        })
        bulk_lines.append('{"index": {"_id":"%s"}}' % _id_new)
        bulk_lines.append(json.dumps(bulk_body_new, ensure_ascii=False))
    if not up_to_date_ob:
        bulk_body_ob.update(shared_fields)
        bulk_body_ob.update({
            "favorite_count_sum": favorite_count_sum_ob,
            "video_num": video_num_ob,
            "play_count_sum": play_count_sum_ob,
            "comment_count_sum": comment_count_sum_ob,
            "repost_count_sum": repost_count_sum_ob,
            "new_comment_count_sum": comment_count_sum,
            "new_favorite_count_sum": favorite_count_sum,
            "new_play_count_sum": play_count_sum,
            "new_video_num": video_num,
        })
        bulk_lines.append('{"index": {"_id":"%s"}}' % _id_ob)
        bulk_lines.append(json.dumps(bulk_body_ob, ensure_ascii=False))

    count_true = len(bulk_lines) // 2
    if bulk_lines:
        # Bulk NDJSON: action line + source line per document, newline-terminated.
        bulk_all_body = '\n'.join(bulk_lines) + '\n'
        eror_dic = es.bulk(index=write_index, doc_type=write_type,
                           body=bulk_all_body, request_timeout=200)
        if eror_dic['errors']:
            print(eror_dic)
    print(count_true)


if __name__ == "__main__":
    year = 2019
    month = 10
    # Input CSV (GB18030): header row with ``releaserUrl`` and ``platform``
    # columns and an optional ``releaser`` column.
    input_csv = r'D:\work_file\5月补数据.csv'
    # csv handles quoted fields containing commas and skips blank lines.
    with open(input_csv, 'r', encoding="gb18030", newline='') as f:
        for line_dict in csv.DictReader(f):
            write_releasers_to_es(line_dict['releaserUrl'], line_dict['platform'],
                                  line_dict.get("releaser"), year, month)