"""Export "big customer" releaser data from Elasticsearch into MySQL.

The module looks up a list of target releasers in Elasticsearch, builds
filter queries for their daily or weekly documents, scans the matching
documents and hands them to ``func_write_into_mysql_with_unique``.

Importing this module creates the Elasticsearch client ``es`` and opens the
MySQL connection ``db``.
"""
import copy
import datetime

import elasticsearch
import pymysql
from elasticsearch.helpers import scan

from choice_es_doc_type import func_choice_doc_type
from func_find_week_num import find_first_day_for_given_start_weekday
from mysql_tool import func_write_into_mysql_with_unique

# NOTE(review): hosts and credentials are hard-coded in source. They are kept
# unchanged so existing deployments keep working, but they should be moved to
# configuration / a secret store.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

sql_ip = '192.168.16.11'
sql_port = 3306
sql_user = 'data_user'
sql_passwd = 'vip@123456'
database = 'short_video'
db = pymysql.connect(host=sql_ip, user=sql_user, password=sql_passwd,
                     db=database, port=sql_port, charset='utf8mb4')


def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
    """Return index/doc-type names and the time window for a weekly cycle.

    Args:
        year: Ignored; the value is overwritten from the calendar tuple.
        cycle: Unused.
        cycle_num: Week number to report on. When falsy, the week before the
            current ISO week is used.
        compare_type: When equal to ``"new_released"``, the "this week"
            index and doc type are replaced by the "last week" ones.

    Returns:
        A 9-tuple ``(this_week_index, this_week_doc, last_week_index,
        last_week_doc, first_day_ts, last_day_ts, this_week, last_week,
        last_year)``. The two timestamps are epoch milliseconds derived from
        naive local datetimes and are exactly seven days apart.
    """
    # Take a single snapshot of "now" so the date parts cannot straddle
    # midnight between separate clock reads.
    now = datetime.datetime.now()
    now_canlendar = now.isocalendar()
    if not cycle_num:
        week_canlendar = now_canlendar
    else:
        week_canlendar = (now.year, cycle_num + 1, 0)

    year = week_canlendar[0]
    this_week = week_canlendar[1] - 1
    last_year = year - 1 if this_week == 1 else year
    # NOTE(review): when this_week == 1 this yields last_week == 0 rather
    # than the final week of last_year -- confirm what callers expect.
    last_week = this_week - 1

    today = datetime.datetime(now.year, now.month, now.day)
    # today = datetime.datetime(year=2018, month=12, day=25)
    # Step back by the current ISO weekday plus whole weeks so the window
    # starts on the day before the Monday of the selected week.
    first_day_in_week = today - datetime.timedelta(
        days=now_canlendar[2] + 7 * (now_canlendar[1] - week_canlendar[1] + 1))
    fisrt_day_ts = int(first_day_in_week.timestamp() * 1e3)
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)

    this_week_index = 'short-video-weekly'
    this_week_doc = 'daily-url-' + str(year) + '_w' + format(this_week, '>02d') + '_s1'
    last_week_index = 'releaser-weekly-short-video'
    last_week_doc = 'doc'
    if compare_type == "new_released":
        this_week_index = last_week_index
        this_week_doc = last_week_doc
    return (this_week_index, this_week_doc, last_week_index, last_week_doc,
            fisrt_day_ts, last_day_ts, this_week, last_week, last_year)


