# -*- coding: utf-8 -*-
"""
Created on Wed Jun 20 11:24:30 2018

It took 8.5 hours to calculate 9,088,171 lines in short-video-weekly/daily-url-2018_w23_s1
with threads_num = 10 in an actual program run; see details in the log file
on server 192.168.17.11:
/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log/
cal_weekly_net_inc_for_daily-url-2018_w23_s1_on_2018-07-04T18-38-35.987666_log

An earlier rough estimate showed a time expense of about 50 hours on the
same data set in a single thread.

@author: hanye
"""

import datetime
import re
import sys
import threading
import copy
import logging
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from func_find_week_num import find_week_belongs_to
from func_find_week_num import day_by_week_info
from func_general_bulk_write import bulk_write_short_video
from func_cal_doc_id import cal_doc_id


es = Elasticsearch(hosts='192.168.17.11', port=9200)
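# This single client instance is shared by all worker threads started below;
# the official elasticsearch-py client is documented as thread-safe, so no
# per-thread client is needed.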

def find_error_from_bulk_resp(resp):
    """Collect {doc_id: error_message} pairs from a bulk-write response.

    Returns a list of error dicts, or None when resp is None or reports no
    errors at all.
    """
    if resp is None or resp['errors'] is not True:
        return None
    err_msg_Lst = []
    for line in resp['items']:
        # any 4xx/5xx item status marks a failed document, not only 400
        if line['index']['status'] >= 400:
            err_msg_Lst.append({line['index']['_id']: line['index']['error']})
    return err_msg_Lst


def parse_week_param(doc_type_weekly):
    """Parse week year, week number and week start day from a weekly
    doc_type name such as 'daily-url-2018_w23_s1'. Returns None when the
    name does not match the expected pattern."""
    try:
        week_year = re.findall('[0-9]{4}', doc_type_weekly)[0]
        week_no = re.findall('_w([0-9]{2})_', doc_type_weekly)[0]
        week_day_start = re.findall('_s([1-7])', doc_type_weekly)[0]
        return {'week_year': int(week_year),
                'week_no': int(week_no),
                'week_day_start': int(week_day_start)}
    except (IndexError, ValueError):
        return None
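
# A quick sanity check of the parser (following the weekly naming convention
# used in this project):
#   parse_week_param('daily-url-2018_w23_s1')
#   -> {'week_year': 2018, 'week_no': 23, 'week_day_start': 1}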


def find_value_after_fetch_day(platform, url, fetch_dayD,
                               fetch_time_upper_bdrT,
                               data_dict,
                               index='short-video-production',
                               doc_type='daily-url'):
    fetch_day_str = fetch_dayD.isoformat()
    video_id_bare = cal_doc_id(platform, url, fetch_day_str=fetch_day_str,
                               data_dict=data_dict,
                               doc_id_type='bare')
    fetch_time_start_ts = int(datetime.datetime(fetch_dayD.year,
                                                fetch_dayD.month,
                                                fetch_dayD.day).timestamp()*1e3)
    fetch_time_end_ts = int(fetch_time_upper_bdrT.timestamp()*1e3)
    if platform in ['toutiao', 'new_tudou']:
        search_bd = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                                  "lt": fetch_time_end_ts}}}
                    ],
                    "must": [
                        {"match_phrase": {"url": video_id_bare}}
                    ]
                }
            },
            "sort": [
                {"fetch_time": {"order": "asc"}}
                ]
        }
    else:
        search_bd = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                                  "lt": fetch_time_end_ts}}},
                        {"term": {"url.keyword": url}}
                    ],
                }
            },
            "sort": [
                {"fetch_time": {"order": "asc"}}
                ]
        }
    try:
        search_pre_resp = es.search(index=index, doc_type=doc_type,
                                    body=search_bd, size=1,
                                    request_timeout=100)
        hits_total = search_pre_resp['hits']['total']
        if hits_total > 0:
            pre_dict = search_pre_resp['hits']['hits'][0]['_source']
            doc_id = search_pre_resp['hits']['hits'][0]['_id']
            # default favorite/comment counts to 0 when the fields are absent
            value_dict = {'play_count': pre_dict['play_count'],
                          'fetch_time': pre_dict['fetch_time'],
                          'doc_id': doc_id,
                          'favorite_count': pre_dict.get('favorite_count', 0),
                          'comment_count': pre_dict.get('comment_count', 0)}
            return value_dict
        else:
            return None
    except Exception:
        # a failed search or a malformed hit both mean: no usable history
        return None
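
# Note on the two query shapes above: for 'toutiao' and 'new_tudou' the lookup
# matches the bare video id inside the url field via match_phrase (presumably
# because the full URL is not stable for these platforms); all other platforms
# use an exact url.keyword term filter. An illustrative return value (numbers
# are made up):
#   {'play_count': 1200, 'fetch_time': 1528646400000, 'doc_id': '...',
#    'favorite_count': 35, 'comment_count': 4}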


def form_sub_value_result(weekly_cal_base, pre_week_value, curr_data_dict):
    WNI_hist_data_id = pre_week_value['doc_id']
    try:
        weekly_net_inc_play_count = (curr_data_dict['play_count']
                                     - pre_week_value['play_count'])
    except (KeyError, TypeError):
        return None
    else:
        if 'favorite_count' in curr_data_dict and 'favorite_count' in pre_week_value:
            weekly_net_inc_favorite_count = (curr_data_dict['favorite_count']
                                             - pre_week_value['favorite_count'])
        else:
            weekly_net_inc_favorite_count = 0
        if 'comment_count' in curr_data_dict and 'comment_count' in pre_week_value:
            weekly_net_inc_comment_count = (curr_data_dict['comment_count']
                                            - pre_week_value['comment_count'])
        else:
            weekly_net_inc_comment_count = 0
        sub_result = {'weekly_cal_base': weekly_cal_base,
                      'WNI_hist_data_id': WNI_hist_data_id,
                      'weekly_net_inc_play_count': weekly_net_inc_play_count,
                      'weekly_net_inc_favorite_count': weekly_net_inc_favorite_count,
                      'weekly_net_inc_comment_count': weekly_net_inc_comment_count}
        return sub_result
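
# Worked example with illustrative numbers: if the previous week's last
# snapshot recorded play_count=1000 and the current document carries
# play_count=1500, the weekly net increase is 1500 - 1000 = 500; favorite
# and comment deltas fall back to 0 whenever either side lacks the field.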


def sub_pre_week(curr_doc_type_weekly, curr_data_dict):
    url = curr_data_dict['url']
    fetch_day_T = datetime.datetime.fromtimestamp(curr_data_dict['fetch_time']/1e3)
    platform = curr_data_dict['platform']
    seven_days_before = fetch_day_T - datetime.timedelta(days=7)
    week_info = parse_week_param(curr_doc_type_weekly)
    if week_info is not None:
        week_day_start = week_info['week_day_start']
        pre_week_year, pre_week_no, pre_weekday = find_week_belongs_to(
            seven_days_before, week_day_start)
        last_day_in_pre_weekD = day_by_week_info(pre_week_year, pre_week_no, 7,
                                                 week_day_start)
        pre_week_value = find_value_after_fetch_day(platform, url,
                                                    last_day_in_pre_weekD,
                                                    fetch_day_T,
                                                    data_dict=curr_data_dict)
        if pre_week_value is not None:
            pre_week_data_fetch_time = pre_week_value['fetch_time']
            try:
                pre_week_data_fetch_timeT = (datetime.datetime
                                             .fromtimestamp(pre_week_data_fetch_time/1e3))
                pre_week_data_fetch_timeD = datetime.date(pre_week_data_fetch_timeT.year,
                                                          pre_week_data_fetch_timeT.month,
                                                          pre_week_data_fetch_timeT.day)
                if pre_week_data_fetch_timeD == last_day_in_pre_weekD:
                    weekly_cal_base = 'historical_complete'
                else:
                    weekly_cal_base = 'historical_uncomplete'
                sub_result = form_sub_value_result(weekly_cal_base,
                                                   pre_week_value,
                                                   curr_data_dict)
            except (TypeError, ValueError, OSError, OverflowError):
                print('Got exception, probably because fetch_time is ill-formatted.')
                return None
        else:
            sub_result = {'weekly_cal_base': 'historical_data_absent'}
        return sub_result
    else:
        return None
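
# sub_pre_week tags each result with one of three weekly_cal_base states:
#   'historical_complete'    - the previous-week snapshot is from the last
#                              day of that week, so the diff covers a full week
#   'historical_uncomplete'  - a snapshot exists, but not from that last day
#   'historical_data_absent' - no previous-week snapshot was found at all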


def cal_weekly_net_inc(curr_doc_type_weekly, data_dict):
    if 'release_time' in data_dict and 'fetch_time' in data_dict and 'url' in data_dict:
        week_info = parse_week_param(curr_doc_type_weekly)
        first_day_in_weekD = day_by_week_info(week_info['week_year'],
                                              week_info['week_no'], 1,
                                              week_info['week_day_start'])
        first_day_in_weekT = datetime.datetime(first_day_in_weekD.year,
                                               first_day_in_weekD.month,
                                               first_day_in_weekD.day)
        first_day_in_weekT_by_release_time = first_day_in_weekT - datetime.timedelta(1)
        first_day_in_weekT_by_release_time_ts = int(first_day_in_weekT_by_release_time.timestamp()*1e3)
        try:
            release_time = int(data_dict['release_time'])
        except (ValueError, TypeError):
            return None
        if release_time >= first_day_in_weekT_by_release_time_ts:
            weekly_cal_base = 'accumulate'
            try:
                weekly_net_inc_play_count = data_dict['play_count']
            except KeyError:
                return None
            else:
                weekly_net_inc_favorite_count = data_dict.get('favorite_count', 0)
                weekly_net_inc_comment_count = data_dict.get('comment_count', 0)
                sub_result = {'weekly_cal_base': weekly_cal_base,
                              'weekly_net_inc_play_count': weekly_net_inc_play_count,
                              'weekly_net_inc_favorite_count': weekly_net_inc_favorite_count,
                              'weekly_net_inc_comment_count': weekly_net_inc_comment_count}
        else:
            sub_result = sub_pre_week(curr_doc_type_weekly, data_dict)
        return sub_result
    else:
        return None
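
# Decision flow of cal_weekly_net_inc: if the video was released within this
# week (with one extra day of slack before the week's first day), its raw
# counts are taken as the net increase ('accumulate'); otherwise the counts
# are diffed against the previous week's last snapshot via sub_pre_week.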


def cal_and_bulk_write_with_weekly_net_inc(dict_Lst, doc_type_weekly,
                                           logger_name, thread_id,
                                           index_weekly='short-video-weekly',
                                          ):
    function_name = 'cal_and_bulk_write_with_weekly_net_inc'
    # define logger
    loggerName = '%s.cal_and_bulk_write' % logger_name
    loggerii = logging.getLogger(loggerName)

    if dict_Lst:
        loggerii.info('[%d] Calculating weekly net increase values one dict at a time.'
                      % (thread_id))
        result_Lst = []
        input_list_size = len(dict_Lst)
        none_res_counter = 0
        for line_d in dict_Lst:
            sub_result = cal_weekly_net_inc(doc_type_weekly, line_d)
            if sub_result is not None:
                line_d.update(sub_result)
                result_Lst.append(line_d)
            else:
                none_res_counter += 1
        loggerii.info('[%d] Calculation done with %d/%d None returns, '
                      'got %d effective results.'
                      % (thread_id, none_res_counter, input_list_size,
                         len(result_Lst)))
        if result_Lst:
            try:
                bulk_write_resp = bulk_write_short_video(result_Lst, index=index_weekly,
                                                         doc_type=doc_type_weekly,
                                                         doc_id_type='all-time-url',
                                                         client=es)
                result_Lst.clear()
                return bulk_write_resp
            except Exception:
                loggerii.exception('[%d] Got exceptions when bulk writing; function %s returns None.'
                                   % (thread_id, function_name))
                return None
        else:
            loggerii.info('[%d] Got zero effective results, function %s returns None.'
                          % (thread_id, function_name))
            return None
    else:
        loggerii.info('[%d] Empty input data list, function %s returns None.'
                      % (thread_id, function_name))
        return None
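
# bulk_write_short_video is an external helper (func_general_bulk_write); the
# raw bulk response it returns is what find_error_from_bulk_resp inspects in
# the scan loop below.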

def cal_weekly_net_inc_with_doc_type_name(doc_type_name,
                                          logger_name,
                                          thread_id,
                                          search_body=None,
                                          index_weekly='short-video-weekly',
                                         ):
    # define logger
    loggerName = '%s.in_thread' % logger_name
    loggeri = logging.getLogger(loggerName)

    loggeri.info('[%d] Calculate weekly net increase values for doc_type: %s '
                 'in index: %s.' % (thread_id, doc_type_name, index_weekly))
    if search_body is None:
        find_all_bd = {
            "query": {
                "match_all": {}
            }
        }
    else:
        find_all_bd = copy.deepcopy(search_body)

    search_resp = es.search(index=index_weekly, doc_type=doc_type_name,
                            body=find_all_bd, size=0, request_timeout=100)
    total_hits = search_resp['hits']['total']
    if total_hits > 0:
        loggeri.info('[%d] Total hits: %d with search_body: %s'
                     % (thread_id, total_hits, find_all_bd))
        scan_resp = scan(client=es, index=index_weekly, doc_type=doc_type_name,
                         query=find_all_bd, request_timeout=300)
        line_counter = 0
        ori_data_Lst = []
        for line in scan_resp:
            line_counter += 1
            line_d = line['_source']
            ori_data_Lst.append(line_d)
            if line_counter % 1000 == 0 or line_counter == total_hits:
                if thread_id is not None:
                    loggeri.info('[%d] Got lines %d/%d [%.2f%%] to calculate weekly net '
                                 'increase values'
                                 % (thread_id, line_counter, total_hits,
                                    line_counter/total_hits*100))
                bulk_resp = cal_and_bulk_write_with_weekly_net_inc(ori_data_Lst,
                                                                   doc_type_name,
                                                                   loggerName,
                                                                   thread_id,
                                                                   index_weekly=index_weekly,
                                                                  )
                err_msg = find_error_from_bulk_resp(bulk_resp)
                if err_msg is not None:
                    loggeri.info('[%d] Error when bulk write: %s '
                                 % (thread_id, err_msg))
                ori_data_Lst.clear()
    else:
        loggeri.info('[%d] Got zero hits.' % (thread_id))
    loggeri.info('[%d] Thread exits.' % (thread_id))
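
# The scan loop above flushes in batches: every 1000 scanned documents (and
# once more on the final document) the accumulated batch is calculated and
# bulk-written, keeping memory bounded regardless of total_hits.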


def divid_by_release_time(
        doc_type_name, threads_num=5,
        index_weekly='short-video-weekly',
        query_term=None):
    """
    argument query_term must be a sub dict below keyword 'bool'
    in Elasticsearch search body, such as {'filter' : {'range': {...}}},
    or {'must_not': [...]}
    """

    find_all_bd = {
        "query": {
            "match_all": {}
        },
        "size": 0,
        "aggs": {
            "date_distr": {
                "date_histogram": {
                    "field": "release_time",
                    "interval": "day",
                    "time_zone": "Asia/Shanghai"
                }
            }
        }
    }
    # allow the caller to customize the query body
    if query_term is not None:
        find_all_bd['query'].pop('match_all', None)
        find_all_bd['query']['bool'] = {}
        find_all_bd['query']['bool'].update(query_term)
        print('find_all_bd is: ', find_all_bd)
    search_resp = es.search(index=index_weekly, doc_type=doc_type_name,
                            body=find_all_bd, size=0, request_timeout=100)
    total_hits = search_resp['hits']['total']
    data_distr_by_release_time_Lst = search_resp[
        'aggregations']['date_distr']['buckets']
    if data_distr_by_release_time_Lst == []:
        print('Got empty result from aggregations for doc_type_name: %s, %s'
              % (doc_type_name, datetime.datetime.now()))
        return None
    else:
        average_data_num = total_hits // threads_num
        release_time_range_Lst = []
        data_counter_collector = 0
        distr_idx = 0
        for distr_by_release_dict in data_distr_by_release_time_Lst:
            data_num_each_day = distr_by_release_dict['doc_count']
            data_counter_collector += data_num_each_day
            if (data_counter_collector > average_data_num*0.9
                    or distr_idx == len(data_distr_by_release_time_Lst)-1):
                release_time_range_dict = {'start':None, 'end':None,
                                           'end_idx': distr_idx,
                                           'data_num': data_counter_collector}
                release_time_range_Lst.append(release_time_range_dict)
                data_counter_collector = 0
            distr_idx += 1

        # fill in the start and end timestamps for each range segment
        start_side_cache = data_distr_by_release_time_Lst[0]['key']
        for range_seg in release_time_range_Lst:
            if range_seg['end_idx']+1 > len(data_distr_by_release_time_Lst)-1:
                range_seg['end'] = int(data_distr_by_release_time_Lst[range_seg['end_idx']]['key']
                                       + 24*3600*1e3)
            else:
                range_seg['end'] = data_distr_by_release_time_Lst[range_seg['end_idx']+1]['key']
            range_seg['start'] = start_side_cache
            start_side_cache = range_seg['end']

        # the number of split range segments may differ from threads_num
        if len(release_time_range_Lst) != threads_num:
            threads_num = len(release_time_range_Lst)
        print('actual thread number is: %d'
              % threads_num)
        print('release_time_range_Lst:\n%s'
              % release_time_range_Lst)

        return (threads_num, release_time_range_Lst, total_hits)
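
# Shape of the tuple divid_by_release_time returns (values are illustrative):
#   (3,
#    [{'start': 1527782400000, 'end': 1528387200000, 'end_idx': 6,
#      'data_num': 3100}, ...],
#    9088171)
# Each segment's [start, end) release_time window is handed to one worker
# thread by the multi-thread driver below.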


def cal_weekly_net_inc_with_doc_type_name_multi_thread(
        doc_type_name, threads_num=5,
        index_weekly='short-video-weekly',
        query_term=None):
    """
    argument query_term must be a sub dict below keyword 'bool'
    in Elasticsearch search body, such as {'filter' : {'range': {...}}},
    or {'must_not': [...]}
    """

    # define logger
    loggerName = 'cal_weekly_net_inc'
    logger = logging.getLogger(loggerName)
    logger.setLevel(logging.DEBUG)
    # create handler
    path = '/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log/'
    log_fn = ('cal_weekly_net_inc_for_%s_on_%s_log'
              % (doc_type_name, datetime.datetime.now().isoformat().replace(':', '-')))
    fh = logging.FileHandler(path+log_fn)
    fh.setLevel(logging.INFO)
    # create formatter and add it to the handler
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    # add handler to logger
    logger.addHandler(fh)

    logger.info('************ log starts')
    week_info = parse_week_param(doc_type_name)
    first_day_in_weekD = day_by_week_info(week_info['week_year'],
                                          week_info['week_no'], 1,
                                          week_info['week_day_start'])
    first_day_in_weekT = datetime.datetime(first_day_in_weekD.year,
                                           first_day_in_weekD.month,
                                           first_day_in_weekD.day)
    release_time_start_T = first_day_in_weekT - datetime.timedelta(14)
    release_time_end_T = first_day_in_weekT + datetime.timedelta(10)
    release_time_start_ts = int(release_time_start_T.timestamp()*1e3)
    release_time_end_ts = int(release_time_end_T.timestamp()*1e3)
    print(release_time_start_ts, release_time_end_ts)
    if query_term is None:
        query_term = {
            "filter": [
                {"range": {"release_time": {"gte": release_time_start_ts,
                                            "lt": release_time_end_ts}}}
            ]
        }
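    # The default window restricts the scan to release_time values from 14
    # days before the week's first day to 10 days after it, presumably to
    # bound the calculation to documents relevant to this week.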
    get_divs = divid_by_release_time(doc_type_name, threads_num,
                                     query_term=query_term)
    if get_divs is None:
        logger.info('Find zero data to be calculated, program exits.')
    else:
        threads_num = get_divs[0]
        release_time_range_Lst = get_divs[1]
        total_hits = get_divs[2]
        logger.info('There are %d lines in %s, '
                    'will start %d threads to calculate.'
                    %(total_hits, doc_type_name, threads_num))
        print('release_time_range_Lst is : ', release_time_range_Lst)
        waitfor = []
        for i in range(0, threads_num):
            release_time_ts_start = release_time_range_Lst[i]['start']
            release_time_ts_end = release_time_range_Lst[i]['end']
            print(release_time_ts_start, release_time_ts_end)
            search_bd_in_thread = {
                "query": {
                    "bool": {
                        "filter": [
                            {"range": {"release_time": {
                                "gte": release_time_ts_start,
                                "lt": release_time_ts_end}}},
                        ],
                    }
                },
            }
            # clamp non-positive timestamps: datetime.fromtimestamp can fail
            # on pre-epoch values on some platforms; this only affects the
            # human-readable log line below, not the thread's query
            if release_time_ts_start <= 0:
                release_time_ts_start = 1
            release_time_start = datetime.datetime.fromtimestamp(release_time_ts_start/1e3)
            release_time_end = datetime.datetime.fromtimestamp(release_time_ts_end/1e3)
            logger.info('Starting thread: %d, '
                        'release_time range: [%s, %s)'
                        % (i, release_time_start, release_time_end))
            threadi = threading.Thread(target=cal_weekly_net_inc_with_doc_type_name,
                                       kwargs={'doc_type_name': doc_type_name,
                                               'logger_name': loggerName,
                                               'search_body': search_bd_in_thread,
                                               'thread_id': i,
                                               'index_weekly': index_weekly})
            waitfor.append(threadi)
            threadi.start()
        for td in waitfor:
            td.join()
        logger.info('Main thread exits.')



## test
#if __name__ == '__main__':
#    doc_type_name = 'daily-url-2018_w25_s1'
#    index = 'short-video-weekly'
#    cal_weekly_net_inc_with_doc_type_name_multi_thread(doc_type_name, index_weekly=index)