from pyspark import SparkConf
from pyspark.sql import SparkSession
from pytispark import pytispark as pti

from utils.date import get_ndays_before


def connect_doris(spark, table):
    return spark.read.format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://172.16.30.136:3306/doris_prod") \
            .option("dbtable", table) \
            .option("user", "doris") \
            .option("password", "o5gbA27hXHHm") \
            .load()


def get_content_tag3(spark, card_type):
    if card_type == "diary":
        content_tag3 = connect_doris(spark, "strategy_content_tagv3_info")
    elif card_type == "user_post":
        content_tag3 = connect_doris(spark, "strategy_tractate_tagv3_info")
    else:
        content_tag3 = connect_doris(spark, "strategy_answer_tagv3_info")
    return content_tag3


def get_spark(app_name=""):
    sparkConf = SparkConf()
    sparkConf.set("spark.sql.crossJoin.enabled", True)
    sparkConf.set("spark.debug.maxToStringFields", "100")
    sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
    sparkConf.set("spark.tispark.plan.allow_index_read", True)
    sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
    sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
    sparkConf.set("mapreduce.map.output.compress", False)
    spark = SparkSession.builder.config(conf=sparkConf).config(
        "spark.sql.extensions",
        "org.apache.spark.sql.TiExtensions").config("spark.tispark.pd.addresses",
                                                    "172.16.40.170:2379").appName(app_name).enableHiveSupport().getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    # sc.addPyFile("/srv/apps/strategy_embedding/utils/date.py")
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    return spark


def get_device_tags(spark):
    sql = """
    SELECT cl_id, first_demands, first_solutions, first_positions, second_demands,
           second_solutions, second_positions, projects, business_tags
    FROM user_tag3_portrait
    WHERE date = '{}'
    """.format(get_ndays_before(1))
    return spark.sql(sql).toPandas()


def get_click_data(spark, card_type, start, end):
    reg = r"""^\\d+$"""
    sql = """
        SELECT DISTINCT t1.cl_id device_id, cast(t1.business_id as int) card_id,t1.partition_date
          FROM
          (select partition_date,cl_id,business_id,action,page_name,page_stay,time_stamp
          from online.bl_hdfs_maidian_updates
          where action = 'page_view'
            AND partition_date BETWEEN '{}' AND '{}'
            AND page_name='{}_detail'
            AND page_stay>=5
            AND cl_id is not null
            AND cl_id != ''
            AND business_id is not null
            AND business_id != ''
            AND business_id rlike '{}'
            ) AS t1
          JOIN
          (select partition_date,active_type,first_channel_source_type,device_id
          from online.ml_device_day_active_status
          where partition_date BETWEEN '{}' AND '{}'
            AND active_type IN ('1', '2', '4')
            AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                  ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                  ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                  ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                  ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                  ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                  ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
            AND first_channel_source_type not LIKE 'promotion\\_jf\\_%') as t2
          ON t1.cl_id = t2.device_id
          AND t1.partition_date = t2.partition_date
      LEFT JOIN
    (
        SELECT DISTINCT device_id
        FROM ml.ml_d_ct_dv_devicespam_d  --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
        WHERE partition_day='{}'

        UNION ALL
        SELECT DISTINCT device_id
        FROM dim.dim_device_user_staff   --去除内网用户
    )spam_pv
    on spam_pv.device_id=t1.cl_id
        LEFT JOIN
    (
        SELECT partition_date,device_id
        FROM
        (--找出user_id当天活跃的第一个设备id
            SELECT user_id,partition_date,
                    if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
              FROM online.ml_user_updates
              WHERE partition_date>='{}' AND partition_date<'{}'
        )t1
        JOIN
        (  --医生账号
            SELECT distinct user_id
            FROM online.tl_hdfs_doctor_view
            WHERE partition_date = '{}'

            --马甲账号/模特用户
            UNION ALL
            SELECT user_id
            FROM ml.ml_c_ct_ui_user_dimen_d
            WHERE partition_day = '{}'
            AND (is_puppet = 'true' or is_classifyuser = 'true')

            UNION ALL
            --公司内网覆盖用户
            select distinct user_id
            from dim.dim_device_user_staff

            UNION ALL
            --登陆过医生设备
            SELECT distinct t1.user_id
            FROM
            (
                SELECT user_id, v.device_id as device_id
                FROM online.ml_user_history_detail
                    LATERAL VIEW EXPLODE(device_history_list) v AS device_id
                WHERE partition_date = '{}'
            )t1
            JOIN
            (
                SELECT device_id
                FROM online.ml_device_history_detail
                WHERE partition_date = '{}'
                AND is_login_doctor = '1'
            )t2
                ON t1.device_id = t2.device_id
        )t2
        on t1.user_id=t2.user_id
        group by partition_date,device_id
    )dev
    on t1.partition_date=dev.partition_date and t1.cl_id=dev.device_id
    WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
    and (dev.device_id is null or dev.device_id ='')
    """.format(start, end, card_type, reg, start, end, end, start, end, end, end, end, end)
    return spark.sql(sql).toPandas()