class BigCustomer:
    """Shared state and helpers for exporting releaser data to MySQL.

    Subclasses set ``systematic``, ``data_year``, ``data_month`` and either
    ``data_day`` (daily mode) or ``data_week`` (weekly mode), and build the
    Elasticsearch query bodies in ``construction_search_body``.
    """

    # Fields of an Elasticsearch document that are kept when a row is
    # written to MySQL; every other key is dropped.
    _KEPT_FIELDS = frozenset([
        "es_id", "releaser", "comment_count", "favorite_count", "play_count",
        "release_month", "duration", "release_year", "release_day", "title",
        "url", "releaserUrl", "releaser_id_str", "platform", "release_time",
        "fetch_time", "release_time_str", "data_week", "repost_count",
        "channel", "releaser_followers_count", "fetch_day", "fetch_month",
        "fetch_year"])

    def __init__(self, **kwargs):
        """Initialise every attribute to its empty default.

        ``kwargs`` is accepted but not used.
        """
        self.st_time = 0
        self.et_time = 0
        self.date_str = ''
        # cal_release_time() tolerates data_day / data_week not being
        # assigned yet, so this call is safe at this point.
        self.cal_release_time()
        self.detail_index = ''
        self.detail_doc_type = ''
        self.agg_index = ''
        self.agg_doc_type = ''
        self.data_month = ''
        self.data_week = ''
        self.data_year = ''
        self.data_day = ''
        self.fetch_day = None
        self.fetch_month = None
        self.fetch_year = None
        self.fh_time_et = 0
        self.fh_time_st = 0
        self.releaser_id_list = []
        self.search_agg_body = ''
        self.search_detail_body = ''
        self.detail_table = None
        self.agg_table = None

    @staticmethod
    def day_by_week_info(week_year, week_no, week_day, week_day_start=1):
        """Return the date of day ``week_day`` within week ``week_no``.

        The result is the value returned by
        ``find_first_day_for_given_start_weekday(week_year, week_day_start)``
        shifted by ``(week_no - 1) * 7 + week_day - 1`` days.
        """
        # find first weekday = week_day_start
        day1D = find_first_day_for_given_start_weekday(week_year, week_day_start)
        return day1D + datetime.timedelta(days=(week_no - 1) * 7 + week_day - 1)

    def cal_release_time(self):
        """Compute the release/fetch time windows in epoch milliseconds.

        Daily mode (``data_day`` truthy): the release window runs from 15
        days before the data day up to the start of the following day, the
        fetch window is the whole day after the data day, ``fetch_day`` /
        ``fetch_month`` / ``fetch_year`` are taken from the start of the
        fetch window and ``date_str`` is the ``YYYY-MM-DD`` date of the
        release-window start.

        Weekly mode (``data_week`` truthy): the release window runs from the
        day before day 1 of the week up to midnight at the start of day 7.

        Both branches run when both attributes are truthy; nothing happens
        when neither is set (or assigned yet).
        """
        one_day = datetime.timedelta(1)

        if getattr(self, 'data_day', None):
            data_day_dt = datetime.datetime(self.data_year, self.data_month, self.data_day)
            release_st = data_day_dt - datetime.timedelta(15)
            release_et = data_day_dt + one_day
            fhtech_st = data_day_dt + one_day
            fhtech_et = fhtech_st + one_day
            self.fh_time_st = int(fhtech_st.timestamp() * 1000)
            self.fh_time_et = int(fhtech_et.timestamp() * 1000)
            self.st_time = int(release_st.timestamp() * 1000)
            self.et_time = int(release_et.timestamp() * 1000)
            self.fetch_day = fhtech_st.day
            self.fetch_month = fhtech_st.month
            self.fetch_year = fhtech_st.year
            self.date_str = str(release_st)[0:10]

        if getattr(self, 'data_week', None):
            week_start = self.day_by_week_info(self.data_year, self.data_week, 1, 1) - one_day
            week_end = self.day_by_week_info(self.data_year, self.data_week, 7, 1)
            # Rebuild both bounds as datetimes at midnight before converting
            # them to timestamps.
            week_start = datetime.datetime(week_start.year, week_start.month, week_start.day)
            week_end = datetime.datetime(week_end.year, week_end.month, week_end.day)
            self.st_time = int(week_start.timestamp() * 1000)
            self.et_time = int(week_end.timestamp() * 1000)

    def choice_doc_type(self, systematic, data_year, data_month, data_day, data_week):
        """Select the ES indices, doc types and MySQL tables for this run.

        ``func_choice_doc_type`` is called twice per active mode: once with
        ``choice_index=True`` for the index names and once with
        ``choice_index=False`` for the doc types. The MySQL table names are
        only assigned for the ``'short_video'`` and ``'weixin'`` systems, and
        ``agg_table`` only in weekly mode. The selected names are printed.
        """
        if data_week:
            self.detail_index, self.agg_index = func_choice_doc_type(
                data_year=data_year, data_month=data_month, data_week=data_week,
                systematic=systematic, style='weekly', choice_index=True)
            if systematic == 'short_video':
                self.detail_table = 'short_video_detail_weekly'
                self.agg_table = 'short_video_agg_weekly'
            if systematic == 'weixin':
                self.detail_table = 'weixin_detail_weekly'
                self.agg_table = 'weixin_agg_weekly'
            self.detail_doc_type, self.agg_doc_type = func_choice_doc_type(
                data_year=data_year, data_month=data_month, data_week=data_week,
                systematic=systematic, style='weekly', choice_index=False)

        if data_day:
            # NOTE(review): data_day itself is not forwarded to
            # func_choice_doc_type -- confirm the helper does not need it.
            self.detail_index, self.agg_index = func_choice_doc_type(
                data_year=data_year, data_month=data_month, data_week=data_week,
                systematic=systematic, style='daily', choice_index=True)
            self.detail_doc_type, self.agg_doc_type = func_choice_doc_type(
                data_year=data_year, data_month=data_month, data_week=data_week,
                systematic=systematic, style='daily', choice_index=False)
            if systematic == 'short_video':
                self.detail_table = 'short_video_detail_daily'
            if systematic == 'weixin':
                self.detail_table = 'weixin_detail_daily'

        print(self.detail_index, self.detail_doc_type)
        print(self.agg_index, self.agg_doc_type)

    @staticmethod
    def scan_data(search_body, index, doc_type):
        """Return the ``scan`` iterator for ``search_body`` on the module client."""
        return scan(client=es, index=index, doc_type=doc_type, query=search_body)

    @staticmethod
    def turn_dict(line_dict):
        """Add ``release_time_str`` and replace ASCII quotes in ``title``.

        Both steps are best-effort: a missing or malformed ``release_time``
        or ``title`` leaves the corresponding field untouched. The dict is
        modified in place and returned.
        """
        try:
            release_dt = datetime.datetime.fromtimestamp(line_dict['release_time'] / 1000)
        except (KeyError, TypeError, ValueError, OverflowError, OSError):
            pass
        else:
            line_dict['release_time_str'] = str(release_dt)[0:10]

        try:
            title = line_dict['title'].replace('"', '“')
            line_dict['title'] = title.replace("'", "‘")
        except (KeyError, AttributeError, TypeError):
            pass
        return line_dict

    def write_into_mysql(self, scan_re, tablename):
        """Write every scanned document into ``tablename``.

        Each document's ``_source`` is enriched with its ``_id`` (as
        ``es_id``), the release date parts, ``data_week`` and the fetch date
        parts, reduced to the fields in ``_KEPT_FIELDS`` and passed through
        ``turn_dict``. All rows are collected in memory and written with one
        call to ``func_write_into_mysql_with_unique``. A progress counter is
        printed every 1000 documents.

        Raises:
            KeyError: If a document has no ``release_time`` field.
        """
        write_list = []
        for count, hit in enumerate(scan_re, start=1):
            if count % 1000 == 0:
                print(count)
            line = hit['_source']
            line['es_id'] = hit['_id']
            re_dt = datetime.datetime.fromtimestamp(line['release_time'] / 1000)
            line['data_week'] = self.data_week
            line['release_year'] = re_dt.year
            line['release_day'] = re_dt.day
            line['release_month'] = re_dt.month
            line['fetch_year'] = self.fetch_year
            line['fetch_day'] = self.fetch_day
            line['fetch_month'] = self.fetch_month
            kept = {key: value for key, value in line.items()
                    if key in self._KEPT_FIELDS}
            write_list.append(self.turn_dict(kept))
        func_write_into_mysql_with_unique(db=db, tablename=tablename, log_file='',
                                          data_dict_list=write_list)

    def search_releaser_id_list(self, systematic):
        """Load the ``releaser_id_str`` of every target releaser.

        Scans the ``target_releasers`` index for documents that carry at
        least one of three project tags and belong to the platforms of
        ``systematic``, and stores the ids in ``self.releaser_id_list``.

        Raises:
            ValueError: If ``systematic`` is neither ``'short_video'`` nor
                ``'weixin'``.
        """
        if systematic == 'short_video':
            platforms = ["toutiao", "抖音", "腾讯新闻", "miaopai", "腾讯视频",
                         "new_tudou", "haokan", "kwai"]
        elif systematic == 'weixin':
            platforms = ["weixin"]
        else:
            raise ValueError('unsupported systematic: {!r}'.format(systematic))

        search_releaser = {
            "query": {
                "bool": {
                    "should": [
                        {"match": {"project_tags.keyword": "央视三农"}},
                        {"match": {"project_tags.keyword": "大客户部"}},
                        {"match": {"project_tags.keyword": "星光大道"}}
                    ],
                    "minimum_should_match": 1,
                    "filter": [{"terms": {"platform.keyword": platforms}}]
                }
            }
        }
        scan_re = scan(client=es, index='target_releasers', query=search_releaser)
        self.releaser_id_list = [hit['_source']['releaser_id_str'] for hit in scan_re]

    def _releaser_search_body(self):
        """Return a fresh query body filtering on the loaded releaser ids."""
        return {
            "query": {
                "bool": {
                    "filter": [
                        {"terms": {"releaser_id_str": list(self.releaser_id_list)}}
                    ]
                }
            }
        }

    def _release_time_filter(self):
        """Return the range filter for the computed release window."""
        return {"range": {"release_time": {"gte": self.st_time, "lt": self.et_time}}}

    def _fetch_time_filter(self):
        """Return the range filter for the computed fetch window."""
        return {"range": {"fetch_time": {"gte": self.fh_time_st, "lt": self.fh_time_et}}}

    def _require_day_or_week(self):
        """Raise ValueError unless daily or weekly mode is selected."""
        if not (self.data_day or self.data_week):
            raise ValueError('either data_day or data_week must be set')


