import pymysql
from pyspark.conf import SparkConf
import pytispark.pytispark as pti
from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
from pyspark import StorageLevel



def all_click(x):
    """Count all meigou (welfare) click events for the day `x` days ago.

    Args:
        x: number of days back from today to query (1 = yesterday).

    Returns:
        A list of five counts:
        [search clicks, homepage related-recommendation clicks,
         app-homepage entry clicks (two sources combined),
         meigou-homepage entry clicks (two sources combined),
         grand total of the four].

    NOTE(review): relies on the module-level `spark` session created in
    `__main__`; callable only after that session exists.
    """
    date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")

    def _count(condition):
        # One count(*) over the click log for `date`, restricted to app
        # versions >= 7.14.0 plus the caller-supplied predicate.
        return spark.sql(
            "select count(*) from online.bl_hdfs_maidian_updates "
            "where partition_date='{}' and app['version'] >='7.14.0' "
            "and {}".format(date, condition)
        ).rdd.map(lambda r: r[0]).collect()[0]

    print("美购搜索点击")
    search = _count("action = 'search_result_welfare_click_item'")

    print("美购首页相关推荐")
    related = _count("action = 'goto_welfare_detail' "
                     "and params['from'] = 'welfare_home_list_item'")

    print("首页点击'全部'icon按钮进入的列表-美购卡片点击")
    home_all = _count("action = 'goto_welfare_detail' "
                      "and params['from'] = 'welfare_list' and params['cpc_referer'] = '6'")

    print("首页点击icon进入的列表-美购卡片点击")
    home_icon = _count("action = 'goto_welfare_detail' "
                       "and params['from'] = 'category' and params['cpc_referer'] = '19'")
    home_page_sum = home_all + home_icon

    print("美购首页'全部'点击")
    meigou_all = _count("action = 'goto_welfare_detail' "
                        "and params['from'] = 'welfare_list' and params['cpc_referer'] = '21'")

    print("美购首页icon美购点击")
    meigou_icon = _count("action = 'goto_welfare_detail' "
                         "and params['from'] = 'welfare_list' and params['cpc_referer'] = '18'")
    meigou_homepage_sum = meigou_all + meigou_icon

    total = [search, related, home_page_sum, meigou_homepage_sum]
    total.append(sum(total))  # grand total as the last element

    return total



def cpc_click(x):
    """Count CPC (params['is_cpc'] = '1') meigou click events for the day
    `x` days ago.

    Same breakdown as `all_click`, but every query is additionally
    restricted to CPC traffic.

    Args:
        x: number of days back from today to query (1 = yesterday).

    Returns:
        A list of five counts:
        [search clicks, homepage related-recommendation clicks,
         app-homepage entry clicks (two sources combined),
         meigou-homepage entry clicks (two sources combined),
         grand total of the four].

    NOTE(review): relies on the module-level `spark` session created in
    `__main__`; callable only after that session exists.
    """
    date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")

    def _count(condition):
        # One count(*) over the click log for `date`, restricted to app
        # versions >= 7.14.0 and CPC traffic, plus the caller's predicate.
        return spark.sql(
            "select count(*) from online.bl_hdfs_maidian_updates "
            "where partition_date='{}' and app['version'] >='7.14.0' "
            "and params['is_cpc'] = '1' and {}".format(date, condition)
        ).rdd.map(lambda r: r[0]).collect()[0]

    print("美购搜索点击")
    search = _count("action = 'search_result_welfare_click_item'")

    print("美购首页相关推荐")
    related = _count("action = 'goto_welfare_detail' "
                     "and params['from'] = 'welfare_home_list_item'")

    print("首页点击'全部'icon按钮进入的列表-美购卡片点击")
    home_all = _count("action = 'goto_welfare_detail' "
                      "and params['from'] = 'welfare_list' and params['cpc_referer'] = '6'")

    print("首页点击icon进入的列表-美购卡片点击")
    home_icon = _count("action = 'goto_welfare_detail' "
                       "and params['from'] = 'category' and params['cpc_referer'] = '19'")
    home_page_sum = home_all + home_icon

    print("美购首页'全部'点击")
    meigou_all = _count("action = 'goto_welfare_detail' "
                        "and params['from'] = 'welfare_list' and params['cpc_referer'] = '21'")

    print("美购首页icon美购点击")
    meigou_icon = _count("action = 'goto_welfare_detail' "
                         "and params['from'] = 'welfare_list' and params['cpc_referer'] = '18'")
    meigou_home_sum = meigou_all + meigou_icon

    total = [search, related, home_page_sum, meigou_home_sum]
    total.append(sum(total))  # grand total as the last element

    return total


