# -*- coding: utf-8 -*-
"""
Feed the weekly short-video data pool from the daily short-video index.

Documents are selected from the source index by fetch_time, one window of
at most one day at a time, and handed to bulk_write_short_video for writing
into a per-week doc type of the destination index.

Created on Wed Jun 20 08:54:21 2018

@author: hanye
"""
import datetime
import sys

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

from func_find_week_num import find_week_belongs_to
from func_general_bulk_write import bulk_write_short_video

# Shared client used by every request in this module.
# When testing, use the client from es_hy_sign_in instead:
#   from es_hy_sign_in import es
es = Elasticsearch(hosts='192.168.17.11', port=9200)

# Largest fetch_time window handled by a single query.
_ONE_DAY = datetime.timedelta(days=1)
# How far before the window start a document's release_time may lie.
_RELEASE_LOOKBACK = datetime.timedelta(days=30)
# Number of scanned documents accumulated before each bulk write.
_BULK_BATCH_SIZE = 1000


def define_doc_type(week_year, week_no, week_day_start):
    """
    Build the doc type name of the weekly data pool.

    doc_type = 'daily-url-2018_w24_s2' means select Tuesday as the
    first day of each week, it's year 2018's 24th week.
    In isocalendar definition, Monday - weekday 1, Tuesday - weekday 2,
    ..., Saturday - weekday 6, Sunday - weekday 7.

    Args:
        week_year: year the week belongs to (int).
        week_no: week number within that year (int, zero-padded to 2 digits).
        week_day_start: weekday number used as the first day of a week (int).

    Returns:
        The doc type string, e.g. 'daily-url-2018_w24_s2'.
    """
    return 'daily-url-%d_w%02d_s%d' % (week_year, week_no, week_day_start)


def _to_epoch_ms(moment):
    """Convert a datetime to an integer epoch timestamp in milliseconds."""
    return int(moment.timestamp() * 1e3)


def _write_batch(data_Lst, line_counter, hit_total, index_dest,
                 doc_type_data_pool, f_log):
    """Log progress, bulk-write the accumulated documents and empty the list."""
    print('Writing lines %d/%d [%.2f%%]'
          % (line_counter, hit_total, line_counter/hit_total*100),
          datetime.datetime.now(), file=f_log)
    bulk_write_short_video(data_Lst,
                           client=es,
                           index=index_dest,
                           doc_type=doc_type_data_pool,
                           doc_id_type='all-time-url',
                           f_log=f_log)
    data_Lst.clear()