def get_exposure_data(spark, card_type, start, end):
    reg = r"""^\\d+$"""
    sql = """
        SELECT DISTINCT
        t1.cl_id device_id,
        cast(card_id AS int) card_id,
        t1.partition_date
        FROM (select * from online.ml_community_precise_exposure_detail
        where cl_id IS NOT NULL
        AND card_id IS NOT NULL
        AND card_id rlike '{}'
        AND action='page_precise_exposure'
        AND card_content_type = '{}'
        AND is_exposure = 1
        ) AS t1
        LEFT JOIN online.ml_device_day_active_status AS t2
        ON t1.cl_id = t2.device_id
        AND t1.partition_date = t2.partition_date
        LEFT JOIN
        (
          SELECT DISTINCT device_id
          FROM ml.ml_d_ct_dv_devicespam_d  --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
          WHERE partition_day='{}'
          UNION ALL
          SELECT DISTINCT device_id
          FROM dim.dim_device_user_staff   --去除内网用户
        )spam_pv
        on spam_pv.device_id=t1.cl_id
        LEFT JOIN
        (
        SELECT partition_date,device_id
        FROM
        (--找出user_id当天活跃的第一个设备id
            SELECT user_id,partition_date,
                    if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
              FROM online.ml_user_updates
              WHERE partition_date>='{}' AND partition_date<'{}'
        )t1
        JOIN
        (  --医生账号
            SELECT distinct user_id
            FROM online.tl_hdfs_doctor_view
            WHERE partition_date = '{}'

            --马甲账号/模特用户
            UNION ALL
            SELECT user_id
            FROM ml.ml_c_ct_ui_user_dimen_d
            WHERE partition_day = '{}'
            AND (is_puppet = 'true' or is_classifyuser = 'true')

            UNION ALL
            --公司内网覆盖用户
            select distinct user_id
            from dim.dim_device_user_staff

            UNION ALL
            --登陆过医生设备
            SELECT distinct t1.user_id
            FROM
            (
                SELECT user_id, v.device_id as device_id
                FROM online.ml_user_history_detail
                    LATERAL VIEW EXPLODE(device_history_list) v AS device_id
                WHERE partition_date = '{}'
            )t1
            JOIN
            (
                SELECT device_id
                FROM online.ml_device_history_detail
                WHERE partition_date = '{}'
                AND is_login_doctor = '1'
            )t2
                ON t1.device_id = t2.device_id
        )t2
        on t1.user_id=t2.user_id
        group by partition_date,device_id
    )dev
    on t1.partition_date=dev.partition_date and t1.cl_id=dev.device_id
    WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
    and (dev.device_id is null or dev.device_id ='')
      AND t2.partition_date BETWEEN '{}' AND '{}'
      AND t2.active_type IN ('1','2','4')
      AND t2.first_channel_source_type NOT IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                      ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                      ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                      ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                      ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                      ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                      ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
      AND t2.first_channel_source_type not LIKE 'promotion\\_jf\\_%'
      """.format(reg, card_type, end, start, end, end, end, end, end, start, end)
    return spark.sql(sql).toPandas()


