# -*- coding: utf-8 -*-
"""
Created on Tue Feb 12 17:32:31 2019

@author: zhouyujiang

Split the increment calculation into two parts:
one part processes the historical data,
the other processes the newly released data; the historical counts are cached in redis for the calculation.
Writes to redis go through a pipeline for speed;
reads from redis are not well suited to pipelining.

"""

import redis
import hashlib
import datetime
import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

hosts = '192.168.17.11'
port = 9200
es = Elasticsearch(hosts=hosts, port=port, timeout=30)


    
def cal_id_hash(doc_id, cal_type):
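    """Build the redis key for one document: '<cal_type>_<md5 hex of doc_id>'."""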
    doc_id_hash = ('%s_%s' % (cal_type, hashlib.md5(
        doc_id.encode('utf-8')).hexdigest()))
    return doc_id_hash
    
def scan_index(index, doc_type, search_body, 
               log_file):
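    """Count the hits for search_body on index/doc_type, then return (total_hit, scan iterator)."""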
    search_resp = es.search(index=index,
                            doc_type=doc_type,
                            body=search_body,
                            size=0)
    total_hit = search_resp['hits']['total']
    print('Index: %s total hit: %d'
          % (index, total_hit), file=log_file)
    if total_hit > 0:
        scan_resp = scan(client=es,
                         query=search_body,
                         index=index,
                         doc_type=doc_type,
                         request_timeout=300)
    else:
        print('Zero hit.', file=log_file)
        scan_resp = iter([])  # empty iterator so callers can safely loop over it

    return (total_hit, scan_resp)
    
def form_bulk_body(line_data, doc_id):
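    """Append a millisecond timestamp and render one action/source pair (two NDJSON lines) for the ES bulk API."""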

    line_data.update({'timestamp': int(datetime.datetime.now().timestamp()*1e3)})
    action_json_line_str = '{"index": {"_id":"' + doc_id + '"}}'
    line_data.pop('_id', None)
    data_json_line_str = json.dumps(line_data, ensure_ascii=False)
    line_bulk_body = (action_json_line_str + '\n'
                      + data_json_line_str + '\n')
    return line_bulk_body
    
def bulk_write(data_list, doc_type, index):
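    """Bulk-index every doc in data_list into index/doc_type, then clear the list."""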
    bulk_body = ''
    for data_dict in data_list:
        doc_id = data_dict['_id']
        line_bulk_body = form_bulk_body(data_dict['_source'], doc_id)
        bulk_body += line_bulk_body
    try:
        resp = es.bulk(body=bulk_body, index=index,
                       doc_type=doc_type,
                       request_timeout=100)
        # the bulk response reports per-item failures via its 'errors' flag
        if resp.get('errors'):
            print(resp)
    except Exception as e:
        print(e)

    data_list.clear()

def func_cal_increment(index_last, doc_type_last, cal_type,
                       index_now, doc_type_now, search_body=None,
                       now_body=None,
                       release_time_st_last=None,
                       release_time_et_last=None,
                       redis_db=10,
                       log_f=None,
                       limit_platform=None,
                       target_id="_id"):
    if log_f is None:
        is_close = True
        file_path = '/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log/cal_NI_{dstr}.log'.format(dstr=str(datetime.datetime.now())[0:10])
        log_file = open(file_path, 'a')
    else:
        log_file = log_f
        is_close = False
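    # NOTE: flushdb() wipes the whole redis db selected by redis_db before the
    # historical counts are cached, so a dedicated db number is expected.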
    rds = redis.StrictRedis(host='192.168.17.60', port=6379, db=redis_db)
    rds.flushdb()
    pipe = rds.pipeline()
 
    re_list = []
    rds_count = 0
    count = 0
    if cal_type == 'M':
        cal_dict = {'cal_base':'monthly_cal_base',
                    'net_inc_repost_count': 'monthly_net_inc_repost_count',
                    'net_inc_play_count':'monthly_net_inc_play_count',
                    'net_inc_comment_count':"monthly_net_inc_comment_count",
                    'net_inc_favorite_count':"monthly_net_inc_favorite_count",
                    'net_inc_play_count_real': 'monthly_net_inc_play_count_real',
                    'net_inc_comment_count_real': 'monthly_net_inc_comment_count_real',
                    'net_inc_favorite_count_real': 'monthly_net_inc_favorite_count_real',
                    'net_inc_repost_count_real': 'monthly_net_inc_repost_count_real',
                    }
        print('will use %s'%cal_dict, file=log_file)
    if cal_type == 'W':
        cal_dict = {'cal_base':'weekly_cal_base',
            'net_inc_repost_count': 'weekly_net_inc_repost_count',
            'net_inc_play_count':'weekly_net_inc_play_count',
            'net_inc_comment_count':"weekly_net_inc_comment_count",
            'net_inc_favorite_count':"weekly_net_inc_favorite_count",
            'net_inc_play_count_real': 'weekly_net_inc_play_count_real',
            'net_inc_comment_count_real': 'weekly_net_inc_comment_count_real',
            'net_inc_favorite_count_real': 'weekly_net_inc_favorite_count_real',
            'net_inc_repost_count_real': 'weekly_net_inc_repost_count_real',
            }
        print('will use %s'%cal_dict, file=log_file)
    if cal_type == 'N':
        cal_dict = {
            'cal_base':'netly_cal_base',
            'net_inc_repost_count': 'net_inc_repost_count',
            'net_inc_play_count':'net_inc_play_count',
            'net_inc_comment_count':"net_inc_comment_count",
            'net_inc_favorite_count':"net_inc_favorite_count",
            'net_inc_play_count_real': 'net_inc_play_count_real',
            'net_inc_comment_count_real': 'net_inc_comment_count_real',
            'net_inc_favorite_count_real': 'net_inc_favorite_count_real',
            'net_inc_repost_count_real': 'net_inc_repost_count_real',
            }
    if search_body is None:
        if release_time_st_last is not None and release_time_et_last is not None:
            scan_body = {
                "query": {
                    "bool": {
                        "filter": [
                            {"range": {"release_time": {"gte": release_time_st_last,
                                                        "lt": release_time_et_last}}}
                        ]
                    }
                }
            }
        else:
            print('release_time_st_last / release_time_et_last missing and no search_body given',
                  file=log_file)
            return None
    else:
        scan_body = search_body
    print(scan_body)
    if limit_platform is not None:
        platform_dict = {"terms": {"platform.keyword": limit_platform}}
        scan_body["query"]['bool']['filter'].append(platform_dict)
        print('last_search:', scan_body)
    total_hit, scan_resp = scan_index(index_last, doc_type_last, scan_body, log_file)
    
    print('%s / %s total hits: %s' % (index_last, doc_type_last, total_hit), file=log_file)

    print('start writing into redis', datetime.datetime.now(), file=log_file)
    
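    # First pass: cache the previous snapshot's counts in redis, keyed by
    # cal_id_hash(doc_id, cal_type); writes are batched through the pipeline.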
    for one_scan in scan_resp:
        rds_count = rds_count + 1
        if rds_count % 10000 == 0 or rds_count == total_hit:
            print('write into redis {rds_count}/{total_hit}'.format(rds_count=rds_count, total_hit=total_hit),
                  datetime.datetime.now(), file=log_file)
            pipe.execute()
        line_dict = one_scan['_source']
        if target_id == "_id":
            doc_id = one_scan[target_id]
        else:
            doc_id = one_scan['_source'][target_id]
        doc_id_hash = cal_id_hash(doc_id, cal_type)
        if 'repost_count' in line_dict:
            try:
                data_dict_for_redis = {'play_count': line_dict.get('play_count'),
                                       'favorite_count': line_dict['favorite_count'],
                                       'comment_count': line_dict['comment_count'],
                                       'repost_count': line_dict['repost_count']}
            except Exception as e:
                print('failed to build redis dict (with repost_count):', e, line_dict, file=log_file)
                continue
        else:
            try:
                data_dict_for_redis = {'play_count': line_dict.get('play_count'),
                                       'favorite_count': line_dict['favorite_count'],
                                       'comment_count': line_dict['comment_count']}
            except Exception as e:
                print('failed to build redis dict:', e, line_dict, file=log_file)
                continue
        pipe.hmset(doc_id_hash, data_dict_for_redis)
    pipe.execute()
    print('finished writing into redis', datetime.datetime.now(), file=log_file)
    print('start calculating net increments', datetime.datetime.now(), file=log_file)
    if now_body is None:
        search_now_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"release_time": {"gte": release_time_st_last,
                                                    "lt": release_time_et_last}}}
                    ]
                }
            }
        }
    else:
        search_now_body = now_body
    if limit_platform is not None:
        platform_dict = {"terms": {"platform.keyword": limit_platform}}
        search_now_body["query"]['bool']['filter'].append(platform_dict)
        print('this_search:', search_now_body)
    total_hit_now, scan_resp_now = scan_index(index_now, doc_type_now, search_now_body,
                                              log_file)
    
    print('%s / %s total hits: %s' % (index_now, doc_type_now, total_hit_now),
          file=log_file)
    
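    # Second pass: for every doc in the current snapshot, diff its counts
    # against the cached redis values and bulk-write the net increments back.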
    for one_scan_now in scan_resp_now:
        count = count + 1 
        line_dict_now = one_scan_now['_source']
        if target_id == "_id":
            doc_id = one_scan_now[target_id]
        else:
            doc_id = one_scan_now['_source'][target_id]
        doc_id_hash = cal_id_hash(doc_id, cal_type)
        if rds.exists(doc_id_hash):
            data_dict_in_redis = rds.hgetall(doc_id_hash)
            try:
                cal_base = 'historical_complete'
                try:
                    net_inc_play_count = line_dict_now['play_count'] - int(data_dict_in_redis[b'play_count'])
                    # print(doc_id, line_dict_now['play_count'], int(data_dict_in_redis[b'play_count']))
                except Exception:
                    net_inc_play_count = 0
                net_inc_comment_count = line_dict_now['comment_count'] - int(data_dict_in_redis[b'comment_count'])
                net_inc_favorite_count = line_dict_now['favorite_count'] - int(data_dict_in_redis[b'favorite_count'])
            except Exception as e:
                print(e, line_dict_now, file=log_file)
                continue
            if 'repost_count' in line_dict_now and b'repost_count' in data_dict_in_redis:
                try:
                    net_inc_repost_count = line_dict_now['repost_count'] - int(data_dict_in_redis[b'repost_count'])
                except Exception as e:
                    print(e, line_dict_now, file=log_file)
                    net_inc_repost_count = 0  # fall back to 0 so the field is always defined
            else:
                net_inc_repost_count = 0
                
            if net_inc_play_count < 0 or net_inc_comment_count < 0 or net_inc_favorite_count < 0 or net_inc_repost_count < 0:
                cal_base = 'historical_complete_change_MNI'
                if net_inc_play_count < 0:
                    net_inc_play_count_real = net_inc_play_count
                    net_inc_play_count = 0
                else:
                    net_inc_play_count_real = 0
                if net_inc_comment_count < 0:
                    net_inc_comment_count_real = net_inc_comment_count
                    net_inc_comment_count = 0
                else:
                    net_inc_comment_count_real = 0
                if net_inc_favorite_count < 0:
                    net_inc_favorite_count_real = net_inc_favorite_count
                    net_inc_favorite_count = 0
                else:
                    net_inc_favorite_count_real = 0
                if net_inc_repost_count < 0:
                    net_inc_repost_count_real = net_inc_repost_count
                    net_inc_repost_count = 0
                else:
                    net_inc_repost_count_real = 0
                one_scan_now['_source'].update({cal_dict['cal_base']:cal_base,
                                          cal_dict['net_inc_repost_count']:net_inc_repost_count,
                                          cal_dict['net_inc_play_count']:net_inc_play_count,
                                          cal_dict['net_inc_comment_count']:net_inc_comment_count,
                                          cal_dict['net_inc_favorite_count']:net_inc_favorite_count,
                                          cal_dict['net_inc_repost_count_real']:net_inc_repost_count_real,
                                          cal_dict['net_inc_play_count_real']:net_inc_play_count_real,
                                          cal_dict['net_inc_comment_count_real']:net_inc_comment_count_real,
                                          cal_dict['net_inc_favorite_count_real']:net_inc_favorite_count_real,                                  
                                      })
            else:
                one_scan_now['_source'].update({cal_dict['cal_base']:cal_base,
                          cal_dict['net_inc_play_count']:net_inc_play_count,
                          cal_dict['net_inc_comment_count']:net_inc_comment_count,
                          cal_dict['net_inc_favorite_count']:net_inc_favorite_count,
                          cal_dict['net_inc_repost_count']:net_inc_repost_count                          
                      })
            re_list.append(one_scan_now)
            if count%1000 == 0 or count == total_hit_now:
                print(str(count) + "/" + str(total_hit_now), file=log_file)
                bulk_write(data_list=re_list, doc_type=doc_type_now,
                           index=index_now)
                re_list.clear()
        elif cal_dict['net_inc_play_count'] in one_scan_now['_source']:
            one_scan_now['_source'].pop(cal_dict['net_inc_play_count'], None)
            one_scan_now['_source'].pop(cal_dict['net_inc_comment_count'], None)
            one_scan_now['_source'].pop(cal_dict['net_inc_favorite_count'], None)
            cal_base = 'historical_not_found'
#            one_scan_now['_source'].update({cal_dict['cal_base']:cal_base,
#                                              cal_dict['net_inc_repost_count']:0,
#                                              cal_dict['net_inc_play_count']:0,
#                                              cal_dict['net_inc_comment_count']:0,
#                                              cal_dict['net_inc_favorite_count']:0,
#                                              cal_dict['net_inc_repost_count_real']:0,
#                                              cal_dict['net_inc_play_count_real']:0,
#                                              cal_dict['net_inc_comment_count_real']:0,
#                                              cal_dict['net_inc_favorite_count_real']:0,                                  
#                                          })
            re_list.append(one_scan_now)
            if count%1000 == 0 or count == total_hit_now:
                print(str(count) + "/" + str(total_hit_now), file=log_file)
                bulk_write(data_list=re_list, doc_type=doc_type_now,
                           index=index_now)
                re_list.clear()
    if re_list:
        bulk_write(data_list=re_list, doc_type=doc_type_now,
                   index=index_now)
        re_list.clear()
    if is_close:
        log_file.close()
                
def func_cal_new_released_NI(cal_type, index_now, doc_type_now, exter_list=None, 
                             search_body=None,
                             release_time_st_now=None,
                             release_time_et_now=None,
                             log_f=None,
                             exter_dict=None,
                             limit_platform=None):
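    """Net increment for newly released docs (no historical snapshot):
    the current counts themselves are taken as the net increment,
    with cal_base marked as 'accumulate'.
    """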
    if log_f is None:
        is_close = True
        file_path = '/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log/cal_NI_{dstr}.log'.format(dstr=str(datetime.datetime.now())[0:10])
        log_file = open(file_path, 'a')
    else:
        log_file = log_f
        is_close = False
    count = 0
    re_list = []
    if cal_type == 'M':
        cal_dict = {'cal_base':'monthly_cal_base',
                    'net_inc_repost_count': 'monthly_net_inc_repost_count',
                    'net_inc_play_count':'monthly_net_inc_play_count',
                    'net_inc_comment_count':"monthly_net_inc_comment_count",
                    'net_inc_favorite_count':"monthly_net_inc_favorite_count"
                    }
        print('will use %s'%cal_dict, file=log_file)
    if cal_type == 'W':
        cal_dict = {'cal_base':'weekly_cal_base',
                    'net_inc_repost_count': 'weekly_net_inc_repost_count',
                    'net_inc_play_count':'weekly_net_inc_play_count',
                    'net_inc_comment_count':"weekly_net_inc_comment_count",
                    'net_inc_favorite_count':"weekly_net_inc_favorite_count"
            }
        print('will use %s'%cal_dict, file=log_file)
    if cal_type == 'N':
        cal_dict = {
            'cal_base':'netly_cal_base',
            'net_inc_repost_count': 'net_inc_repost_count',
            'net_inc_play_count':'net_inc_play_count',
            'net_inc_comment_count':"net_inc_comment_count",
            'net_inc_favorite_count':"net_inc_favorite_count"
            }
    if search_body is None:
        search_now_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"release_time": {"gte": release_time_st_now,
                                                    "lt": release_time_et_now}}}
                    ]
                    # ,
                    # "must_not": [
                    #     {"exists": {"field": "monthly_net_inc_play_count"}}
                    # ]
                }
            }
        }
        if exter_list is not None:
            # extra filter clauses, e.g. term filters on releaser/platform
            try:
                print(exter_list)
                for one_exter in exter_list:
                    search_now_body['query']['bool']['filter'].append(one_exter)
            except Exception as e:
                print(e, exter_list)
    else:
        search_now_body = search_body
    if limit_platform is not None:
        platform_dict = {"terms": {"platform.keyword": limit_platform}}
        search_now_body["query"]['bool']['filter'].append(platform_dict)
        print('this_search:', search_now_body)
    total_hit_now, scan_resp_now = scan_index(index_now, doc_type_now, search_now_body,
                                              log_file)
    
    print('%s / %s total hits: %s' % (index_now, doc_type_now, total_hit_now), file=log_file)
    for one_scan_now in scan_resp_now:
        count = count + 1 
        line_dict_now = one_scan_now['_source']
        try:
            cal_base = 'accumulate'
            net_inc_play_count = line_dict_now.get('play_count')
            if not net_inc_play_count:
                net_inc_play_count = 0
            net_inc_comment_count = line_dict_now['comment_count']
            net_inc_favorite_count = line_dict_now['favorite_count']
        except Exception as e:
            print('missing count field:', e, line_dict_now, file=log_file)
            continue
        if 'repost_count' in line_dict_now:
            try:
                net_inc_repost_count = line_dict_now['repost_count']
            except Exception as e:
                print('failed to read repost_count:', e, line_dict_now, file=log_file)
                continue
        else:
            net_inc_repost_count = 0
        one_scan_now['_source'].update({cal_dict['cal_base']:cal_base,
                      cal_dict['net_inc_play_count']:net_inc_play_count,
                      cal_dict['net_inc_comment_count']:net_inc_comment_count,
                      cal_dict['net_inc_favorite_count']:net_inc_favorite_count,
                      cal_dict['net_inc_repost_count']:net_inc_repost_count,
                      })
        re_list.append(one_scan_now)
        if count % 1000 == 0 or count == total_hit_now:
            bulk_write(data_list=re_list, doc_type=doc_type_now,
                       index=index_now)
    # flush whatever is left once the scan is exhausted
    if re_list:
        bulk_write(data_list=re_list, doc_type=doc_type_now,
                   index=index_now)
    if is_close:
        log_file.close()
    
if __name__ == '__main__':
#    release_time_st_last = 1543593600000
#    release_time_et_last = 1546272000000
#    index_last = 'short-video-production-2018'
#    doc_type_last = 'daily-url-2018-12-31'
    cal_type = 'M'
    release_time_st_now = 1546272000000
    release_time_et_now = 1548950400000
    index_now = 'short-video-production-2019'
    doc_type_now = 'daily-url-2019-01-31'
    exter_dict = [{"term": {"releaser.keyword": "看看新闻Knews"}},
                  {"term": {"platform.keyword": "haokan"}}]
    func_cal_new_released_NI(cal_type=cal_type,
                             release_time_st_now=release_time_st_now,
                             release_time_et_now=release_time_et_now,
                             index_now=index_now,
                             doc_type_now=doc_type_now,
                             exter_list=exter_dict
                       )