def os_all_click(x, os):
    """Count all meigou click events for one platform for the day `x`
    days ago.

    Same breakdown as `all_click`, but every query is additionally
    restricted to `device['device_type'] = os`.

    Args:
        x: number of days back from today to query (1 = yesterday).
        os: device type to filter on (e.g. 'ios' or 'android').

    Returns:
        A list of five counts:
        [search clicks, homepage related-recommendation clicks,
         app-homepage entry clicks (two sources combined),
         meigou-homepage entry clicks (two sources combined),
         grand total of the four].

    NOTE(review): relies on the module-level `spark` session created in
    `__main__`; callable only after that session exists.
    """
    date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")

    def _count(condition):
        # One count(*) over the click log for `date`, restricted to app
        # versions >= 7.14.0 and this device type, plus the caller's predicate.
        return spark.sql(
            "select count(*) from online.bl_hdfs_maidian_updates "
            "where partition_date='{}' and app['version'] >='7.14.0' "
            "and device['device_type'] = '{}' and {}".format(date, os, condition)
        ).rdd.map(lambda r: r[0]).collect()[0]

    print("美购搜索点击")
    search = _count("action = 'search_result_welfare_click_item'")

    print("美购首页相关推荐")
    related = _count("action = 'goto_welfare_detail' "
                     "and params['from'] = 'welfare_home_list_item'")

    print("首页点击'全部'icon按钮进入的列表-美购卡片点击")
    home_all = _count("action = 'goto_welfare_detail' "
                      "and params['from'] = 'welfare_list' and params['cpc_referer'] = '6'")

    print("首页点击icon进入的列表-美购卡片点击")
    home_icon = _count("action = 'goto_welfare_detail' "
                       "and params['from'] = 'category' and params['cpc_referer'] = '19'")
    home_page_sum = home_all + home_icon

    print("美购首页'全部'点击")
    meigou_all = _count("action = 'goto_welfare_detail' "
                        "and params['from'] = 'welfare_list' and params['cpc_referer'] = '21'")

    print("美购首页icon美购点击")
    meigou_icon = _count("action = 'goto_welfare_detail' "
                         "and params['from'] = 'welfare_list' and params['cpc_referer'] = '18'")
    meigou_homepage_sum = meigou_all + meigou_icon

    total = [search, related, home_page_sum, meigou_homepage_sum]
    total.append(sum(total))  # grand total as the last element

    return total

def os_cpc_click(x, os):
    """Count CPC meigou click events for one platform for the day `x`
    days ago.

    Same breakdown as `all_click`, but every query is additionally
    restricted to CPC traffic (params['is_cpc'] = '1') and to
    `device['device_type'] = os`.

    Args:
        x: number of days back from today to query (1 = yesterday).
        os: device type to filter on (e.g. 'ios' or 'android').

    Returns:
        A list of five counts:
        [search clicks, homepage related-recommendation clicks,
         app-homepage entry clicks (two sources combined),
         meigou-homepage entry clicks (two sources combined),
         grand total of the four].

    NOTE(review): relies on the module-level `spark` session created in
    `__main__`; callable only after that session exists.
    """
    date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")

    def _count(condition):
        # One count(*) over the click log for `date`, restricted to app
        # versions >= 7.14.0, CPC traffic, and this device type, plus
        # the caller's predicate.
        return spark.sql(
            "select count(*) from online.bl_hdfs_maidian_updates "
            "where partition_date='{}' and app['version'] >='7.14.0' "
            "and params['is_cpc'] = '1' and device['device_type'] = '{}' "
            "and {}".format(date, os, condition)
        ).rdd.map(lambda r: r[0]).collect()[0]

    print("美购搜索点击")
    search = _count("action = 'search_result_welfare_click_item'")

    print("美购首页相关推荐")
    related = _count("action = 'goto_welfare_detail' "
                     "and params['from'] = 'welfare_home_list_item'")

    print("首页点击'全部'icon按钮进入的列表-美购卡片点击")
    home_all = _count("action = 'goto_welfare_detail' "
                      "and params['from'] = 'welfare_list' and params['cpc_referer'] = '6'")

    print("首页点击icon进入的列表-美购卡片点击")
    home_icon = _count("action = 'goto_welfare_detail' "
                       "and params['from'] = 'category' and params['cpc_referer'] = '19'")
    home_page_sum = home_all + home_icon

    print("美购首页'全部'点击")
    meigou_all = _count("action = 'goto_welfare_detail' "
                        "and params['from'] = 'welfare_list' and params['cpc_referer'] = '21'")

    print("美购首页icon美购点击")
    meigou_icon = _count("action = 'goto_welfare_detail' "
                         "and params['from'] = 'welfare_list' and params['cpc_referer'] = '18'")
    meigou_home_sum = meigou_all + meigou_icon

    total = [search, related, home_page_sum, meigou_home_sum]
    total.append(sum(total))  # grand total as the last element

    return total


if __name__ == '__main__':
    # Spark session config: recursive hive partition reading plus the
    # TiSpark extension pointed at the PD cluster at 172.16.40.170.
    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")

    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()

    # Build a 20-day daily report per platform: one row per day holding
    # the date, the 5 overall-click counts, and the 5 CPC-click counts.
    for device_os in ["ios", "android"]:
        rows = []
        for days_back in range(1, 21):
            date_str = (datetime.date.today() - datetime.timedelta(days=days_back)).strftime("%Y%m%d")
            row = [date_str]
            row.extend(os_all_click(days_back, device_os))
            row.extend(os_cpc_click(days_back, device_os))
            rows.append(row)
        df = pd.DataFrame(rows)
        # NOTE(review): "all_clcik" is a typo but is kept verbatim because
        # downstream consumers of the CSV may depend on the exact header.
        df = df.rename(columns={0: "date", 1: "search", 2: "xiangguan", 3: "home",
                                4: "service_home", 5: "all_clcik",
                                6: "cpc_search", 7: "cpc_xiangguan", 8: "cpc_home",
                                9: "cpc_service_home", 10: "cpc_all"})
        df.to_csv('/home/gmuser/cpc_{}.csv'.format(device_os), index=False)

    spark.stop()