class BigCustomerShortVideo(BigCustomer):
    """Query builder for the short-video indices."""

    def __init__(self, systematic, data_month, data_year, data_week=None, data_day=None):
        """Store the run parameters and resolve indices, doc types and tables."""
        # Establish the base defaults (fetch_* fields, releaser_id_list,
        # table names) that write_into_mysql relies on.
        super().__init__()
        self.data_month = data_month
        self.systematic = systematic
        self.data_year = data_year
        self.data_week = data_week
        self.data_day = data_day
        self.search_agg_body = None
        self.search_detail_body = None
        self.detail_index = ''
        self.detail_doc_type = ''
        self.agg_index = ''
        self.agg_doc_type = ''
        self.sql_table = ''
        self.choice_doc_type(systematic=self.systematic, data_year=self.data_year,
                             data_month=self.data_month, data_week=self.data_week,
                             data_day=self.data_day)

    def construction_search_body(self):
        """Build ``search_detail_body`` and ``search_agg_body``.

        The detail body filters on the releaser ids plus, in daily mode, the
        release and fetch windows (fetch window only for ``'weixin'``) or,
        in weekly mode, the release window. When both modes are set the
        weekly body wins. The aggregate body filters on the releaser ids,
        ``data_week_year``, ``data_week_num`` and the ``new_released`` stats
        type.

        Raises:
            ValueError: If neither ``data_day`` nor ``data_week`` is set, or
                if ``systematic`` is not supported.
        """
        self._require_day_or_week()
        self.search_releaser_id_list(self.systematic)
        self.cal_release_time()

        if self.data_day:
            search_detail_body = self._releaser_search_body()
            # search_releaser_id_list() has already rejected any system
            # other than 'short_video' and 'weixin'.
            if self.systematic == 'weixin':
                extend_list = [self._fetch_time_filter()]
            else:
                extend_list = [self._release_time_filter(), self._fetch_time_filter()]
            search_detail_body["query"]["bool"]["filter"].extend(extend_list)

        if self.data_week:
            search_detail_body = self._releaser_search_body()
            search_detail_body["query"]["bool"]["filter"].append(self._release_time_filter())

        # aggregate data
        search_agg_body = self._releaser_search_body()
        search_agg_body["query"]["bool"]["filter"].extend([
            {"term": {"data_week_year": self.data_year}},
            {"term": {"data_week_num": self.data_week}},
            {"term": {"stats_type.keyword": "new_released"}}
        ])
        self.search_agg_body = search_agg_body
        self.search_detail_body = search_detail_body