def update_weekly_datapool(fetch_time_start_T, fetch_time_end_T=None,
                           index_source='short-video-production',
                           index_dest='short-video-weekly',
                           week_day_start=1,
                           f_log=sys.stdout):
    """
    Copy documents fetched in [fetch_time_start_T, fetch_time_end_T) from
    the 'daily-url' doc type of index_source into the weekly data pool.

    A window of at most one day is processed directly: the destination doc
    type is derived from the week that fetch_time_start_T belongs to, and
    only documents whose release_time is no earlier than 30 days before the
    window start are selected. A longer window is split into its first day
    plus the remainder, each handled by a recursive call.

    Args:
        fetch_time_start_T: datetime, inclusive start of the fetch_time window.
        fetch_time_end_T: datetime, exclusive end of the window; defaults to
            one day after fetch_time_start_T.
        index_source: index the daily documents are read from.
        index_dest: index the weekly data pool is written to.
        week_day_start: weekday number treated as the first day of a week,
            forwarded to find_week_belongs_to and define_doc_type.
        f_log: file-like object that receives progress messages.

    Returns:
        None. Progress is reported through f_log only.
    """
    doc_type_daily = 'daily-url'
    if fetch_time_end_T is None:
        fetch_time_end_T = fetch_time_start_T + _ONE_DAY

    # Compare the whole timedelta, not its .days field: .days truncates, so
    # a span of 1 day 23 hours would otherwise pass as a single-day window
    # and be filed entirely under the start day's week.
    if fetch_time_end_T - fetch_time_start_T > _ONE_DAY:
        print('It\'s NOT recommended that pass in fetch_time range more '
              'than 1 day.', datetime.datetime.now(), file=f_log)
        print('fetch_time_end_T is more than 1 day greater than fetch_time_start_T, '
              'will be processed recursively one by one at 1 day interval.',
              datetime.datetime.now(), file=f_log)
        first_day_end_T = fetch_time_start_T + _ONE_DAY
        print('Alter fetch_time_end_T to be 1 day later than fetch_time_start_T,',
              datetime.datetime.now(), file=f_log)
        # week_day_start must be forwarded, otherwise the recursive calls
        # silently fall back to the default week layout.
        update_weekly_datapool(fetch_time_start_T, first_day_end_T,
                               index_source=index_source,
                               index_dest=index_dest,
                               week_day_start=week_day_start,
                               f_log=f_log)
        print('Processing data started after altered fetch_time_end_T',
              datetime.datetime.now(), file=f_log)
        update_weekly_datapool(first_day_end_T, fetch_time_end_T,
                               index_source=index_source,
                               index_dest=index_dest,
                               week_day_start=week_day_start,
                               f_log=f_log)
        return

    week_year, week_no, _week_day = find_week_belongs_to(fetch_time_start_T,
                                                         week_day_start)
    doc_type_data_pool = define_doc_type(week_year, week_no,
                                         week_day_start=week_day_start)

    fetch_time_start_ts = _to_epoch_ms(fetch_time_start_T)
    fetch_time_end_ts = _to_epoch_ms(fetch_time_end_T)
    # Every bound in the query is sent as epoch milliseconds so that both
    # ends of the release_time range use the same representation.
    release_time_start_ts = _to_epoch_ms(fetch_time_start_T - _RELEASE_LOOKBACK)

    find_daily_data_bd = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                              "lt": fetch_time_end_ts}}},
                    {"range": {"release_time": {"gte": release_time_start_ts,
                                                "lt": fetch_time_end_ts}}}
                ]
            }
        }
    }

    # size=0: only the hit count is needed here, the documents themselves
    # are streamed by scan() below.
    search_resp = es.search(index=index_source, doc_type=doc_type_daily,
                            body=find_daily_data_bd, size=0,
                            request_timeout=100)
    hit_total = search_resp['hits']['total']
    if isinstance(hit_total, dict):
        # Elasticsearch 7+ reports the total as {'value': N, 'relation': ...}.
        hit_total = hit_total['value']
    print('Got %d results in %s/%s on fetch_time range [%s, %s)'
          % (hit_total, index_source, doc_type_daily,
             fetch_time_start_T, fetch_time_end_T),
          datetime.datetime.now(), file=f_log)

    if hit_total <= 0:
        print('Got zero hits, program exits.',
              datetime.datetime.now(), file=f_log)
        return

    print('Will write into %s/%s' % (index_dest, doc_type_data_pool),
          datetime.datetime.now(), file=f_log)
    scan_resp = scan(client=es, index=index_source, doc_type=doc_type_daily,
                     query=find_daily_data_bd, request_timeout=300)
    line_counter = 0
    data_Lst = []
    for line_counter, line in enumerate(scan_resp, start=1):
        data_Lst.append(line['_source'])
        if line_counter % _BULK_BATCH_SIZE == 0:
            _write_batch(data_Lst, line_counter, hit_total,
                         index_dest, doc_type_data_pool, f_log)
    # Flush whatever is left once the scan is exhausted, so the final partial
    # batch is written even when the scan yields a different number of
    # documents than the earlier count reported.
    if data_Lst:
        _write_batch(data_Lst, line_counter, hit_total,
                     index_dest, doc_type_data_pool, f_log)


# test
if __name__ == '__main__':
    update_weekly_datapool(datetime.datetime(2018, 6, 18),
                           datetime.datetime(2018, 6, 18, 0, 5, 0),
                           # index_dest='test-v2',
                           week_day_start=1)
    week_year1, week_no1, week_day1 = find_week_belongs_to(
        datetime.datetime(2018, 6, 18), 1)
    print(define_doc_type(week_year1, week_no1, 1))