# -*- coding: utf-8 -*-
"""
Created on Mon Jan 8 15:12:14 2018

@author: hanye
"""
from elasticsearch import Elasticsearch
import datetime
import copy
import json
import logging


def build_maintainance_index(fetch_year, fetch_month, fetch_day, freq_type='daily'):
    """
    Aggregate one day of short-video metrics per platform and bulk-write
    the results into the maintainance index.

    For every platform that has documents fetched on the given calendar
    day, computes max/min/avg/sum of play_count, favorite_count and
    comment_count for two populations: all observed videos, and videos
    released within ~24h before the fetch window ('new_released').

    Parameters
    ----------
    fetch_year, fetch_month, fetch_day : int
        Calendar date (local time) whose fetched documents are aggregated.
    freq_type : str
        Currently ignored: it is unconditionally re-assigned to 'daily'
        inside the loop, so any other value passed in has no effect.
        (Original behavior, noted Jan 08 2018 — kept for compatibility.)
    """
    # create logger (configured by the calling script)
    logger = logging.getLogger('maintainance_build_manual.func')
    logger.info('fetch_year %d, fetch_month %d, fetch_day %d',
                fetch_year, fetch_month, fetch_day)

    # 0 init: ES client plus source/target index and doc-type names
    logger.info('0 init')
    hosts = '192.168.17.11'
    port = 9200
    es = Elasticsearch(hosts=hosts, port=port)
    index_short_video = 'short-video-production'
    doc_type_short_video_daily = 'daily-url'
    index_maintainance = 'maintainance-short-video'
    doc_type_maintainance_daily = 'daily'

    # 1 fetch window: [00:00 of fetch day, 00:00 of next day), epoch millis
    logger.info('1 set fetch_day for daily maintainance data stats')
    fetch_time_start_T = datetime.datetime(fetch_year, fetch_month, fetch_day)
    fetch_time_end_T = fetch_time_start_T + datetime.timedelta(days=1)
    fetch_time_start_ts = int(fetch_time_start_T.timestamp() * 1e3)
    fetch_time_end_ts = int(fetch_time_end_T.timestamp() * 1e3)

    # 2 terms aggregation to discover which platforms have data that day
    logger.info('2 find how many platforms there')
    find_platform_Lst = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                              "lt": fetch_time_end_ts}}},
                ]
            }
        },
        "size": 0,
        "aggs": {
            "platforms": {
                "terms": {
                    "field": "platform.keyword",
                    "size": 50
                }
            }
        }
    }
    find_platform_resp = es.search(index=index_short_video,
                                   doc_type=doc_type_short_video_daily,
                                   body=find_platform_Lst,
                                   request_timeout=100)
    aggs_result = find_platform_resp['aggregations']['platforms']['buckets']
    # platform name -> doc count for that day (only the keys are used below)
    platform_dict = {}
    for line in aggs_result:
        platform = line['key']
        video_num = line['doc_count']
        platform_dict[platform] = video_num

    # 3 metric fields and the output field name for each stats bucket
    logger.info('3 define fields and their stats')
    field_dict = {
        'play_count': {
            'max': 'play_count_max',
            'min': 'play_count_min',
            'avg': 'play_count_avg',
            'sum': 'play_count_sum',
        },
        'favorite_count': {
            'max': 'favorite_count_max',
            'min': 'favorite_count_min',
            'avg': 'favorite_count_avg',
            'sum': 'favorite_count_sum',
        },
        'comment_count': {
            'max': 'comment_count_max',
            'min': 'comment_count_min',
            'avg': 'comment_count_avg',
            'sum': 'comment_count_sum',
        },
    }

    # 4 run a stats aggregation per platform / stats_type / field
    logger.info('4 aggregations for each field\'s each stats metric')
    stats_Lst = []
    fetch_date_ts = int(datetime.datetime(fetch_year, fetch_month,
                                          fetch_day).timestamp() * 1e3)
    for platform in platform_dict:
        logger.info('platform: %s', platform)
        stats_dict_init = {'fetch_year': fetch_year,
                           'fetch_month': fetch_month,
                           'fetch_day': fetch_day,
                           'fetch_date': fetch_date_ts,
                           'platform': platform,
                           }
        # NOTE: freq_type is forced to 'daily' here, overriding the
        # parameter — intentional per the original implementation.
        freq_type = 'daily'
        stats_dict_init['freq_type'] = freq_type
        # all videos observed in the fetch window for this platform;
        # the aggregation field is filled in per metric below
        stats_body_observed = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                                  "lt": fetch_time_end_ts}}},
                        {"term": {"platform.keyword": platform}}
                    ]
                }
            },
            "size": 0,
            "aggs": {
                "field_stats": {
                    "stats": {
                        "field": None
                    }
                }
            }
        }
        # 'new_released' additionally requires release_time inside an
        # enlarged window starting 24h before the fetch window
        fetch_time_start_ts_enlarge = int(fetch_time_start_ts - 24 * 3600 * 1e3)
        stats_body_new_released = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                                  "lt": fetch_time_end_ts}}},
                        {"term": {"platform.keyword": platform}},
                        {"range": {"release_time": {"gte": fetch_time_start_ts_enlarge,
                                                    "lt": fetch_time_end_ts}}},
                    ]
                }
            },
            "size": 0,
            "aggs": {
                "field_stats": {
                    "stats": {
                        "field": None
                    }
                }
            }
        }
        stats_type_dict = {'observed': stats_body_observed,
                           'new_released': stats_body_new_released}
        for stats_type in stats_type_dict:
            logger.info('platform: %s, stats_type: %s', platform, stats_type)
            # one output document per (platform, stats_type); deep-copy so
            # each document gets its own dict
            stats_dict = copy.deepcopy(stats_dict_init)
            stats_dict['stats_type'] = stats_type
            for field_name in field_dict:
                logger.info('platform: %s, stats_type: %s, field: %s',
                            platform, stats_type, field_name)
                stats_body = stats_type_dict[stats_type]
                # reuse the query body, swapping in the metric field
                stats_body['aggs']['field_stats']['stats']['field'] = field_name
                search_resp = es.search(index=index_short_video,
                                        doc_type=doc_type_short_video_daily,
                                        body=stats_body,
                                        request_timeout=100)
                # total hits is the same for every field of this query,
                # so overwriting per field is harmless
                video_num = search_resp['hits']['total']
                stats_dict['video_num'] = video_num
                field_max = search_resp['aggregations']['field_stats']['max']
                field_min = search_resp['aggregations']['field_stats']['min']
                field_avg = search_resp['aggregations']['field_stats']['avg']
                field_sum = search_resp['aggregations']['field_stats']['sum']
                stats_dict.update({
                    field_dict[field_name]['sum']: field_sum,
                    field_dict[field_name]['max']: field_max,
                    field_dict[field_name]['min']: field_min,
                    field_dict[field_name]['avg']: field_avg,
                })
            timestamp = int(datetime.datetime.now().timestamp() * 1e3)
            stats_dict['timestamp'] = timestamp
            stats_Lst.append(stats_dict)

    # 5 bulk write into maintainance index
    # build the NDJSON body with a list + join (linear) instead of
    # repeated string concatenation (quadratic)
    bulk_lines = []
    for line in stats_Lst:
        # deterministic _id so re-runs overwrite the same document
        line_id = (line['platform'] + '_'
                   + datetime.datetime.fromtimestamp(line['fetch_date'] / 1e3).isoformat()[:10] + '_'
                   + line['stats_type'] + '_'
                   + line['freq_type'])
        action_json = '{"index": {"_id":"' + line_id + '"}}'
        line_json = json.dumps(line, ensure_ascii=False)
        bulk_lines.append(action_json + '\n' + line_json + '\n')
    bulk_body = ''.join(bulk_lines)
    if bulk_body:
        es.bulk(body=bulk_body, index=index_maintainance,
                doc_type=doc_type_maintainance_daily, request_timeout=200)
    else:
        logger.info('Got empty bulk_body at fetch_year %d, fetch_month %d, fetch_day %d',
                    fetch_year, fetch_month, fetch_day)