class BigCustomerWeiXin(BigCustomer):
    """Query builder for the WeChat (weixin) indices."""

    def __init__(self, systematic, data_month, data_year, data_week=None, data_day=None):
        """Store the run parameters and resolve indices, doc types and tables."""
        # Establish the base defaults (fetch_* fields, releaser_id_list,
        # table names) that write_into_mysql relies on.
        super().__init__()
        self.data_month = data_month
        self.systematic = systematic
        self.data_year = data_year
        self.data_week = data_week
        self.data_day = data_day
        self.search_agg_body = ''
        self.search_detail_body = ''
        self.detail_index = ''
        self.detail_doc_type = ''
        self.agg_index = ''
        self.agg_doc_type = ''
        self.sql_table = ''
        self.choice_doc_type(systematic=self.systematic, data_year=self.data_year,
                             data_month=self.data_month, data_week=self.data_week,
                             data_day=self.data_day)

    def construction_search_body(self):
        """Build ``search_detail_body`` and ``search_agg_body``.

        The detail body filters on the releaser ids and the release window
        in both daily and weekly mode. The aggregate body filters on the
        releaser ids, ``data_month``, ``data_year`` and the ``new_released``
        stats type.

        Raises:
            ValueError: If neither ``data_day`` nor ``data_week`` is set, or
                if ``systematic`` is not supported.
        """
        self._require_day_or_week()
        self.search_releaser_id_list(self.systematic)
        self.cal_release_time()

        search_detail_body = self._releaser_search_body()
        search_detail_body["query"]["bool"]["filter"].append(self._release_time_filter())

        # aggregate data
        search_agg_body = self._releaser_search_body()
        search_agg_body["query"]["bool"]["filter"].extend([
            {"term": {"data_month": self.data_month}},
            {"term": {"data_year": self.data_year}},
            {"term": {"stats_type.keyword": "new_released"}}
        ])
        self.search_agg_body = search_agg_body
        self.search_detail_body = search_detail_body


