# -*- coding: utf-8 -*-
"""
Created on Mon Sep 25 14:34:45 2017

Calculate monthly net-increase values for daily-url data.
1 If a video is released after 00:00 of the first day of the month, its
  accumulated values are used as the monthly_net_inc values directly.
2 If a video is released before 00:00 of the first day of the month, the
  monthly_net_inc values are the current accumulated values minus the
  accumulated values recorded at the day before the first day of the month.
  2.1 If a video is released before the first day of the month but its
      historical data is not available, the monthly_net_inc fields are left
      unset and monthly_cal_base is tagged as 'historical_data_absent'.

Migrated into a function on Oct 03 2017.

Nov 18 2017
Added multi-threading; data is distributed into threads by release_time
automatically. Restricted the release_time range to start 60 days before
the passed-in date: older data rarely has historical data available for
net-increase calculation, so including it only adds search time and slows
down the whole calculation.

Jul 16 2018
Refactoring. Breaking change: cal_monthly_net_inc's search_body argument
was replaced by search_body_logic_part, which is only part of a complete
search body, not a JSON body that can be used directly in a search.

@author: hanye
"""
import json
import copy
import datetime
import logging
import threading
import _thread

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

from func_calculate_toutiao_video_id import calculate_toutiao_video_id
from func_calculate_newTudou_video_id import calculate_newTudou_video_id

hosts = '192.168.17.11'
port = 9200
es_read = Elasticsearch(hosts=hosts, port=port)
es_write = Elasticsearch(hosts=hosts, port=port)

index_sv = 'short-video-production'
doc_type_daily_url = 'daily-url'


def find_first_day_of_the_month(fetch_year, fetch_month, fetch_day):
    # If the passed-in date is the 1st, net-increase values are calculated
    # against the previous month.
    if fetch_day > 1:
        first_day_of_the_month_T = datetime.datetime(year=fetch_year, month=fetch_month, day=1)
    elif fetch_day == 1:
        if fetch_month > 1:
            first_day_of_the_month_T = datetime.datetime(year=fetch_year, month=fetch_month-1, day=1)
        elif fetch_month == 1:
            first_day_of_the_month_T = datetime.datetime(year=fetch_year-1, month=12, day=1)
    return first_day_of_the_month_T


def form_bulk_body(line_data, doc_id):
    # 3.1 update line_data dict
    # added update timestamp when calculating monthly_net_inc values, Oct 16 2017
    line_data.update({'timestamp': int(datetime.datetime.now().timestamp()*1e3)})
    # 3.2 form the action line and the source line for this doc
    action_json_line_str = '{"index": {"_id":"' + doc_id + '"}}'
    line_data.pop('_id', None)
    data_json_line_str = json.dumps(line_data, ensure_ascii=False)
    line_bulk_body = (action_json_line_str
                      + '\n'
                      + data_json_line_str
                      + '\n')
    return line_bulk_body
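# Illustrative sketch (hypothetical values): for a doc dict like
#     {'_id': 'abc123', 'platform': 'toutiao', 'play_count': 1024}
# form_bulk_body yields two newline-terminated JSON lines, the action line
# followed by the source line, which is the NDJSON layout the Elasticsearch
# bulk API expects:
#     {"index": {"_id":"abc123"}}
#     {"platform": "toutiao", "play_count": 1024, "timestamp": 1506300000000}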
def bulk_write(thread_id, logger, data_dict_Lst, doc_type_target, index=index_sv):
    bulk_body = ''
    for data_dict in data_dict_Lst:
        doc_id = data_dict['_id']
        line_bulk_body = form_bulk_body(data_dict, doc_id)
        bulk_body += line_bulk_body
    t_s = datetime.datetime.now()
    es_write.bulk(body=bulk_body, index=index, doc_type=doc_type_target,
                  request_timeout=100)
    logger.info('[ %d ] update %s with %d docs.'
                % (thread_id, doc_type_target, len(data_dict_Lst)))
    t_e = datetime.datetime.now()
    t_delta = t_e - t_s
    logger.info('[ %d ] bulk write for one data batch time spent %s'
                % (thread_id, t_delta))
    # 3.5 clear the caller's collector list after the bulk write
    data_dict_Lst.clear()


def form_search_body_hist_uncomp(platform, fetch_time_upper_boundary_ts,
                                 fetch_time_lower_boundary_ts, url):
    if platform in ['toutiao', 'new_tudou']:
        # match on the video id derived from the url for these platforms
        videoID = cal_vid_from_url(platform, url)
        search_body_hist_uncomp = {
            "query": {
                "bool": {
                    "must": [
                        {"range": {"fetch_time": {
                            "gte": fetch_time_lower_boundary_ts,
                            "lt": fetch_time_upper_boundary_ts}}},
                        {"match_phrase": {"url": videoID}},
                        {"exists": {"field": "play_count"}}
                    ],
                }
            },
            "sort": [
                {"fetch_time": {"order": "asc"}}
            ]
        }
    else:
        search_body_hist_uncomp = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"fetch_time": {
                            "gte": fetch_time_lower_boundary_ts,
                            "lt": fetch_time_upper_boundary_ts}}},
                        {"term": {"url.keyword": url}}
                    ],
                    "must": [
                        {"exists": {"field": "play_count"}}
                    ],
                }
            },
            "sort": [
                {"fetch_time": {"order": "asc"}}
            ]
        }
    return search_body_hist_uncomp


def cal_vid_from_url(platform, url):
    if platform == 'toutiao':
        videoID = calculate_toutiao_video_id(url)
    elif platform == 'new_tudou':
        videoID = calculate_newTudou_video_id(url)
    else:
        videoID = url
    return videoID


def get_key_field_values(logger, thread_id, line_data_dict):
    try:
        platform = line_data_dict['platform']
        line_fetch_time_ts = line_data_dict['fetch_time']
        url = line_data_dict['url']
        play_count_acc = line_data_dict['play_count']
        return (platform, line_fetch_time_ts, url, play_count_acc)
    except KeyError:
        logger.info('[%s] missing field "platform", "fetch_time", "url" or '
                    '"play_count", function find_historical returns.' % thread_id)
        return None


def update_MNI_value_for_new_released(logger, thread_id, line_data_dict):
    key_fields = get_key_field_values(logger, thread_id, line_data_dict)
    if key_fields is None:
        return None
    else:
        # released within the month: accumulated values are the net increase
        play_count_acc = key_fields[3]
        monthly_net_inc_play_count = play_count_acc
        monthly_cal_base = 'accumulated_values'
        if 'comment_count' in line_data_dict:
            comment_count_acc = line_data_dict['comment_count']
            monthly_net_inc_comment_count = comment_count_acc
        else:
            monthly_net_inc_comment_count = 0
        if 'favorite_count' in line_data_dict:
            favorite_count_acc = line_data_dict['favorite_count']
            monthly_net_inc_favorite_count = favorite_count_acc
        else:
            monthly_net_inc_favorite_count = 0
        line_data_dict.update({
            'monthly_cal_base': monthly_cal_base,
            'monthly_net_inc_play_count': monthly_net_inc_play_count,
            'monthly_net_inc_comment_count': monthly_net_inc_comment_count,
            'monthly_net_inc_favorite_count': monthly_net_inc_favorite_count})
        return line_data_dict
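# Overview of the monthly_cal_base tags set by update_MNI_value_for_new_released
# above and find_historical below (derived from the branches in this module):
#   'accumulated_values'     - released on/after the 1st of the month; the
#                              accumulated counts are used directly.
#   'historical_complete'    - a baseline doc fetched within the 1st of the
#                              month was found; net increase = current
#                              accumulated count minus the baseline count.
#   'historical_uncomplete'  - the earliest baseline doc found was fetched
#                              after the 1st of the month; same subtraction,
#                              but the baseline is incomplete.
#   'historical_data_absent' - no baseline doc found; monthly_net_inc fields
#                              are left unset.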
def find_historical(logger, stdout_lock, thread_id, first_day_of_the_month_T,
                    line_data_dict, f_err_log):
    key_fields = get_key_field_values(logger, thread_id, line_data_dict)
    if key_fields is None:
        return None
    else:
        platform = key_fields[0]
        line_fetch_time_ts = key_fields[1]
        url = key_fields[2]
        play_count_acc = key_fields[3]
        zero_oclock_on_first_day_T = datetime.datetime(first_day_of_the_month_T.year,
                                                       first_day_of_the_month_T.month,
                                                       first_day_of_the_month_T.day)
        twenty_four_oclock_on_first_day_T = (zero_oclock_on_first_day_T
                                             + datetime.timedelta(seconds=24*3600))
        twenty_four_oclock_on_first_day_ts = int(twenty_four_oclock_on_first_day_T.timestamp()*1e3)
        fetch_time_lower_boundary_ts = int(zero_oclock_on_first_day_T.timestamp()*1e3)
        fetch_time_upper_boundary_ts = line_fetch_time_ts
        search_body_hist_uncomp = form_search_body_hist_uncomp(
            platform, fetch_time_upper_boundary_ts,
            fetch_time_lower_boundary_ts, url)
        search_resp = es_read.search(index=index_sv, doc_type=doc_type_daily_url,
                                     body=search_body_hist_uncomp, size=10,
                                     request_timeout=100)
        hist_data_raw_collector = search_resp['hits']['hits']
        search_resp_total = search_resp['hits']['total']
        if search_resp_total >= 1:
            # hits are sorted by fetch_time asc, so the first hit is the
            # earliest doc fetched on or after 00:00 of the first day
            earliest_hist_data = hist_data_raw_collector[0]['_source']
            hist_data_id = hist_data_raw_collector[0]['_id']
            earliest_hist_data_fetch_time = earliest_hist_data['fetch_time']
            play_count_acc_history = int(earliest_hist_data['play_count'])
            monthly_net_inc_play_count = play_count_acc - play_count_acc_history
            monthly_cal_base_fetch_time = earliest_hist_data_fetch_time
            if earliest_hist_data_fetch_time <= twenty_four_oclock_on_first_day_ts:
                monthly_cal_base = 'historical_complete'
            else:
                monthly_cal_base = 'historical_uncomplete'
            if 'comment_count' in earliest_hist_data and 'comment_count' in line_data_dict:
                comment_count_acc = line_data_dict['comment_count']
                comment_count_acc_history = int(earliest_hist_data['comment_count'])
                monthly_net_inc_comment_count = comment_count_acc - comment_count_acc_history
            else:
                monthly_net_inc_comment_count = 0
            if 'favorite_count' in earliest_hist_data and 'favorite_count' in line_data_dict:
                favorite_count_acc = line_data_dict['favorite_count']
                favorite_count_acc_history = int(earliest_hist_data['favorite_count'])
                monthly_net_inc_favorite_count = favorite_count_acc - favorite_count_acc_history
            else:
                monthly_net_inc_favorite_count = 0
            line_data_dict.update({
                'monthly_cal_base': monthly_cal_base,
                'monthly_net_inc_play_count': monthly_net_inc_play_count,
                'monthly_cal_base_fetch_time': monthly_cal_base_fetch_time,
                'MNI_hist_data_id': hist_data_id,
                'monthly_net_inc_comment_count': monthly_net_inc_comment_count,
                'monthly_net_inc_favorite_count': monthly_net_inc_favorite_count})
        else:
            # if there are no hits, leave the monthly_net_inc fields empty, and
            # tag monthly_cal_base as historical_data_absent
            with stdout_lock:
                print('[', thread_id, ']', url,
                      'found no historical data in present survey date range',
                      datetime.datetime.fromtimestamp(fetch_time_lower_boundary_ts/1e3),
                      'to',
                      datetime.datetime.fromtimestamp(fetch_time_upper_boundary_ts/1e3),
                      file=f_err_log)
            monthly_cal_base = 'historical_data_absent'
            line_data_dict.update({'monthly_cal_base': monthly_cal_base})
        return line_data_dict
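# Worked example (hypothetical numbers): if a video shows
# play_count = 150_000 in today's daily-url doc and the earliest doc fetched
# on or after 00:00 of the 1st of the month shows play_count = 120_000, then
#     monthly_net_inc_play_count = 150_000 - 120_000 = 30_000
# comment_count and favorite_count follow the same subtraction, defaulting
# to 0 when either side lacks the field.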
# 1.2 define sub-function to run in a thread
def cal_monthly_net_inc_and_write_es_in_thread(
        thread_id, logger, es_read, index, doc_type_target,
        search_body_in_thread, first_day_of_the_month_T,
        stdout_lock, f_err_log):
    logger.info('[ %d ] search_body_in_thread %s'
                % (thread_id, str(search_body_in_thread)))
    find_hits_total_in_threads = es_read.search(index=index, doc_type=doc_type_target,
                                                body=search_body_in_thread, size=0,
                                                request_timeout=100)
    hits_total_in_threads = find_hits_total_in_threads['hits']['total']
    logger.info('[ %d ] hits_total_in_threads %d' % (thread_id, hits_total_in_threads))
    scan_response = scan(client=es_read, query=search_body_in_thread,
                         index=index, doc_type=doc_type_target,
                         scroll='5m', size=1000, request_timeout=100)
    data_raw_collector = []
    line_counter = 0
    for line in scan_response:
        line_counter += 1
        raw_line = line
        line_doc_id = raw_line['_id']
        line_data = raw_line['_source']
        release_time = int(line_data['release_time'])
        release_time_T = datetime.datetime.fromtimestamp(int(release_time/1e3))
        if release_time_T >= first_day_of_the_month_T:
            line_data = update_MNI_value_for_new_released(logger, thread_id, line_data)
        else:
            line_data = find_historical(logger, stdout_lock, thread_id,
                                        first_day_of_the_month_T, line_data, f_err_log)
        if line_data is None:
            # doc lacks key fields; already logged in get_key_field_values
            continue
        line_data['_id'] = line_doc_id
        data_raw_collector.append(line_data)
        # write updated monthly_net_inc values back every 1000 docs
        if line_counter % 1000 == 0 or line_counter == hits_total_in_threads:
            logger.info('[ %d ] processing %d / %d'
                        % (thread_id, line_counter, hits_total_in_threads))
            bulk_write(thread_id, logger, data_raw_collector, doc_type_target,
                       index=index_sv)
            data_raw_collector.clear()
    if data_raw_collector != []:
        logger.info('[ %d ] processing %d / %d'
                    % (thread_id, line_counter, hits_total_in_threads))
        bulk_write(thread_id, logger, data_raw_collector, doc_type_target,
                   index=index_sv)
        data_raw_collector.clear()
    logger.info('[ %d ] Thread exits. %s' % (thread_id, datetime.datetime.now()))


def divid_by_release_time(logger, threads_num, average_data_num,
                          data_distr_by_release_time_Lst):
    # 1.1 By total hits and the data distribution by release_time,
    # find proper release_time ranges for the threads
    release_time_range_Lst = []
    # find the end side of each range segment
    data_counter_collector = 0
    distr_idx = 0
    for distr_by_releaseT in data_distr_by_release_time_Lst:
        data_num_each_day = distr_by_releaseT['doc_count']
        data_counter_collector += data_num_each_day
        if (data_counter_collector > average_data_num*0.9
                or distr_idx == len(data_distr_by_release_time_Lst)-1):
            release_time_range_dict = {'start': None, 'end': None,
                                       'end_idx': distr_idx,
                                       'data_num': data_counter_collector}
            release_time_range_Lst.append(release_time_range_dict)
            data_counter_collector = 0
        distr_idx += 1
    # fill up the start and end timestamps of each segment
    start_side_cache = data_distr_by_release_time_Lst[0]['key']
    for range_seg in release_time_range_Lst:
        if range_seg['end_idx']+1 > len(data_distr_by_release_time_Lst)-1:
            # last segment: extend one day past the last bucket
            range_seg['end'] = int(data_distr_by_release_time_Lst[range_seg['end_idx']]['key']
                                   + 24 * 3600 * 1e3)
        else:
            range_seg['end'] = data_distr_by_release_time_Lst[range_seg['end_idx']+1]['key']
        range_seg['start'] = start_side_cache
        start_side_cache = range_seg['end']
    # in case there are more or fewer split range segments than threads_num
    if len(release_time_range_Lst) != threads_num:
        threads_num = len(release_time_range_Lst)
        logger.info('Actual threads_num %d' % threads_num)
    logger.info('release_time_range_Lst:\n %s' % str(release_time_range_Lst))
    return (threads_num, release_time_range_Lst)
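# Sharding sketch (hypothetical histogram): with threads_num = 2 and daily
# release_time buckets like
#     [{'key': d1, 'doc_count': 400}, {'key': d2, 'doc_count': 700},
#      {'key': d3, 'doc_count': 500}]
# the total is 1600 docs, so average_data_num = 1600 // 2 = 800, and a segment
# is closed once the running count exceeds 0.9 * 800 = 720 (or the last bucket
# is reached): segment 1 covers [d1, d3) with 1100 docs, segment 2 covers
# [d3, d3 + 1 day) with 500 docs. Each segment becomes one thread's
# release_time filter range.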
def cal_monthly_net_inc(fetch_year, fetch_month, fetch_day, fetch_hour=0,
                        fetch_time_seg_hours=24, doc_type_target='daily-url',
                        search_body_logic_part=None, threads_num=5,
                        logger_name='calculate_monthly_net_inc'):
    date_passed_in = datetime.datetime(year=fetch_year, month=fetch_month,
                                       day=fetch_day, hour=fetch_hour)
    first_day_of_the_month_T = find_first_day_of_the_month(fetch_year, fetch_month, fetch_day)
    logger = logging.getLogger('%s.func' % logger_name)
    log_path = ('/home/hanye/project_data/Python/Projects/proj-short-videos/'
                'write-data-into-es/log/')
    logger.info('************ log starts')
    logger.info('calculate monthly net_inc values')
    err_log_filename = ('calculate_monthly_net_inc_for_'
                        + str(fetch_year) + '_' + str(fetch_month) + '_'
                        + str(fetch_day) + '_error_log'
                        + '_' + str(datetime.datetime.now())[0:10])
    f_err_log = open(log_path+err_log_filename, 'a', encoding='gb18030')
    t_s_cal = datetime.datetime.now()
    # 1 get all the data from the ccr es for the given fetch_time
    # from _type=daily-url
    fetch_time_start_ts = int(date_passed_in.timestamp()*1e3)
    fetch_time_end_ts = int((date_passed_in
                             + datetime.timedelta(seconds=fetch_time_seg_hours*3600)
                             ).timestamp()*1e3)
    fetch_time_start_iso = datetime.datetime.fromtimestamp(
        int(fetch_time_start_ts/1e3)).isoformat()
    fetch_time_end_iso = datetime.datetime.fromtimestamp(
        int(fetch_time_end_ts/1e3)).isoformat()
    logger.info('fetch_time_start: %s, fetch_time_end: %s'
                % (fetch_time_start_iso, fetch_time_end_iso))
    # Restrict the release_time range to start 60 days before the passed-in
    # date, Nov 18 2017
    release_time_start_ts = int((date_passed_in - datetime.timedelta(days=60))
                                .timestamp()*1000)
    release_time_end_ts = int((date_passed_in + datetime.timedelta(days=365))
                              .timestamp()*1000)
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"release_time": {
                        "gte": release_time_start_ts,
                        "lt": release_time_end_ts}}},
                    {"range": {"fetch_time": {
                        "gte": fetch_time_start_ts,
                        "lt": fetch_time_end_ts}}}
                ],
            }
        },
        "size": 2,
        "aggs": {
            "release_time_distribution": {
                "date_histogram": {
                    "field": "release_time",
                    "interval": "day",
                    "time_zone": "Asia/Shanghai"
                }
            }
        }
    }
    if search_body_logic_part is not None:
        if 'filter' in search_body_logic_part:
            search_body['query']['bool']['filter'].append(search_body_logic_part['filter'])
        else:
            search_body['query']['bool'].update(search_body_logic_part)
    search_response = es_read.search(index=index_sv, doc_type=doc_type_target,
                                     body=search_body, request_timeout=100)
    hits_total = search_response['hits']['total']
    logger.info('find total hits %d in _type %s.' % (hits_total, doc_type_target))
    if hits_total == 0:
        f_err_log.close()
        return
    # 1.1 By total hits and the data distribution by release_time,
    # find proper release_time ranges for the threads
    data_distr_by_release_time_Lst = search_response['aggregations']['release_time_distribution']['buckets']
    average_data_num = hits_total // threads_num
    sharding_by_release_time = divid_by_release_time(logger, threads_num, average_data_num,
                                                     data_distr_by_release_time_Lst)
    threads_num = sharding_by_release_time[0]
    release_time_range_Lst = sharding_by_release_time[1]
    # global lock to prevent conflicts on the error-log file when printing
    stdout_lock = _thread.allocate_lock()
    search_body_in_thread_Lst = []
    waitfor = []
    for i in range(0, threads_num):
        release_time_ts_start = release_time_range_Lst[i]['start']
        release_time_ts_end = release_time_range_Lst[i]['end']
        search_body_in_thread = copy.deepcopy(search_body)
        search_body_in_thread.pop('size', None)
        search_body_in_thread.pop('aggs', None)
        search_body_in_thread['query']['bool']['filter'][0]['range']['release_time']['gte'] = release_time_ts_start
        search_body_in_thread['query']['bool']['filter'][0]['range']['release_time']['lt'] = release_time_ts_end
        search_body_in_thread['query']['bool']['filter'][1]['range']['fetch_time']['gte'] = fetch_time_start_ts
        search_body_in_thread['query']['bool']['filter'][1]['range']['fetch_time']['lt'] = fetch_time_end_ts
        search_body_in_thread_Lst.append(search_body_in_thread)
        thread = threading.Thread(target=cal_monthly_net_inc_and_write_es_in_thread,
                                  args=(i, logger, es_read, index_sv, doc_type_target,
                                        search_body_in_thread_Lst[i],
                                        first_day_of_the_month_T,
                                        stdout_lock, f_err_log))
        waitfor.append(thread)
        thread.start()
    # wait for every thread to exit
    for thread in waitfor:
        thread.join()
    t_e_cal = datetime.datetime.now()
    t_delta_cal = t_e_cal - t_s_cal
    logger.info('for all data of date %s total time cost: %s'
                % (date_passed_in.isoformat()[:10], t_delta_cal))
    logger.info('Main thread exiting.')
    f_err_log.close()
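# Usage sketch (hypothetical date and thread count; the Elasticsearch host,
# index and log path configured above are environment-specific):
#
#     if __name__ == '__main__':
#         logging.basicConfig(level=logging.INFO)
#         # calculate monthly net-increase values for daily-url docs fetched
#         # on 2018-07-15, splitting the work across 4 threads by release_time
#         cal_monthly_net_inc(2018, 7, 15, threads_num=4)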