# -*- coding:UTF-8 -*-
# @Time  : 2020/8/11 14:33
# @File  : crawler_week_report.py
# @email : litao@igengmei.com
# @author : litao

import json

import pymysql
import datetime
import xlwt
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti



startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri", "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri", "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri", "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri", "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri", "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")


spark = (
    SparkSession.builder.config(conf=sparkConf)
    .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
    .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
    .appName("LR PYSPARK TEST")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")




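# Shared data-store clients used by the report steps below.
# NOTE: `rds` is created here but never referenced in this script; it is kept as-is in
# case something imports it from this module.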
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')

conn_mimas = pymysql.connect(host='172.16.30.138', port=3306, user='mimas', passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM',
                             db='mimas_prod', charset='utf8mb4')
cur_mimas = conn_mimas.cursor()

db_zhengxing_eagle = pymysql.connect(host="172.16.30.136", port=3306, user="doris",
                                     password="o5gbA27hXHHm",
                                     db="doris_prod",
                                     charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
zhengxing_cursor = db_zhengxing_eagle.cursor()

es = Elasticsearch([
    {
        'host': '172.16.31.17',
        'port': 9200,
    }, {
        'host': '172.16.31.11',
        'port': 9200,
    }])


# class Hive_conn():
#     def __init__(self):
#         self._hive_host = "bj-gm-prod-cos-datacenter003"
#         self._hive_port = 10000
#         self._hive_username = "strategy"
#         self._hive_database = "online"
#         self._conn = hive.Connection(host=self._hive_host, port=self._hive_port, username=self._hive_username,
#                                     database=self._hive_database)
#         self.cursor = self._conn.cursor()
#         # self.cursor.execute('show tables')
#
#     def __del__(self):
#         self.cursor.close()
#         self._conn.close()


# def zipDir(dirpath, outFullName):
#     """
#     Zip the given directory.
#     :param dirpath: path of the directory to compress
#     :param outFullName: output path of the .zip archive
#     :return: None
#     """
#     import zipfile
#     zip = zipfile.ZipFile(outFullName, "w", zipfile.ZIP_DEFLATED)
#     for path, dirnames, filenames in os.walk(dirpath):
#         # strip the root prefix so only the directory's contents are archived
#         fpath = path.replace(dirpath, '')
#
#         for filename in filenames:
#             zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
#     zip.close()

def send_email_tome():
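    """Mail yesterday's search-word workbook (/srv/apps/crawler/近1周数据统计结果.xls)
    to the report recipients; failures are printed instead of raised."""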
    try:
        date = datetime.datetime.now().date() - datetime.timedelta(days=1)
        fromaddr = 'litao@igengmei.com'
        content = 'hi all:附件为' + str(date) + '的搜索词数据统计结果以及近一周的数据统计结果,请查收!'
        zipFile = "/srv/apps/crawler/近1周数据统计结果.xls"
        send_file_email("", "", email_group=["<litao@igengmei.com>", "<duanyingrong@igengmei.com>"],
                        title_str=content, email_msg_body_str=content, file=zipFile)
    except Exception as e:
        print(e)


def get_es_word(word, start_ts):
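    """Count tractates matching `word` in gm-dbmw-tractate-read.

    Returns a 9-element list: online doc counts for content_level 3/4/5/6, the total
    number of crawled docs (platform 13-17, created since `start_ts`), then the
    crawled-and-online doc counts for levels 3/4/5/6.
    """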
    # online tractates: visible content at level 3-6 matching the word
    results = es.search(
        index='gm-dbmw-tractate-read',
        doc_type='tractate',
        timeout='10s',
        size=0,
        body={
            "query": {
                "bool": {
                    "minimum_should_match": 1,
                    "should": [{"match_phrase": {"content": {"query": word, "analyzer": "gm_default_index"}}}, {
                        "match_phrase": {"tractate_tag_name": {"query": word, "analyzer": "gm_default_index"}}}, {
                                   "match_phrase": {"tractate_tag_name_content": {"query": word,
                                                                                  "analyzer": "gm_default_index"}}}],
                    "must": [{"term": {"is_online": True}}, {
                        "terms":
                            {"content_level": [6, 5, 4, 3]}
                    }]
                }
            }, "size": 0, "aggs": {
                "content_level": {
                    "terms": {
                        "field": "content_level",
                        "size": 10
                    }
                }
            }
        }
    )
    # elasticsearch 7 returns hits.total as {"value": N, "relation": ...}
    tractate_content_num = results["hits"]["total"]["value"]
    content_level_count = results["aggregations"]["content_level"]["buckets"]

    # crawled tractates: platforms 13-17, created since start_ts
    results = es.search(
        index='gm-dbmw-tractate-read',
        doc_type='tractate',
        timeout='10s',
        size=0,
        body={
            "query": {
                "bool": {
                    "minimum_should_match": 1,
                    "should": [{"match_phrase": {"content": {"query": word, "analyzer": "gm_default_index"}}}, {
                        "match_phrase": {"tractate_tag_name": {"query": word, "analyzer": "gm_default_index"}}}, {
                                   "match_phrase": {"tractate_tag_name_content": {"query": word,
                                                                                  "analyzer": "gm_default_index"}}}],
                    "must": [
                        {"terms": {"platform": [13, 14, 15, 16, 17]}},
                        {"range": {"create_time_epoch": {"gte": start_ts / 1e3}}}
                    ]
                }
            }, "size": 0, "aggs": {
                "content_level": {
                    "terms": {
                        "field": "content_level",
                        "size": 10
                    },
                    "aggs": {
                        "is_online": {
                            "terms": {
                                "field": "is_online",
                                "size": 10
                            }
                        }
                    }
                }
            }
        }
    )
    craw_tractate_num = results["hits"]["total"]["value"]
    craw_level_count = results["aggregations"]["content_level"]["buckets"]
    res_list = []
    # Visible level 3/4/5/6 posts. Terms-agg bucket keys may come back as ints or
    # strings depending on the field mapping, so compare via str() to be safe.
    try:
        for content_level in ("3", "4", "5", "6"):
            doc_count = 0
            for content_level_dic in content_level_count:
                if str(content_level_dic["key"]) == content_level:
                    doc_count = content_level_dic["doc_count"]
                    break
            res_list.append(doc_count)
    except Exception:
        print("content_level_count error", content_level_count)
    res_list.append(craw_tractate_num)
    for content_level in ("3", "4", "5", "6"):
        doc_count = 0
        for content_level_dic in craw_level_count:
            if str(content_level_dic["key"]) == content_level:
                for buck in content_level_dic["is_online"]["buckets"]:
                    if buck["key"] == 1:
                        doc_count = buck["doc_count"]
                        break
        res_list.append(doc_count)

    return res_list


class WriteExcel:
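    """Thin xlwt wrapper: each write_excel() call adds one sheet, save_excel() writes the .xls file."""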
    def __init__(self):
        self.workbook = xlwt.Workbook(encoding='utf-8')

    def set_style(self, name, height, bold=False):
        style = xlwt.XFStyle()  # initialize the cell style
        font = xlwt.Font()  # create the font for this style
        font.name = name
        font.bold = bold
        # xlwt's Font exposes `colour_index`; assigning `color_index` is silently ignored
        font.colour_index = 4
        font.height = height
        style.font = font
        return style

    # write rows into a new sheet
    def write_excel(self, sheet_name, rows):
        # create the sheet (the workbook itself is created once in __init__)
        data_sheet = self.workbook.add_sheet(sheet_name)
        # define the style outside the loop so it is only built once
        default = self.set_style('Times New Roman', 220, True)
        k = 0
        # .xls sheets hold at most 65536 rows, so cap the input
        for row in rows[:65530]:
            for i in range(len(row)):
                try:
                    data_sheet.write(k, i, row[i], default)
                except Exception:
                    print(i)
                    raise
            k += 1
        print("写入sheet成功,共" + str(k) + "行数据")

    def save_excel(self, path):
        self.workbook.save(path)


def get_search_keyword_dict():
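    """Scan the search_word index on the framework ES and return {title: {}}."""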
    es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
    search_body = {
        "query": {
            "bool": {
                "filter": [

                ]
            }
        }
    }
    return_dict = {}
    scan_res = scan(es_framework, query=search_body, index="search_word")
    for res in scan_res:
        return_dict[res["_source"]["title"]] = {}
    return return_dict


def get_hot_crawler_data_count(index, start_ts):
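    """Bucket crawler docs fetched since `start_ts` by platform, excluding docs tied to
    a specific search_word. `start_ts` is assumed to be epoch millis, matching how
    fetch_time is stored."""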
    es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"fetch_time": {"gte": int(start_ts)}}}
                ], "must_not": [
                    {"exists": {"field": "search_word"}}
                ]
            }
        }, "size": 0, "aggs": {
            "NAME": {
                "terms": {
                    "field": "platform",
                    "size": 10
                }
            }
        }
    }
    print(search_body)
    search_res = es_framework.search(body=search_body, index=index)
    platform_count = search_res["aggregations"]["NAME"]["buckets"]
    return platform_count


def get_count_hot_crawler_data_from_mysql(start_ts: int) -> Dict:
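    """Count api_tractate rows created since `start_ts` (epoch ms) for weibo (platform
    11) and douban (platform 12); returns {platform: [total_rows, online_pgc_rows]}."""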
    data_platform_id_list = [11, 12]
    res_dict = {}
    for platform_id in data_platform_id_list:
        start_date = datetime.datetime.fromtimestamp(start_ts / 1e3)
        start_date_str = start_date.strftime("%Y-%m-%d %H:%M:%S")
        match_all_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and platform = {platform}""".format(
            time=start_date_str, platform=platform_id)
        cur_mimas.execute(match_all_sql)
        data = cur_mimas.fetchall()
        match_all_num = data[0][0]
        # the platform predicate below assumes the online count is meant per platform;
        # without it weibo and douban would report the same number
        online_sql = """select count(id) as nums from api_tractate where create_time >= '{time}' and is_online = 1 and pgc_type = 1 and platform = {platform}""".format(
            time=start_date_str, platform=platform_id)
        cur_mimas.execute(online_sql)
        data = cur_mimas.fetchall()
        match_online_num = data[0][0]
        if platform_id == 11:
            platform = 'weibo'
        elif platform_id == 12:
            platform = "douban"
        res_dict[platform] = [match_all_num, match_online_num]
    return res_dict


def week_num(year=None, cycle=None, cycle_num=None, compare_type=None):
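    """Resolve the reporting week. Returns (es_index, week_start_ts_ms, week_end_ts_ms,
    this_week, last_week, last_year); `year` and `compare_type` are accepted but unused."""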
    now = datetime.datetime.now()
    now_calendar = now.isocalendar()
    if not cycle_num:
        week_calendar = now_calendar
    else:
        week_calendar = (now.year, cycle_num + 1, 0)
    year = week_calendar[0]
    this_week = week_calendar[1]
    if this_week == 0:
        last_year = year - 1
        this_week = 1
    else:
        last_year = year
    if this_week == 1:
        last_week = 52
    else:
        last_week = this_week - 1
    today = datetime.datetime(now.year, now.month, now.day)
    # today = datetime.datetime(year=2018, month=12, day=25)
    first_day_in_week = today - datetime.timedelta(
        days=now_calendar[2] + 7 * (now_calendar[1] - week_calendar[1]) + 2)
    first_day_ts = int(first_day_in_week.timestamp() * 1e3)
    last_day_in_week = first_day_in_week + datetime.timedelta(days=7)
    last_day_ts = int(last_day_in_week.timestamp() * 1e3)

    this_week_index = 'crawler-data-raw'
    return this_week_index, first_day_ts, last_day_ts, this_week, last_week, last_year


def get_how_word_crawler_count(data_index, start_ts, end_ts, week_num, last_week_num, year):
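    """Assemble rows for the hot-content sheet: a header plus one row per platform with
    the ES crawl count and the MySQL ingest/online counts (推送内容数 is left blank)."""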
    res_data = [("周", "抓取平台", "入库内容数", "推送候选内容入库数", "上线内容数", "推送内容数")]
    craw_data_count = get_hot_crawler_data_count(data_index, start_ts)
    res_dict = get_count_hot_crawler_data_from_mysql(start_ts)
    for platform in ("douban", "weibo"):
        craw_one_week_count = ""
        for buck in craw_data_count:
            if buck["key"] == platform:
                craw_one_week_count = buck["doc_count"]
        push_num = res_dict[platform][0]
        oneline_num = res_dict[platform][1]
        new_line = (week_num, platform, craw_one_week_count, push_num, oneline_num, "")
        res_data.append(new_line)
    return res_data


def get_keyword_ctr(start_ts,end_ts):
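    """Build {query: count} between start_ts and end_ts (epoch ms): exposure counts per
    query from the exposure table, plus one increment per report_status query_words
    occurrence. Both signals share one counter, preserving the original behaviour."""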
    start_date = datetime.datetime.fromtimestamp(start_ts/1e3)
    end_date = datetime.datetime.fromtimestamp(end_ts/1e3)
    start_date_str = start_date.strftime("%Y%m%d")
    end_date_str = end_date.strftime("%Y%m%d")
    data_dic = {}
    # exposure counts per query on the search result pages
    baoguang_sql = """
SELECT card_id as query,count(*) as query_count FROM online.ml_community_precise_exposure_detail
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}' and page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') group by query
""".format(start_date=str(start_date_str),end_date=str(end_date_str))
    device_df = spark.sql(baoguang_sql)
    device_df.show(1, False)
    sql_res = device_df.collect()
    print("-----------------------------------------------------------------------------")
    for res in sql_res:
        data_dic[res.query] = res.query_count
    # query words reported via report_status events on the same pages
    query_sql = """
SELECT params['query_words'] as query_words
FROM online.bl_hdfs_maidian_updates
WHERE partition_date>='{start_date}' AND partition_date<'{end_date}'
AND action = 'report_status'
AND page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor')
    """.format(start_date=start_date_str,end_date=end_date_str)

    device_df = spark.sql(query_sql)
    device_df.show(1, False)
    sql_res = device_df.collect()
    for res in sql_res:
        try:
            res_json = json.loads(res.query_words)
        except Exception:
            # query_words may be null or malformed JSON; skip the row
            continue
        for single_keyword in res_json:
            data_dic[single_keyword] = data_dic.get(single_keyword, 0) + 1
    print(data_dic)
    return data_dic


def craw_query_one_week(data_index, start_ts, end_ts, week_num, last_week_num, year,word_count):
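    """Build rows for the per-query sheet: 30-day search pv/uv from api_search_words, a
    rough CTR against the exposure counts in `word_count`, and content counts from
    get_es_word(). Monitored words never searched are appended with zeroed pv/uv."""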

    tag_names_list_week = []
    one_month_ago = datetime.datetime.now().date() - datetime.timedelta(days=30)
    search_keyword_dict = get_search_keyword_dict()
    sql = 'select keywords,sum(sorted) as nums,sum(uv) as uvs from api_search_words where is_delete = 0 and create_time >= "' + str(
        one_month_ago) + '" group by keywords order by nums desc'

    print(sql)
    zhengxing_cursor.execute("set names 'UTF8'")
    zhengxing_cursor.execute(sql)
    data = zhengxing_cursor.fetchall()

    tup_title = ("关键词", "近一周搜索内容ctr", "过去30天搜索pv", "过去30天搜索uv", "帖子-3星内容数量", "帖子-4星内容数量",
                 "帖子-5星内容数量", "帖子-6星内容数量", "近一周抓取数量", "近一周审核上线3星数量",
                 "近一周审核上线4星数量", "近一周审核上线5星数量", "近一周审核上线6星数量")
    tag_names_list_week.append(tup_title)
    seen_words = set()
    for name in list(data):
        word = name.get("keywords", None)
        if word in search_keyword_dict:
            seen_words.add(word)
            sorteds = name.get("nums", 0)
            uv = name.get("uvs", 0)
            tractate_content_num = get_es_word(word, start_ts)
            exposure_count = word_count.get(word)
            if not exposure_count:
                ctr = ""
            else:
                ctr = str(round(sorteds / exposure_count * 100, 2)) + "%"
            new_line = [word, ctr, sorteds, uv]
            tag_names_list_week.append(tuple(new_line + tractate_content_num))
    # Monitored words with no searches in the last 30 days: add a zeroed pv/uv row when
    # the word has online level-3 content. Skipping words already written above avoids
    # emitting duplicate rows.
    for word in search_keyword_dict:
        if word in seen_words:
            continue
        tractate_content_num = get_es_word(word, start_ts)
        if tractate_content_num[0] != 0:
            new_line = [word, 0, 0, 0]
            tag_names_list_week.append(tuple(new_line + tractate_content_num))
    return tag_names_list_week


if __name__ == "__main__":
    data_index, start_ts, end_ts, week_no, last_week_num, year = week_num()
    # NOTE: hard-coded start (2020-08-17 00:00 Beijing time, epoch ms) overrides the
    # computed week start; remove this line to report on the current week.
    start_ts = 1597593600000
    # weekly crawl counts for hot content
    craw_one_week = get_how_word_crawler_count(data_index, start_ts, end_ts, week_no, last_week_num, year)
    word_count = get_keyword_ctr(start_ts, end_ts)
    print(craw_one_week)
    # per-query crawl details for the week
    all_data_week = craw_query_one_week(data_index, start_ts, end_ts, week_no, last_week_num, year, word_count)
    path = "近1周数据统计结果.xls"  # relative; send_email_tome reads the same file under /srv/apps/crawler/
    exl = WriteExcel()
    exl.write_excel("热点内容抓取周报", tuple(craw_one_week))
    exl.write_excel("query抓取周报", tuple(all_data_week))
    exl.save_excel(path)
    print("创建" + path + "文件成功")
    send_email_tome()