if __name__ == '__main__':
    # Short video: write the daily detail data
    short_video = BigCustomerShortVideo(systematic='short_video', data_day=20, data_year=2019,
                                        data_month=9, data_week=None)
    short_video.construction_search_body()
    scan_detail = short_video.scan_data(index=short_video.detail_index,
                                        doc_type=short_video.detail_doc_type,
                                        search_body=short_video.search_detail_body)
    short_video.write_into_mysql(scan_detail, short_video.detail_table)

    # Short video, weekly data: write the detail data and the aggregate data
    # short_video = BigCustomerShortVideo(systematic='short_video', data_year=2019,
    #                                     data_month=9, data_week=36)
    # detail data
    # short_video.construction_search_body()
    # scan_detail = short_video.scan_data(index=short_video.detail_index, doc_type=short_video.detail_doc_type,
    #                                     search_body=short_video.search_detail_body)
    # short_video.write_into_mysql(scan_detail, short_video.detail_table)
    # scan_agg = short_video.scan_data(index=short_video.agg_index, doc_type=short_video.agg_doc_type,
    #                                  search_body=short_video.search_agg_body)
    # short_video.write_into_mysql(scan_agg, short_video.agg_table)

    # WeChat data
    # daily detail data
    # weixin = BigCustomerWeiXin(systematic='weixin', data_day=20, data_year=2019,
    #                            data_month=9, data_week=None)
    #
    # weixin.construction_search_body()
    # scan_detail = weixin.scan_data(index=weixin.detail_index, doc_type=weixin.detail_doc_type,
    #                                search_body=weixin.search_detail_body)
    # weixin.write_into_mysql(scan_detail, tablename=weixin.detail_table)

    # weekly detail data
    # weixin = BigCustomerWeiXin(systematic='weixin', data_day=None, data_year=2019,
    #                            data_month=9, data_week=36)
    #
    # weixin.construction_search_body()
    # scan_detail = weixin.scan_data(index=weixin.detail_index, doc_type=weixin.detail_doc_type,
    #                                search_body=weixin.search_detail_body)
    # weixin.write_into_mysql(scan_detail, tablename=weixin.detail_table)