# -*- coding:utf-8 -*-
# @Time : 2019/10/28 15:48 
# @Author : litao
import elasticsearch
import datetime
import pandas as pd
from elasticsearch.helpers import scan
from func_cal_doc_id import cal_doc_id
from write_data_into_es.func_get_releaser_id import *
import copy, os, time, json
import zipfile
import redis
from maintenance.send_email_with_file_auto_task import write_email_task_to_redis

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
from redis.sentinel import Sentinel
sentinel = Sentinel([('192.168.17.65', 26379),
                     ('192.168.17.66', 26379),
                     ('192.168.17.67', 26379)],
                    socket_timeout=0.5)
# Discover the master node
master = sentinel.discover_master('ida_redis_master')
# Discover the slave nodes
slave = sentinel.discover_slaves('ida_redis_master')
# Connect to the master database (db=2)
rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=2, decode_responses=True)
# es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
# rds = redis.StrictRedis(host='192.168.17.60', port=6379, db=2, decode_responses=True)


def get_project_releaser(project_name):
    res_list = []
    search_body = {
            "query": {
                    "match": {
                            "project_tags.keyword": project_name
                    }
            }
    }
    res_scan = scan(client=es, query=search_body, index="target_releasers")
    for res in res_scan:
        res_list.append(res["_source"])
    return res_list

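# A minimal usage sketch (the project tag is taken from the commented examples
# at the bottom of this file):
#     releasers = get_project_releaser("城市媒体融合")
#     for r in releasers:
#         print(r.get("releaser"), r.get("platform"))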

def zipDir(dirpath, outFullName):
    """
    压缩指定文件夹
    :param dirpath: 目标文件夹路径
    :param outFullName: 压缩文件保存路径+xxxx.zip
    :return: 无
    """
    remove_list = []
    if ".zip" not in outFullName:
        outFullName += ".zip"
    file_path = os.path.split(outFullName)
    is_path = os.path.exists(file_path[0])
    if not is_path:
        os.mkdir(file_path[0])
        time.sleep(0.5)
    zip = zipfile.ZipFile(outFullName, "w", zipfile.ZIP_DEFLATED)
    for path, dirnames, filenames in os.walk(dirpath):
        # 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
        fpath = path.replace(dirpath, '')
        for filename in filenames:
            zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
            remove_list.append(os.path.join(path, filename))
    zip.close()

    for p in remove_list:
        os.remove(p)
    time.sleep(0.5)
    os.removedirs(dirpath)
    return outFullName


def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
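    """Compute index/doc names and boundaries for a stats week.

    Returns (this_week_index, this_week_doc, last_week_index, last_week_doc,
    first_day_ts, last_day_ts, this_week, last_week, last_year); the timestamps
    are in milliseconds. When cycle_num is None the current ISO week is used.
    """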
    now = datetime.datetime.now()
    now_calendar = now.isocalendar()
    if not cycle_num:
        week_calendar = now_calendar
    else:
        week_calendar = (now.year, cycle_num + 1, 0)
    year = week_calendar[0]
    this_week = week_calendar[1] - 1
    if this_week == 0:
        last_year = year - 1
        this_week = 1
    else:
        last_year = year
    if this_week == 1:
        last_week = "52"
    else:
        last_week = this_week - 1
    today = datetime.datetime(datetime.datetime.now().year, datetime.datetime.now().month, datetime.datetime.now().day)
    # today = datetime.datetime(year=2018, month=12, day=25)
    first_day_in_week = today - datetime.timedelta(
            days=now_calendar[2] + 7 * (now_calendar[1] - week_calendar[1] + 1))
    first_day_ts = int(first_day_in_week.timestamp() * 1e3)
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)

    this_week_index = 'short-video-weekly'
    this_week_doc = 'daily-url-' + str(year) + '_w' + format(this_week, '>02d') + '_s1'
    last_week_index = 'releaser-weekly-short-video'
    last_week_doc = 'doc'
    if compare_type == "new_released":
        this_week_index = last_week_index
        this_week_doc = last_week_doc
    return this_week_index, this_week_doc, last_week_index, last_week_doc, first_day_ts, last_day_ts, this_week, last_week, last_year


def month_num(year=None, cycle=None, cycle_num=None, compare_type=None):
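    """Same return contract as week_num, but for calendar months: index/doc
    names plus millisecond timestamps bounding the month."""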
    now = datetime.datetime.now()
    if not year:
        # cycle_num may be None here; default to the current year in that case
        if cycle_num is None or cycle_num <= now.month:
            year = now.year
        else:
            year = now.year - 1
    if not cycle_num:
        this_mon = now.month - 1 if now.month != 1 else 12
        last_mon = this_mon - 1 if this_mon > 1 else this_mon - 1 + 12
        if last_mon == 12:
            last_year = year - 1
        else:
            last_year = year
    else:
        this_mon = cycle_num
        last_mon = cycle_num - 1 if this_mon > 1 else this_mon - 1 + 12
        if last_mon == 12:
            last_year = year - 1
        else:
            last_year = year
    first_day_ts = int(datetime.datetime(year=year, month=this_mon, day=1).timestamp() * 1e3)
    if this_mon == 12:
        # December rolls over to January of the following year
        next_year = year + 1
        next_month = 1
    else:
        next_year = year
        next_month = this_mon + 1
    last_day_ts = int(datetime.datetime(year=next_year, month=next_month, day=1).timestamp() * 1e3)
    this_mon_index = "short-video-production-%s" % year
    this_mon_doc = "daily-url-%s" % (
            datetime.datetime(year=next_year, month=next_month, day=1) + datetime.timedelta(days=-1)).strftime(
            "%Y-%m-%d")
    last_mon_index = "releaser"
    last_mon_doc = "releasers"
    if compare_type == "new_released":
        this_mon_index = last_mon_index
        this_mon_doc = last_mon_doc
    return this_mon_index, this_mon_doc, last_mon_index, last_mon_doc, first_day_ts, last_day_ts, this_mon, last_mon, last_year


def quarter_num(year=None, cycle=None, cycle_num=None, compare_type=None):
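    """Same return contract as week_num, but for calendar quarters."""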
    now = datetime.datetime.now()
    if not cycle_num:
        # (month - 1) // 3 + 1 maps months 1-3 to Q1, ..., 10-12 to Q4
        this_quarter = (now.month - 1) // 3 + 1
    else:
        this_quarter = cycle_num
    last_quarter = this_quarter - 1 if this_quarter > 1 else 4
    if last_quarter == 4:
        last_year = year - 1
    else:
        last_year = year
    first_day_ts = int(datetime.datetime(year=year, month=(this_quarter - 1) * 3 + 1, day=1).timestamp() * 1e3)
    # Q4 ends at Jan 1 of the following year; month 13 would raise ValueError
    if this_quarter == 4:
        last_day = datetime.datetime(year=year + 1, month=1, day=1)
    else:
        last_day = datetime.datetime(year=year, month=this_quarter * 3 + 1, day=1)
    last_day_ts = int(last_day.timestamp() * 1e3)
    this_quarter_index = "short-video-quarter-%s" % year
    this_quarter_doc = "daily-url-2019-Q%s" % this_quarter
    last_quarter_index = "releaser"
    last_quarter_doc = "releasers-%s-Q%s" % (last_year, last_quarter)
    if compare_type == "new_released":
        this_quarter_index = last_quarter_index
        this_quarter_doc = last_quarter_doc
    return this_quarter_index, this_quarter_doc, last_quarter_index, last_quarter_doc, first_day_ts, last_day_ts, this_quarter, last_quarter, last_year


def create_index(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video",
                 data_type="detail", **kwargs):
    """

    :param year: 年份 可为空
    :param cycle: 周期 week/month/quarter/year/("all-time" 用于微信导出自定义数据)
    :param cycle_num: 周期 1/2/3/4/5/6
    :param compare_type:默认为空 可不填
    :param project_type:数据平台 short_video/weibo/weixin/all-time
    :param data_type:数据类型 detail/summary
    :param kwargs: 自定义索引和切片 参数如下
    :return:
    """
    first_day_ts = ""
    last_day_ts = ""
    this_cycle = ""
    try:
        if kwargs:
            this_cycle_index = kwargs.get("this_cycle_index")
            this_cycle_doc = kwargs.get("this_cycle_doc")
            last_cycle_index = kwargs.get("last_cycle_index")
            last_cycle_doc = kwargs.get("last_cycle_doc")
            first_day_ts = kwargs.get("first_day_ts")
            last_day_ts = kwargs.get("last_day_ts")
        else:
            this_cycle_index = None
    except Exception:
        this_cycle_index = None
    if not this_cycle_index:
        if cycle == "week":
            this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts, this_cycle, last_cycle, last_year = week_num(
                    year, cycle, cycle_num, compare_type)
        elif cycle == "month":
            this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts, this_cycle, last_cycle, last_year = month_num(
                    year, cycle, cycle_num, compare_type)
        elif cycle == "quarter":
            this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts, this_cycle, last_cycle, last_year = quarter_num(
                    year, cycle, cycle_num, compare_type)
        elif cycle == "year":
            pass

    # print(this_cycle_index, this_cycle_doc, last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts, this_cycle,
    #      last_cycle, last_year)
    if data_type == "detail":
        if project_type == "short_video":
            return this_cycle_index, this_cycle_doc, first_day_ts, last_day_ts, this_cycle
        elif project_type == "weibo":
            return "ronghe_weibo_monthly", "doc", first_day_ts, last_day_ts, this_cycle
        elif project_type == "weixin":
            return "ronghe_weixin", "doc", first_day_ts, last_day_ts, "ronghe_weixin"
        elif project_type == "all-time":
            return "short-video-all-time-url", "all-time-url", None, None, None
        elif project_type == "target_releasers":
            return "target_releasers", "doc", None, None, None
        elif project_type == "purchased_releasers":
            return "purchased_releasers", "doc", None, None, None
    else:
        if project_type == "short_video":
            return last_cycle_index, last_cycle_doc, first_day_ts, last_day_ts, this_cycle
        elif project_type == "weibo":
            return "ronghe_weibo_releaser", "doc", first_day_ts, last_day_ts, this_cycle
        elif project_type == "weixin":
            if cycle == "all-time":
                return "ronghe_weixin", "doc", first_day_ts, last_day_ts, "ronghe_weixin"
            return "ronghe_weixin_releaser", "doc", first_day_ts, last_day_ts, this_cycle
        elif project_type == "all-time":
            return "short-video-all-time-url", "all-time-url", None, None, None
        elif project_type == "purchased_releasers":
            return "purchased_releasers", "doc", None, None, None

def create_search_body(year=None, cycle="week", cycle_num=None, project_type="short_video", data_type="detail",
                       start_ts=None, end_ts=None, releaser=None, platform=None, releaser_id_str=None, **kwargs):
    if data_type == "detail":
        search_body = {
                "query": {
                        "bool": {
                                "filter": [
                                        {"term": {"releaser_id_str": releaser_id_str}},
                                        {"range": {"release_time": {"gte": start_ts, "lt": end_ts}}},
                                        {"range": {"duration": {"lte": 600}}}
                                ]
                        }
                }
        }
        if project_type == "weibo":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].pop(1)
            search_body["query"]["bool"]["filter"].append({"term": {"UID.keyword": releaser_id_str}})
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
        elif project_type == "target_releasers":
            search_body = kwargs.get("extra_dic").get("search_body")

        elif project_type == "weixin":
            search_body["query"]["bool"]["filter"].pop(2)
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str.keyword": releaser_id_str}})
        # elif cycle == "month" and project_type == "short_video":
        #     search_body["query"]["bool"]["filter"].pop(0)
        #     search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str.keyword": releaser_id_str}})
    else:
        search_body = {
                "query": {
                        "bool": {
                                "filter": [
                                        {"term": {"releaser_id_str.keyword": releaser_id_str}}
                                ]
                        }
                }
        }
        if project_type == "weibo":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"UID.keyword": releaser_id_str}})
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
        elif project_type == "short_video":
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
            elif cycle == "week":
                search_body["query"]["bool"]["filter"].append({"term": {"data_week_num": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_week_year": year}})
        elif project_type == "weixin":
            if cycle == "month":
                search_body["query"]["bool"]["filter"].append({"term": {"data_month": cycle_num}})
                search_body["query"]["bool"]["filter"].append({"term": {"data_year": year}})
            elif cycle == "all-time":
                search_body["query"]["bool"]["filter"].append(
                        {"range": {"release_time": {"gte": start_ts, "lt": end_ts}}})
                search_body["aggs"] = {
                        "sum_play": {
                                "sum": {
                                        "field": "play_count"
                                }
                        },
                        "sum_favorite": {
                                "sum": {
                                        "field": "favorite_count"
                                }
                        },
                        "sum_comment": {
                                "sum": {
                                        "field": "comment_count"
                                }
                        },
                        "sum_repost": {
                                "sum": {
                                        "field": "repost_count"
                                }
                        }

                }
        elif project_type == "all-time":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str": releaser_id_str}})
            search_body["query"]["bool"]["filter"].append({"range": {"duration": {"lte": 600}}})
            search_body["query"]["bool"]["filter"].append({"range": {"release_time": {"gte": start_ts, "lt": end_ts}}})
            search_body["aggs"] = {
                    "sum_play": {
                            "sum": {
                                    "field": "play_count"
                            }
                    },
                    "sum_favorite": {
                            "sum": {
                                    "field": "favorite_count"
                            }
                    },
                    "sum_comment": {
                            "sum": {
                                    "field": "comment_count"
                            }
                    },
                    "sum_repost": {
                            "sum": {
                                    "field": "repost_count"
                            }
                    }

            }
        elif project_type == "purchased_releasers":
            search_body["query"]["bool"]["filter"].pop(0)
            search_body["query"]["bool"]["filter"].append({"term": {"releaser_id_str": releaser_id_str}})
    print(search_body)
    return search_body


def detail_to_dic(search_res, data_type="short_video"):
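    # NOTE: despite the name, callers pass project_type as data_type here
    # (e.g. "target_releasers", "weibo", "short_video")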
    if data_type == "target_releasers":
        for one_scan in search_res:
            project_tags = one_scan["_source"].get("project_tags")
            department_tags = one_scan["_source"].get("department_tags")
            project_type = ""
            department_type = ""
            if project_tags:
                for i in project_tags:
                    if not project_type:
                        project_type += i
                    else:
                        project_type += "/" + i
            if department_tags:
                for department in department_tags:
                    if not department_type:
                        department_type += department
                    else:
                        department_type += "/" + department
            try:
                re_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("post_time") / 1000)
                time_str = re_time.strftime('%Y-%m-%d %H:%M:%S')
            except Exception:
                time_str = ""

            line_dict = ({"id": one_scan['_id'],
                          'releaser': one_scan['_source'].get("releaser"),
                          'platform': one_scan['_source'].get("platform"),
                          "post_time": time_str,
                          "releaserUrl": one_scan['_source'].get("releaserUrl"),
                          "releaser_id_str": one_scan['_source'].get("releaser_id_str"),
                          "project_type": project_type,
                          "department_tags": department_type
                          })
            yield line_dict

    else:
        if data_type != "weibo":
            for one_scan in search_res:
                try:
                    re_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("release_time") / 1000)
                    time_str = re_time.strftime('%Y-%m-%d %H:%M:%S')
                    fc_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("fetch_time") / 1000)
                    fc_str = fc_time.strftime('%Y-%m-%d %H:%M:%S')
                except Exception:
                    time_str = ""
                    fc_str = ""

                try:
                    title = one_scan['_source'].get("title")
                    title = title.replace("\n", "")
                    title = title.replace("\r", "")
                except Exception:
                    title = ""
                line_dict = ({"id": one_scan['_id'],
                              'releaser': one_scan['_source'].get("releaser"),
                              'title': title,
                              'platform': one_scan['_source'].get("platform"),
                              'url': one_scan['_source'].get("url"),
                              "release_time": time_str,
                              "releaserUrl": one_scan['_source'].get("releaserUrl"),
                              "releaser_id_str": one_scan['_source'].get("releaser_id_str"),
                              "fetch_time": fc_str,
                              "play_count": one_scan['_source'].get("play_count"),
                              "favorite_count": one_scan['_source'].get("favorite_count"),
                              "comment_count": one_scan['_source'].get("comment_count"),
                              "repost_count": one_scan['_source'].get("repost_count"),
                              "duration": one_scan['_source'].get("duration"),
                              "top": one_scan['_source'].get("top")
                              })
                yield line_dict
        else:
            for one_scan in search_res:
                try:
                    re_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("release_time") / 1000)
                    time_str = re_time.strftime('%Y-%m-%d %H:%M:%S')
                    fc_time = datetime.datetime.fromtimestamp(one_scan['_source'].get("fetch_time") / 1000)
                    fc_str = fc_time.strftime('%Y-%m-%d %H:%M:%S')
                except Exception:
                    time_str = ""
                    fc_str = ""
                try:
                    title = one_scan['_source'].get("wb_text").replace("\r", "").replace("\n", "")
                except Exception:
                    title = ""
                line_dict = ({
                        'ID': one_scan['_source'].get("wb_bowen_id"),
                        "UID": str(one_scan['_source']['UID']),
                        '微博昵称': one_scan['_source'].get("wb_name"),
                        '微博内容': title,
                        'platform': one_scan['_source'].get("platform"),
                        '博文地址': one_scan['_source'].get("wb_bowen_address"),
                        "发布日期": time_str,
                        "抓取时间": fc_str,
                        # "play_count": one_scan['_source'].get("play_count"),
                        "点赞量": one_scan['_source'].get("favorite_count"),
                        "评论量": one_scan['_source'].get("comment_count"),
                        "转发量": one_scan['_source'].get("repost_count"),
                        # "duration": one_scan['_source'].get("duration"),
                        "配图连接": one_scan['_source'].get("video_address"),
                        "博文配图": one_scan['_source'].get("wb_pic"),
                        "博文长度": one_scan['_source'].get("wb_bowen_length"),
                        "短链标题": one_scan['_source'].get("wb_type_text"),
                        "短链播放量-视频": one_scan['_source'].get("wb_video_number"),
                        "短链地址-视频": one_scan['_source'].get("video_address"),
                        "短链地址": one_scan['_source'].get("wb_type_address"),
                        "是否原创": one_scan['_source'].get("wb_father_UID"),
                        "原创账户": one_scan['_source'].get("wb_father_bowen_id"),
                        "博文类型": one_scan['_source'].get("wb_bowen_type"),

                })
                yield line_dict


def summary_to_dic(search_res, fans_count=None, fans_date=None, releaser_id_str=None, platform=None, start_ts=None, end_ts=None, **kwargs):
    extra_dic = kwargs.get("extra_dic")
    if search_res:
        line_dict = {}
        video_num = search_res['hits']['total']
        play_count_sum = search_res['aggregations']['sum_play']['value']
        favorite_count_sum = search_res['aggregations']['sum_favorite']['value']
        comment_count_sum = search_res['aggregations']['sum_comment']['value']
        repost_count_sum = search_res['aggregations']['sum_repost']['value']
        month_str = ""
        if start_ts:
            re_time = datetime.datetime.fromtimestamp(start_ts / 1000)
            month_str += re_time.strftime('%Y-%m-%d') + "至"
        if end_ts:
            re_time = datetime.datetime.fromtimestamp(end_ts / 1000)
            month_str += re_time.strftime('%Y-%m-%d')
        line_dict.update({'releaser': "",
                          'platform': platform,
                          'releaserUrl': "",
                          'video_num': video_num,
                          'play_count_sum': play_count_sum,
                          'favorite_count_sum': favorite_count_sum,
                          'comment_count_sum': comment_count_sum,
                          'repost_count_sum': repost_count_sum,
                          "data_month": month_str,
                          "this_fans": fans_count,
                          "production_org_category": "",
                          "tv_station": "",
                          "releaser_id_str": releaser_id_str,
                          "fans_date": fans_date
                          })
        line_dict.update(extra_dic)
        return line_dict


def purchased_releaser_to_dic(search_res, releaser_id_str=None, line_dict=None, releaser=None, platform=None, releaserUrl=None, **kwargs):
    count_has = 0
    line_dict_new = copy.deepcopy(line_dict)
    for res in search_res:
        count_has = 1
        departments = res["_source"].get("departments")
        department_str = ""
        if departments:
            for department in departments:
                if not department_str:
                    department_str += department
                else:
                    department_str += "/" + department

        try:
            re_time = datetime.datetime.fromtimestamp(res['_source'].get("end_purchase_time") / 1000)
            time_str_end = re_time.strftime('%Y-%m-%d %H:%M:%S')
        except Exception:
            time_str_end = ""
        try:
            re_time = datetime.datetime.fromtimestamp(res['_source'].get("start_purchase_time") / 1000)
            time_str_start = re_time.strftime('%Y-%m-%d %H:%M:%S')
        except Exception:
            time_str_start = ""
        line_dict_new = {
                      'releaser': releaser,
                      'platform': platform,
                      "start_purchase_time": time_str_start,
                      "end_purchase_time": time_str_end,
                      "releaserUrl": releaserUrl,
                      "releaser_id_str": releaser_is_str,
                      "is_purchased" : 1,
                      "departments": department_str,
                      "is_purchased_str": res["_source"].get("is_purchased_str")
                      }
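        # only the first matching purchase record is used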
        return line_dict_new
    if not count_has:
        line_dict_new = {
                      'releaser': releaser,
                      'platform': platform,
                      "start_purchase_time": "",
                      "end_purchase_time": "",
                      "releaserUrl": releaserUrl,
                      "releaser_id_str": releaser_is_str,
                      "is_purchased": "",
                      "departments": "",
                      "is_purchased_str": ""
                      }
        return line_dict_new


def releaser_to_dic(search_res, releaser_id_str=None, line_dict=None, fans=None, fc_str=None, **kwargs):
    count_has = 0
    line_dict_list = []
    for res in search_res:
        line_dict_new = copy.deepcopy(line_dict)
        count_has = 1
        print(res)
        video_num = res['_source']['video_num']
        play_count_sum = res['_source']['play_count_sum']
        favorite_count_sum = res['_source']['favorite_count_sum']
        comment_count_sum = res['_source']['comment_count_sum']
        repost_count_sum = res['_source'].get('repost_count_sum')
        ranking_by_play_count_sum = res['_source'].get('ranking_by_play_count_sum')
        stats_type = res['_source']['stats_type']
        _id = res['_id']
        line_dict_new.update({
                'video_num': video_num,
                'play_count_sum': play_count_sum,
                'favorite_count_sum': favorite_count_sum,
                'comment_count_sum': comment_count_sum,
                "stats_type": stats_type,
                "repost_count_sum": repost_count_sum,
                "ranking_by_play_count_sum": ranking_by_play_count_sum,
                "_id": _id,
                "releaser_followers_count": fans,
                "this_fans": fc_str,
                "releaser_id_str": releaser_is_str
        })
        yield line_dict_new
    if not count_has:
        new_dic = {
                'video_num': 0,
                'play_count_sum': 0,
                'favorite_count_sum': 0,
                'comment_count_sum': 0,
                "stats_type": "new_released",
                "repost_count_sum": 0,
                "releaser_id_str": releaser_is_str,
        }
        new_dic.update(line_dict)
        ob_dic = {
                'video_num': 0,
                'play_count_sum': 0,
                'favorite_count_sum': 0,
                'comment_count_sum': 0,
                "stats_type": "observed",
                "repost_count_sum": 0,
                "releaser_id_str": releaser_is_str,
                "releaser_followers_count": fans,
                "this_fans": fc_str,
        }
        ob_dic.update(line_dict)
        line_dict_list.append(new_dic)
        line_dict_list.append(ob_dic)
        for dic in line_dict_list:
            yield dic


def get_follower_num(releaser_id_str=None, ts=None):
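    """Return (followers_count, fetch_time_str) from the first releaser_fans
    record (sorted by timestamp ascending) whose fetch_time is at or after
    ts (ms), or (None, None) when no record is found."""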
    search_short_body_fans = {
            "query": {
                    "bool": {
                            "filter": [
                                    {"term": {"releaser_id_str.keyword": releaser_id_str}},
                                    {"range": {"fetch_time": {"gte": ts}}}
                            ]
                    }
            }, "sort": [
                    {
                            "timestamp": {
                                    "order": "asc"
                            }
                    }
            ]
    }

    try:
        fans_count = es.search(index="releaser_fans", doc_type="doc", body=search_short_body_fans)
        fans = fans_count["hits"]["hits"][0]["_source"]["releaser_followers_count"]
        fans_date = datetime.datetime.fromtimestamp(
                fans_count["hits"]["hits"][0]["_source"]["fetch_time"] / 1000)
        fc_str = fans_date.strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        fans = None
        fc_str = None
    return fans, fc_str


def func_es_to_csv(file_path, read_index, read_doc, year=None, cycle="week", cycle_num=None, project_type="short_video",
                   data_type="detail",
                   start_ts=None, end_ts=None, save_file_path="", file_task_name=None, this_cycle=None, **kwargs):
    all_dic = {
            "toutiao": [],
            "抖音": [],
            "腾讯视频": [],
            "腾讯新闻": [],
            "haokan": [],
            "miaopai": [],
            "网易新闻": [],
            "new_tudou": [],
            "pearvideo": [],
            "kwai": [],
            "weibo": [],
            "weixin": [],
            "tencent_combine": [],
            "purchased_releasers": [],
    }
    all_plant = []
    if not cycle_num:
        cycle_num = this_cycle

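    # file_path is either a CSV file path or an already-parsed list of row
    # dicts (es_to_csv passes project_dic[platform_type] here)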
    if file_path:
        try:
            f = open(file_path, 'r', encoding="gb18030")
            head = f.readline()
            head_list = head.strip().split(',')
        except Exception:
            f = file_path
        for row in f:
            if type(file_path) != list:
                try:
                    line_list = row.strip().split(',')
                    line_dict = dict(zip(head_list, line_list))
                except Exception:
                    line_dict = f
            else:
                line_dict = row
            releaser_id_str = line_dict.get("releaser_id_str")
            platform = line_dict.get("platform")
            releaserUrl = line_dict.get("releaserUrl")
            releaser = line_dict.get("releaser")
            if not releaser_id_str:
                releaser_id_str = line_dict.get("releaser_id")
            if not releaser_id_str:
                releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
                if releaser_id:
                    if platform != "weixin" and platform != "weibo":
                        releaser_id_str = platform + '_' + releaser_id
                    else:
                        releaser_id_str = releaser_id
                else:
                    releaser_id_str = ""
            search_body = create_search_body(year=year, cycle=cycle, cycle_num=cycle_num, project_type=project_type,
                                             data_type=data_type, start_ts=start_ts, end_ts=end_ts, platform=platform,
                                             releaser=releaser, releaser_id_str=releaser_id_str, **kwargs)
            print(search_body)
            search_res = scan(es, query=search_body, index=read_index, doc_type=read_doc, preserve_order=True)

            # file_name = "%s_%s_%s_%s_%s" % (project_type, year, cycle, cycle_num, data_type)
            if data_type == "detail":
                for data in detail_to_dic(search_res, data_type=project_type):
                    all_plant.append(data)
                    all_dic[line_dict["platform"]].append(data)
            else:
                if project_type == "all-time" or cycle == "all-time":
                    fans, fc_str = get_follower_num(releaser_id_str=releaser_id_str, ts=end_ts)
                    search_res = es.search(body=search_body, index=read_index, doc_type=read_doc)
                    res_dic = summary_to_dic(search_res, fans_count=fans, fans_date=fc_str, releaser=releaser,
                                             releaser_id_str=releaser_id_str, releaserUrl=releaserUrl,
                                             platform=platform, extra_dic=line_dict, start_ts=start_ts, end_ts=end_ts)
                    all_plant.append(res_dic)
                elif project_type == "purchased_releasers":
                    res_dic = purchased_releaser_to_dic(search_res, releaser=releaser, platform=platform,
                                                        releaserUrl=releaserUrl, releaser_id_str=releaser_id_str,
                                                        line_dict=line_dict)
                    all_plant.append(res_dic)
                else:
                    fans, fc_str = get_follower_num(releaser_id_str=releaser_id_str, ts=end_ts)
                    for dic in releaser_to_dic(search_res, line_dict=line_dict, fans=fans, fc_str=fc_str,
                                               releaser_id_str=releaser_id_str):
                        all_plant.append(dic)
        if data_type == "detail":
            if project_type != "weibo":
                columns = ["id", 'releaser', 'platform', 'title', "releaserUrl", 'url', 'releaser_id_str',
                           "release_time",
                           "fetch_time",
                           "play_count",
                           "favorite_count",
                           "comment_count", "repost_count", "duration", "top"
                           ]
            else:
                columns = ['ID', "UID",
                           '微博昵称',
                           '微博内容',
                           'platform',
                           '博文地址',
                           "发布日期",
                           "抓取时间",
                           "点赞量",
                           "评论量",
                           "转发量",
                           "配图连接",
                           "博文配图",
                           "博文长度",
                           "短链标题",
                           "短链播放量-视频",
                           "短链地址-视频",
                           "短链地址",
                           "是否原创",
                           "原创账户",
                           "博文类型",
                           ]
        else:
            if project_type == "purchased_releasers":
                columns = ['releaser', 'platform', "start_purchase_time", "end_purchase_time", "releaserUrl",
                           "releaser_id_str", "is_purchased", "departments", "is_purchased_str"]
            else:
                columns = ['releaser',
                       "ranking_by_play_count_sum",
                       "total_releaser_num_on_platform",
                       'platform',
                       "video_num",
                       "play_count_sum",
                       "play_count_sum_total_on_platform",
                       "video_num_sum_total_on_platform",
                       "favorite_count_sum",
                       "comment_count_sum",
                       "repost_count_sum",
                       "releaser_followers_count",
                       "this_fans",
                       "last_fans",
                       "fans_change",
                       "data_year",
                       "data_month",
                       "stats_type",
                       "production_org_category",
                       "releaserUrl",
                       "releaser_id_str",
                        "fans_date",
                       "top"
                       ]
        for d in line_dict:
            if d not in columns:
                columns.append(d)
        if not all_plant:
            return save_file_path + file_task_name
        data = pd.DataFrame(all_plant)
        s = datetime.datetime.now()
        ss = str(s)[0:19].replace(' ', '-').replace(':', '-')
        try:
            os.mkdir(save_file_path + file_task_name)
        except Exception as e:
            # the directory may already exist
            print("mkdir error", e)
        save_path = '%s%s/%s_%s_%s_%s.csv' % (
        save_file_path, file_task_name, file_task_name, project_type, data_type, ss)
        print(save_path)
        data.to_csv(save_path, encoding='gb18030', columns=columns)
        if data_type == "detail" and project_type == "short_video":
            for d in all_dic:
                if all_dic[d]:
                    data = pd.DataFrame(all_dic[d])
                    s = datetime.datetime.now()
                    ss = str(s)[0:19].replace(' ', '-').replace(':', '-')
                    data.to_csv('%s/%s_%s_%s.csv' % (save_file_path + file_task_name, file_task_name, d, ss),
                                encoding='gb18030', columns=columns)
        time.sleep(0.5)
        return save_file_path + file_task_name
    else:
        search_body = kwargs.get("search_body")
        search_res = scan(es, query=search_body, index=read_index, doc_type=read_doc)
        file_name = "%s_%s" % (project_type, data_type)
        if data_type == "detail":
            for data in detail_to_dic(search_res, data_type=project_type):
                if data:
                    all_plant.append(data)

        data = pd.DataFrame(all_plant)
        s = datetime.datetime.now()
        ss = str(s)[0:19].replace(' ', '-').replace(':', '-')
        try:
            os.mkdir(save_file_path + file_task_name)
        except Exception as e:
            # the directory may already exist
            print("mkdir error", e)
        save_path = '%s%s/%s_%s.csv' % (save_file_path, file_task_name, file_name, ss)
        print(save_path)
        data.to_csv(save_path, encoding='gb18030')
        return save_file_path + file_task_name


def timestamp_range(start_timestamp, end_timestamp):
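    """Split [start, end] (millisecond timestamps) into month-start timestamps
    when both fall on the first day of a month; otherwise return the pair
    unchanged."""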
    time_stamp_list = []
    start_date = datetime.datetime.fromtimestamp(start_timestamp / 1e3)
    start_year = start_date.year
    start_month = start_date.month
    end_date = datetime.datetime.fromtimestamp(end_timestamp / 1e3)
    if start_date.day == 1 and end_date.day == 1:
        while start_timestamp <= end_timestamp:
            time_stamp_list.append(start_timestamp)
            if start_month == 12:
                start_month = 1
                start_year += 1
            else:
                start_month += 1
            start_date = datetime.datetime(year=start_year, month=start_month, day=1)
            start_timestamp = int(start_date.timestamp() * 1e3)
        return time_stamp_list
    else:
        return [start_timestamp, end_timestamp]


def es_to_csv(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video",
              data_type_list=["detail"], file_path=None, file_task_name=None, save_file_path=None, start_timestamp=None,
              end_timestamp=None, split_month=False, project_name=None, **kwargs):
    """
    :param year: int 年份 可为空
    :param cycle: str 周期 week/month/quarter/year/(all-time 微信使用自定义时间)/
    :param cycle_num: int  周期 1/2/3/4/5/6
    :param compare_type:默认为空 可不填
    :param project_type:str 数据平台 short_video/weibo/weixin/all-time/target_relasers/purchased_releasers(data_type_list需为summary)
    :param data_type_list: list 数据类型 ["detail","summary"]
    :param file_path: str 传入文件的路径
    :param file_task_name: str 传入的任务名
    :param split_month: bool 是否进行分月 只能在all-time参数下使用
    :param project_name: Str 如果传入的是项目名,则不使用文件
    :param kwargs:dict search_body = {}  自定义请求体
    :return:
    """
    project_dic = {
            "weixin": [],
            "weibo": [],
            "short_video": [],
            "all-time":[],
            "target_releasers":[],
            "purchased_releasers":[]
    }
    if project_name:
        file_path = get_project_releaser(project_name)
    # file_path = r"D:\wxfile\WeChat Files\litaolemo\FileStorage\File\2020-01\微博模板 (1)84781418.csv"
    if not save_file_path:
        save_file_path = "/opt/code/dataManagementSys/dataManagementSys/dataManagementSys/static/zipfile/"
        # save_file_path = r"G:\releaserSystem\dataManagementSys\dataManagementSys\static\zipfile\\"
    if not year:
        year = datetime.datetime.now().year

    try:
        f = open(file_path, 'r', encoding="gb18030")
        head = f.readline()
        head_list = head.strip().split(',')
    except Exception:
        # file_path may already be a list of row dicts (e.g. from get_project_releaser)
        f = file_path
    if file_path:
        for row in f:
            if type(file_path) != list:
                try:
                    line_list = row.strip().split(',')
                    line_dict = dict(zip(head_list, line_list))
                except Exception:
                    line_dict = f
            else:
                line_dict = row
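            # route each row into its platform bucket; weixin/weibo rows may
            # also land in the purchased_releasers bucket, and the loop below
            # filters buckets by project_type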
            if line_dict["platform"] == "weixin":
                project_dic["weixin"].append(line_dict)
            elif line_dict["platform"] == "weibo":
                project_dic["weibo"].append(line_dict)
            if project_type == "purchased_releasers":
                project_dic["purchased_releasers"].append(line_dict)
            else:
                if project_type == "short_video":
                    project_dic["short_video"].append(line_dict)
                else:
                    project_dic["all-time"].append(line_dict)
    else:
        if project_type == "target_releasers":
            project_dic["target_releasers"].append(None)

    for platform_type in project_dic:
        if not project_dic[platform_type]:
            continue
        if platform_type == "weixin" or platform_type == "weibo":
            project_type_temp = platform_type
        else:
            project_type_temp = project_type
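        # only the bucket matching project_type is actually exported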
        if project_type != platform_type:
            continue
        for data_type in data_type_list:
            index, doc, start_ts, end_ts, this_cycle = create_index(year=year, cycle=cycle, cycle_num=cycle_num,
                                                                    compare_type=compare_type,
                                                                    project_type=project_type_temp,
                                                                    data_type=data_type, first_day_ts=start_timestamp,
                                                                    last_day_ts=end_timestamp, **kwargs)
            print(index, doc, start_ts, end_ts, this_cycle)
            try:
                if not split_month:
                    if start_timestamp and end_timestamp:
                        start_ts = start_timestamp
                        end_ts = end_timestamp
                    try:
                        if platform_type == "target_releasers":
                            project_dic[platform_type] = []
                        res_path = func_es_to_csv(project_dic[platform_type], index, doc, year=year, cycle=cycle,
                                                  cycle_num=cycle_num,
                                                  project_type=project_type_temp,
                                                  data_type=data_type, start_ts=start_ts, end_ts=end_ts,
                                                  save_file_path=save_file_path, file_task_name=file_task_name,
                                                  this_cycle=this_cycle, **kwargs)
                    except Exception as e:
                        print("export error", e)
                else:
                    month_timestamp_list = timestamp_range(start_timestamp, end_timestamp)
                    for s, start_ts in enumerate(month_timestamp_list):
                        if s < len(month_timestamp_list) - 1:
                            try:
                                res_path = func_es_to_csv(project_dic[platform_type], index, doc, year=year, cycle=cycle,
                                                          cycle_num=cycle_num,
                                                          project_type=project_type_temp,
                                                          data_type=data_type, start_ts=start_ts,
                                                          end_ts=month_timestamp_list[s + 1],
                                                          save_file_path=save_file_path, file_task_name=file_task_name,
                                                          this_cycle=this_cycle, **kwargs)
                                time.sleep(2)
                            except Exception as e:
                                print("export error", e)
            except Exception as e:
                print("line 769 eroor", e)
                # zipDir(res_path, save_file_path + "down_task/" + file_task_name)
                # raise FileNotFoundError("file didn't find")
        time.sleep(3)
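    # NOTE: res_path is only bound if at least one export above succeeded;
    # otherwise the NameError below surfaces the failure to the caller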
    return zipDir(res_path, save_file_path + "down_task/" + file_task_name)


def write_csv_task_to_redis(year=None, cycle="week", cycle_num=None, compare_type=None, project_type="short_video",
                            data_type_list=["detail"], file_path=None, file_task_name=None, save_file_path=None,
                            start_timestamp=None, end_timestamp=None, split_month=False, email_group=[],
                            project_name=None, **kwargs):
    """
        :param year: int 年份 可为空
        :param cycle: str 周期 week/month/quarter/year
        :param cycle_num: int  周期 1/2/3/4/5/6
        :param compare_type:默认为空 可不填
        :param project_type:str 数据平台 short_video/weibo/weixin/all-time/target_relasers
        :param data_type_list: list 数据类型 ["detail","summary"]
        :param file_path: str 传入文件的路径
        :param file_task_name: str 传入的任务名
        :param split_month: bool 是否进行分月 只能在all-time参数下使用
        :param project_name: Str 传入项目标签
        :param kwargs:dict search_body = {}  自定义请求体
        :return:
        """
    now = int(datetime.datetime.now().timestamp() * 1e3)
    mapping_dic = {
            "year": year,
            "cycle": cycle,
            "cycle_num": cycle_num,
            "compare_type": compare_type,
            "project_type": project_type,
            "data_type_list": data_type_list,
            "file_path": file_path,
            "file_task_name": file_task_name,
            "save_file_path": save_file_path,
            "start_timestamp": start_timestamp,
            "end_timestamp": end_timestamp,
            "split_month": split_month,
            "email_group": email_group,
            "project_name": project_name,
            "kwargs": kwargs
    }
    for k in mapping_dic:
        mapping_dic[k] = json.dumps(mapping_dic[k])
    task_key = file_task_name + "%s_csv" % now
    file_name_msg = rds.hmset(task_key, mapping_dic)
    csv_task_msg = rds.rpush("csv_task", task_key)
    res = rds.hgetall(task_key)
    if res:
        print(res)
        print("file_name_msg", file_name_msg)
        print("csv_task_msg", csv_task_msg)
        return True
    else:
        return False


def get_task_name_from_redis():
    res = rds.llen("csv_task")
    if res != 0:
        data_dic = {}
        one_project_name = rds.lpop("csv_task")
        time.sleep(3)
        project_data = rds.hgetall(one_project_name)
        rds.delete(one_project_name)
        for k in project_data:
            data_dic[k] = json.loads(project_data[k])
        task_name = data_dic.get("file_task_name")
        print(data_dic)
        try:
            res_path = es_to_csv(year=data_dic.get("year"),
                                 cycle=data_dic.get("cycle"),
                                 cycle_num=data_dic.get("cycle_num"),
                                 compare_type=data_dic.get("compare_type"),
                                 project_type=data_dic.get("project_type"),
                                 data_type_list=data_dic.get("data_type_list"),
                                 file_path=data_dic.get("file_path"),
                                 file_task_name=data_dic.get("file_task_name"),
                                 save_file_path=data_dic.get("save_file_path"),
                                 start_timestamp=data_dic.get("start_timestamp"),
                                 end_timestamp=data_dic.get("end_timestamp"),
                                 split_month=data_dic.get("split_month"),
                                 email_group=data_dic.get("email_group"),
                                 project_name=data_dic.get("project_name"),
                                 **data_dic.get("kwargs")
                                 )
        except Exception as e:
            print("save error ", e)
            mapping_dic = {
                    "task_name": task_name,
                    "file_path": None,
                    "data_str": "导出任务 %s 失败" % task_name,
                    "email_group": data_dic.get("email_group"),
                    "email_msg_body_str": "问好:\n        导出任务 %s 失败\n        库中没有对应数据" % task_name,
                    "title_str": "导出任务 %s 失败" % task_name,
                    "cc_group": ["litao@csm.com.cn", "zhouyujiang@csm.com.cn", "gengdi@csm.com.cn"],
                    "sender": data_dic.get("email_group")[0]
            }

            delete_task_from_redis(task_name, mapping_dic, "", hasdata=False)
            return None
        mapping_dic = {
                "task_name": task_name,
                "file_path": None,
                "data_str": "导出任务 %s 已完成" % task_name,
                "email_group": data_dic.get("email_group"),
                "email_msg_body_str": "问好:\n        导出任务 %s 已完成" % task_name,
                "title_str": "导出任务 %s 已完成" % task_name,
                "cc_group": ["litao@csm.com.cn", "liushuangdan@csm.com.cn", "gengdi@csm.com.cn"],
                "sender": data_dic.get("email_group")[0]
        }
        print(task_name, mapping_dic, res_path)
        delete_task_from_redis(task_name, mapping_dic, res_path, hasdata=True)
        return res_path
    else:
        return None


def delete_task_from_redis(one_project_name, email_dic, csv_path, hasdata=True):
    print(one_project_name)
    rds.delete(one_project_name)
    dump_str = json.dumps(
            {"timestamp": int(datetime.datetime.now().timestamp() * 1e3), "csv_path": csv_path, "has_data": hasdata})
    rds.hset("csv_task_task_dowm", one_project_name, dump_str)
    if email_dic:
        write_email_task_to_redis(**email_dic)


def get_csv_task_down():
    res = rds.hgetall("csv_task_task_dowm")
    if res:
        for key in res:
            rds.hdel("csv_task_task_dowm", key)
            rds.delete(key)
        return res
    else:
        return None


def do_task():
    while True:
        try:
            if rds.llen("csv_task") != 0:
                get_task_name_from_redis()
            else:
                now = datetime.datetime.now()
                print(now.strftime("%Y-%m-%d %H:%M:%S"))
                time.sleep(5)
        except Exception as e:
            print(e)
            continue


# while True:
#     do_task()

if __name__ == "__main__":
    # es_to_csv(year=2019, cycle="month", project_type="purchased_releasers", data_type_list=["summary"],
    #           file_path=r"D:\work_file\发布者账号\一次性需求附件\CCR2月以来采购账号.csv", save_file_path=r"./", file_task_name="156",
    #          )
    # a =  {'kwargs': {}, 'split_month': False, 'project_name': None, 'email_group': ['gengdi@csm.com.cn'], 'start_timestamp': None, 'project_type': 'purchased_releasers', 'compare_type': None, 'save_file_path': '/opt/code/dataManagementSys/dataManagementSys/dataManagementSys/static/zipfile/down_task/', 'cycle_num': None, 'file_path': '/opt/code/dataManagementSys/dataManagementSys/dataManagementSys/static/purchaseCsv/', 'year': None, 'data_type_list': ['summary'], 'file_task_name': 'is_purchase_gengdi2020412', 'cycle': 'week', 'end_timestamp': None}
    #
    # write_csv_task_to_redis(**a)
    # write_csv_task_to_redis(year=2019, cycle="month",cycle_num=9,project_type="target_releasers",data_type_list=["detail"],save_file_path=r"E:\test",file_task_name="1789",start_timestamp=1567267200000,end_timestamp=1572537600000,email_group=["litao@csm.com.cn",],search_body={
    #       "query": {
    #         "bool": {
    #           "filter": [
    #              {"term": {"releaser.keyword": "BTV财经"}}
    #             ]
    #         }
    #       },"sort": [
    #         {
    #           "timestamp": {
    #             "order": "desc"
    #           }
    #         }
    #       ]
    #     })
    # get_task_name_from_redis()
    # time.sleep(2)
    # print(get_csv_task_down())
    ### To enqueue an export task in redis, call write_csv_task_to_redis()
    ### To check completion status, call get_csv_task_down()
    # print(month_num(year=2020,cycle_num=1))
    # print(month_num(year=2019,cycle_num=12))
    # do_task()
    # get_csv_task_down()
    # dic = {'start_timestamp': 1577808000000, 'cycle': 'week', 'compare_type': None, 'project_type': 'all-time', 'year': None,
    #  'file_path': r'D:\wxfile\WeChat Files\litaolemo\FileStorage\File\2020-04\brief2-影视综账号抖音快手.csv',
    #  'end_timestamp': 1585670400000,
    #  'save_file_path': './',
    #  'data_type_list': ['summary', 'detail'], 'file_task_name': '306_天津brief二影视综短视频一至三月', 'email_group': [],
    #  'cycle_num': None, 'project_name': None, 'split_month': True, 'kwargs': {}}
    # es_to_csv(**dic)
    do_task()
    # res= es_to_csv(email_group=[],project_type='weixin',
    #           compare_type=None, save_file_path="./", cycle_num=2,
    #          year=2020,project_name="中山",
    # data_type_list=['summary','detail'], file_task_name= '中山', cycle='all-time',start_timestamp=1577808000000,end_timestamp=1580486400000)
    # dic = {'kwargs': {'search_body': {'query': {'bool': {'filter': [], 'should': [{'match': {'project_tags.keyword': '城市媒体融合'}}], 'minimum_should_match': 1}}}}, 'split_month': False, 'project_name': None, 'email_group': ['litao@csm.com.cn'], 'start_timestamp': None, 'project_type': 'target_releasers', 'compare_type': None, 'save_file_path': './', 'cycle_num': None, 'file_path': None, 'year': None, 'data_type_list': ['detail'], 'file_task_name': 'litao202048', 'cycle': 'week', 'end_timestamp': None}
    # es_to_csv(**dic)
    # res = timestamp_range(1577808000000,1585670400000)
    # print(res)