# -*- coding: utf-8 -*-
"""
Created on Mon Jan 8 15:12:14 2018

@author: hanye

Daily maintenance statistics for the short-video index.

For one fetch day the script:
  * finds every platform that has documents fetched on that day,
  * runs a ``stats`` aggregation (max / min / avg / sum) on each counter
    field, once over all documents observed that day and once over the
    documents that were also released that day,
  * bulk-indexes one summary document per (platform, stats_type) into the
    maintenance index.
"""
from elasticsearch import Elasticsearch
import datetime
import copy
import json


def _build_stats_body(filters, field_name):
    """Return a search body running a ``stats`` aggregation on *field_name*.

    A fresh dict is built on every call so no query body is shared (and
    mutated) between searches. ``size`` is 0 because only the aggregation
    result and the hit total are read from the response.
    """
    return {
        "query": {"bool": {"filter": list(filters)}},
        "size": 0,
        "aggs": {"field_stats": {"stats": {"field": field_name}}},
    }


def _build_bulk_lines(stats_doc, doc_id):
    """Return the two newline-terminated bulk lines (action + source).

    Both lines are serialized with ``json.dumps`` so that quotes or
    backslashes inside *doc_id* cannot corrupt the bulk payload.
    """
    action_json = json.dumps({"index": {"_id": doc_id}}, ensure_ascii=False)
    source_json = json.dumps(stats_doc, ensure_ascii=False)
    return action_json + '\n' + source_json + '\n'


# 0 init
print('0 init', datetime.datetime.now())
hosts = '192.168.17.11'
port = 9200
es = Elasticsearch(hosts=hosts, port=port)

index_short_video = 'short-video-production'
doc_type_short_video_daily = 'daily-url'

index_maintainance = 'maintainance'
doc_type_maintainance_daily = 'daily'

# 1 set fetch_day for daily maintenance data stats
fetch_year = 2018
fetch_month = 1
fetch_day = 1
fetch_time_start_T = datetime.datetime(fetch_year, fetch_month, fetch_day)
fetch_time_end_T = fetch_time_start_T + datetime.timedelta(days=1)
# Timestamps are sent to Elasticsearch as integer milliseconds.
fetch_time_start_ts = int(fetch_time_start_T.timestamp() * 1e3)
fetch_time_end_ts = int(fetch_time_end_T.timestamp() * 1e3)

# Half-open range [start, end) covering exactly the fetch day.
fetch_day_filter = {
    "range": {
        "fetch_time": {"gte": fetch_time_start_ts, "lt": fetch_time_end_ts}
    }
}

# 2 find how many platforms there are
print('2 find how many platforms there', datetime.datetime.now())
find_platform_Lst = {
    "query": {"bool": {"filter": [fetch_day_filter]}},
    "size": 0,
    "aggs": {
        "platforms": {
            "terms": {
                "field": "platform.keyword",
                "size": 50,
            }
        }
    },
}

find_platform_resp = es.search(index=index_short_video,
                               doc_type=doc_type_short_video_daily,
                               body=find_platform_Lst,
                               request_timeout=100)
aggs_result = find_platform_resp['aggregations']['platforms']['buckets']

# platform name -> number of documents fetched on the fetch day
platform_dict = {bucket['key']: bucket['doc_count'] for bucket in aggs_result}

# 3 define fields and their stats
print('3 define fields and their stats', datetime.datetime.now())
# source field -> {stats metric -> key used in the summary document}
field_dict = {
    'play_count': {
        'max': 'play_count_max',
        'min': 'play_count_min',
        'avg': 'play_count_avg',
        'sum': 'play_count_sum',
    },
    'favorite_count': {
        'max': 'favorite_count_max',
        'min': 'favorite_count_min',
        'avg': 'favorite_count_avg',
        'sum': 'favorite_count_sum',
    },
    'comment_count': {
        'max': 'comment_count_max',
        'min': 'comment_count_min',
        'avg': 'comment_count_avg',
        'sum': 'comment_count_sum',
    },
}

# 4 aggregations for each field's each stats metric
print('4 aggregations for each field\'s each stats metric',
      datetime.datetime.now())
stats_Lst = []
# Midnight of the fetch day in milliseconds; same instant as the range start.
fetch_date_ts = fetch_time_start_ts
freq_type = 'daily'

for platform in platform_dict:
    print('platform: %s' % platform, datetime.datetime.now())
    stats_dict_init = {
        'fetch_year': fetch_year,
        'fetch_month': fetch_month,
        'fetch_day': fetch_day,
        'fetch_date': fetch_date_ts,
        'platform': platform,
        'freq_type': freq_type,
    }

    platform_filter = {"term": {"platform.keyword": platform}}
    release_day_filter = {
        "range": {
            "release_time": {"gte": fetch_time_start_ts,
                             "lt": fetch_time_end_ts}
        }
    }
    # stats_type -> query filters selecting the documents it covers
    stats_type_dict = {
        'observed': [fetch_day_filter, platform_filter],
        'new_released': [fetch_day_filter, platform_filter,
                         release_day_filter],
    }

    for stats_type, stats_filters in stats_type_dict.items():
        print('platform: %s, stats_type: %s' % (platform, stats_type),
              datetime.datetime.now())
        stats_dict = copy.deepcopy(stats_dict_init)
        stats_dict['stats_type'] = stats_type

        for field_name, metric_keys in field_dict.items():
            print('platform: %s, stats_type: %s, field: %s'
                  % (platform, stats_type, field_name),
                  datetime.datetime.now())
            stats_body = _build_stats_body(stats_filters, field_name)
            search_resp = es.search(index=index_short_video,
                                    doc_type=doc_type_short_video_daily,
                                    body=stats_body,
                                    request_timeout=100)
            # Every field query uses the same filters; the value from the
            # last response is the one that is kept.
            stats_dict['video_num'] = search_resp['hits']['total']

            field_stats = search_resp['aggregations']['field_stats']
            stats_dict.update({
                metric_keys['sum']: field_stats['sum'],
                metric_keys['max']: field_stats['max'],
                metric_keys['min']: field_stats['min'],
                metric_keys['avg']: field_stats['avg'],
            })

        # Record when this summary document was computed (milliseconds).
        stats_dict['timestamp'] = int(
            datetime.datetime.now().timestamp() * 1e3)
        stats_Lst.append(stats_dict)

# 5 bulk write into maintainance index
bulk_lines = []
for line in stats_Lst:
    fetch_date_str = datetime.datetime.fromtimestamp(
        line['fetch_date'] / 1e3).isoformat()[:10]
    # Deterministic id: writing the same day again overwrites the document.
    line_id = '_'.join([line['platform'], fetch_date_str,
                        line['stats_type'], line['freq_type']])
    print(line_id)
    bulk_lines.append(_build_bulk_lines(line, line_id))
bulk_body = ''.join(bulk_lines)

if bulk_body:
    bulk_resp = es.bulk(body=bulk_body,
                        index=index_maintainance,
                        doc_type=doc_type_maintainance_daily,
                        request_timeout=200)
    # The bulk API reports per-item failures in the response body rather
    # than by raising, so surface them instead of dropping them silently.
    if bulk_resp.get('errors'):
        print('bulk write reported item-level errors',
              datetime.datetime.now())
else:
    # An empty bulk request is rejected by Elasticsearch; skip the call.
    print('no stats to write, skip bulk', datetime.datetime.now())