# -*- coding: utf-8 -*-
"""
Created on Mon Jan  8 15:12:14 2018

@author: hanye
"""

from elasticsearch import Elasticsearch
import datetime
import copy
import json

# 0 init
print('0 init', datetime.datetime.now())
hosts='192.168.17.11'
port=9200
es=Elasticsearch(hosts=hosts, port=port)

index_short_video='short-video-production'
doc_type_short_video_daily='daily-url'

index_maintainance='maintainance'
doc_type_maintainance_daily='daily'
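# The short-video index/doc_type pair above is only read from; the maintainance
# pair is the destination the daily stats documents are bulk-written to in step 5.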

# 1 set fetch_day for daily maintainance data stats
fetch_year=2018
fetch_month=1
fetch_day=1

fetch_time_start_T=datetime.datetime(fetch_year, fetch_month, fetch_day)
fetch_time_end_T=fetch_time_start_T+datetime.timedelta(days=1)

fetch_time_start_ts=int(fetch_time_start_T.timestamp()*1e3)
fetch_time_end_ts=int(fetch_time_end_T.timestamp()*1e3)
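# Millisecond epoch bounds for the half-open one-day window [start, end);
# fetch_time in the source index is assumed to hold epoch-millis values.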

# 2 find how many platforms there are
print('2 find how many platforms there are', datetime.datetime.now())
find_platform_Lst={
  "query": {
    "bool": {
      "filter": [
        {"range": {"fetch_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
        ]
    }
  },
  "size": 0,
  "aggs": {
    "platforms": {
      "terms": {
        "field": "platform.keyword",
        "size": 50
      }
    }
  }
}
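# size=0 suppresses document hits; the terms aggregation returns up to 50
# buckets, one per distinct platform.keyword, each with its doc_count.
# Illustrative response shape (values made up):
#   {"aggregations": {"platforms": {"buckets": [{"key": "<platform>", "doc_count": 123}, ...]}}}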

find_platform_resp=es.search(index=index_short_video,
                      doc_type=doc_type_short_video_daily,
                      body=find_platform_Lst,
                      request_timeout=100)
aggs_result=find_platform_resp['aggregations']['platforms']['buckets']
platform_dict={}
for line in aggs_result:
    platform=line['key']
    video_num=line['doc_count']
    platform_dict[platform]=video_num

# 3 define fields and their stats 
print('3 define fields and their stats', datetime.datetime.now())
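# Map each numeric source field to the output field names used for its
# max/min/avg/sum stats in the maintainance documents.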
field_dict={
            'play_count': {
                           'max': 'play_count_max',
                           'min': 'play_count_min',
                           'avg': 'play_count_avg',
                           'sum': 'play_count_sum',
                           },
            'favorite_count':{
                              'max': 'favorite_count_max',
                              'min': 'favorite_count_min',
                              'avg': 'favorite_count_avg',
                              'sum': 'favorite_count_sum',
                              },
            'comment_count':{
                             'max': 'comment_count_max',
                             'min': 'comment_count_min',
                             'avg': 'comment_count_avg',
                             'sum': 'comment_count_sum',
                             }, 
            }


# 4 run aggregations for each stats metric of each field
print('4 run aggregations for each stats metric of each field', datetime.datetime.now())
stats_Lst=[]
fetch_date_ts=int(datetime.datetime(fetch_year, fetch_month, fetch_day).timestamp()*1e3)
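# Start-of-day timestamp in ms (same value as fetch_time_start_ts), stored as
# fetch_date on every output document.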
for platform in platform_dict:
    print('platform: %s' % platform, datetime.datetime.now())
    stats_dict_init={'fetch_year': fetch_year,
                'fetch_month': fetch_month,
                'fetch_day': fetch_day,
                'fetch_date': fetch_date_ts,
                'platform': platform,
                }
    
    freq_type='daily'
    stats_dict_init['freq_type']=freq_type    
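    # Two query bodies share the same stats aggregation: 'observed' covers every
    # doc fetched for this platform in the window, while 'new_released' also
    # restricts release_time to the same day. The "field": None placeholder is
    # filled in per metric field in the loop below.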

    stats_body_observed={
      "query": {
        "bool": {
          "filter": [
            {"range": {"fetch_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
            {"term": {"platform.keyword": platform}}
            ]
        }
      },
      "size": 0,
      "aggs": {
        "field_stats": {
          "stats": {
            "field": None
          }
        }
      }
    }
          
    stats_body_new_released={
      "query": {
        "bool": {
          "filter": [
            {"range": {"fetch_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
            {"term": {"platform.keyword": platform}},
            {"range": {"release_time": {"gte":fetch_time_start_ts, "lt":fetch_time_end_ts}}},
            ]
        }
      },
      "size": 0,
      "aggs": {
        "field_stats": {
          "stats": {
            "field": None
          }
        }
      }
    }       

    stats_type_dict={'observed': stats_body_observed,
                     'new_released': stats_body_new_released}
 
    for stats_type in stats_type_dict:
        print('platform: %s, stats_type: %s' % (platform, stats_type), datetime.datetime.now())
        stats_dict=copy.deepcopy(stats_dict_init)
        stats_dict['stats_type']=stats_type
    
        for field_name in field_dict:
            print('platform: %s, stats_type: %s, field: %s' % (platform, stats_type, field_name),
                  datetime.datetime.now())
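            # Reuse the shared body for this stats_type; only the aggregation's
            # target field changes between iterations, so no copy is needed.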
            stats_body=stats_type_dict[stats_type]
            stats_body['aggs']['field_stats']['stats']['field']=field_name
            
            search_resp=es.search(index=index_short_video,
                                  doc_type=doc_type_short_video_daily,
                                  body=stats_body,
                                  request_timeout=100)
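            # hits.total is the number of docs matching the filters (a plain integer
            # on the pre-7.x client this script targets); the stats aggregation
            # returns max/min/avg/sum for the current target field.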
            video_num=search_resp['hits']['total']
            stats_dict['video_num']=video_num
            
            field_max=search_resp['aggregations']['field_stats']['max']
            field_min=search_resp['aggregations']['field_stats']['min']
            field_avg=search_resp['aggregations']['field_stats']['avg']
            field_sum=search_resp['aggregations']['field_stats']['sum']
            
            stats_dict.update({
                        field_dict[field_name]['sum']: field_sum,
                        field_dict[field_name]['max']: field_max,
                        field_dict[field_name]['min']: field_min,
                        field_dict[field_name]['avg']: field_avg,
                        })

        timestamp=int(datetime.datetime.now().timestamp()*1e3)
        stats_dict['timestamp']=timestamp
        stats_Lst.append(stats_dict)

# 5 bulk write into maintainance index
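# The bulk body is newline-delimited JSON: an action line followed by a source
# line for each document. _index/_type are supplied on the es.bulk() call, and
# _id is built from platform, date, stats_type and freq_type, so re-running the
# script for the same day overwrites documents instead of duplicating them.
# Illustrative pair (values made up):
#   {"index": {"_id": "<platform>_2018-01-01_observed_daily"}}
#   {"fetch_year": 2018, "fetch_month": 1, ...}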
bulk_body=''
for line in stats_Lst:
    line_id=(line['platform']+'_'
             +datetime.datetime.fromtimestamp(line['fetch_date']/1e3).isoformat()[:10]+'_'
             +line['stats_type']+'_'
             +line['freq_type'])
    print(line_id)
    action_json='{"index": {"_id":"'+line_id+'"}}'
    line_json=json.dumps(line, ensure_ascii=False)
    line_body=(action_json+'\n'+line_json+'\n')
    bulk_body+=line_body

es.bulk(body=bulk_body,
        index=index_maintainance,
        doc_type=doc_type_maintainance_daily,
        request_timeout=200)