def get_card_feature_df(spark, card_type, yesterday):
    content_tag3 = get_content_tag3(spark, card_type)
    content_tag3.createOrReplaceTempView("content_tag3")

    reg = r"""^\\d+$"""
    sql = """
          SELECT CAST(card_id as INT) as card_id,
               partition_date,
               is_pure_author,
               is_have_pure_reply,
               is_have_reply,
               CAST(card_feature.content_level as FLOAT) as content_level,
               topic_seven_click_num,
               topic_thirty_click_num,
               topic_num,
               seven_transform_num,
               thirty_transform_num,
               favor_num,
               favor_pure_num,
               vote_num,
               vote_display_num,
               reply_num,
               reply_pure_num,
               one_click_num,
               three_click_num,
               seven_click_num,
               fifteen_click_num,
               thirty_click_num,
               sixty_click_num,
               ninety_click_num,
               history_click_num,
               one_precise_exposure_num,
               three_precise_exposure_num,
               seven_precise_exposure_num,
               fifteen_precise_exposure_num,
               thirty_precise_exposure_num,
               sixty_precise_exposure_num,
               ninety_precise_exposure_num,
               history_precise_exposure_num,
               one_vote_user_num,
               three_vote_user_num,
               seven_vote_user_num,
               fifteen_vote_user_num,
               thirty_vote_user_num,
               sixty_vote_user_num,
               ninety_vote_user_num,
               history_vote_user_num,
               one_reply_user_num,
               three_reply_user_num,
               seven_reply_user_num,
               fifteen_reply_user_num,
               thirty_reply_user_num,
               sixty_reply_user_num,
               ninety_reply_user_num,
               history_reply_user_num,
               one_browse_user_num,
               three_browse_user_num,
               seven_browse_user_num,
               fifteen_browse_user_num,
               thirty_browse_user_num,
               sixty_browse_user_num,
               ninety_browse_user_num,
               history_browse_user_num,
               one_reply_num,
               three_reply_num,
               seven_reply_num,
               fifteen_reply_num,
               thirty_reply_num,
               sixty_reply_num,
               ninety_reply_num,
               history_reply_num,
               one_ctr,
               three_ctr,
               seven_ctr,
               fifteen_ctr,
               thirty_ctr,
               sixty_ctr,
               ninety_ctr,
               history_ctr,
               one_vote_pure_rate,
               three_vote_pure_rate,
               seven_vote_pure_rate,
               fifteen_vote_pure_rate,
               thirty_vote_pure_rate,
               sixty_vote_pure_rate,
               ninety_vote_pure_rate,
               history_vote_pure_rate,
               one_reply_pure_rate,
               three_reply_pure_rate,
               seven_reply_pure_rate,
               fifteen_reply_pure_rate,
               thirty_reply_pure_rate,
               sixty_reply_pure_rate,
               ninety_reply_pure_rate,
               history_reply_pure_rate,
               IFNULL(content_tag3.first_demands, "") AS card_first_demands,
               IFNULL(content_tag3.second_demands, "") AS card_second_demands,
               IFNULL(content_tag3.first_solutions, "") AS card_first_solutions,
               IFNULL(content_tag3.second_solutions, "") AS card_second_solutions,
               IFNULL(content_tag3.first_positions, "") AS card_first_positions,
               IFNULL(content_tag3.second_positions, "") AS card_second_positions,
               IFNULL(content_tag3.project_tags, "") AS card_projects
          FROM
          online.al_community_forecast_character_day_v3 card_feature
          JOIN content_tag3
          ON card_feature.card_id = content_tag3.id
          where partition_date = '{}'
           and card_content_type = '{}'
           and card_id rlike '{}'
       """.format(yesterday, card_type, reg)
    return spark.sql(sql).toPandas()