# -*- coding:UTF-8 -*-
# @Time  : 2020/11/6 10:00
# @File  : recommend_strategy_fix.py
# @email : litao@igengmei.com
# @author : litao

import hashlib
import json

import pymysql
import xlwt, datetime
# import redis
# from pyhive import hive
# from maintenance.func_send_email_with_file import send_file_email
# from typing import Dict, List
# from elasticsearch_7 import Elasticsearch
# from elasticsearch_7.helpers import scan
# import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti

db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
                     db='doris_olap')
cursor = db.cursor()


def con_sql(sql):
    # 从数据库的表里获取数据
    db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
                         db='doris_olap')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result


startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# sparkConf.set("prod.jerry.jdbcuri",
#               "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
# sparkConf.set("prod.tidb.database", "jerry_prod")

spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName(
    "LR PYSPARK TEST").enableHiveSupport().getOrCreate())

spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")


for t in range(0, 3):
    day_num = 0 - t
    now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
    last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
    today_str = now.strftime("%Y%m%d")
    yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
    one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")


    ctr_sql = """
    SELECT
        t1.partition_date as day_id,
        t1.device_os_type as device_os_type,
        t1.active_type as active_type,
        t2.card_content_type as card_content_type,
        t2.recommend_type as recommend_type,
        NVL(sum(t3.session_pv),0) as card_click,
        NVL(sum(t2.session_pv),0) as card_exposure,
        NVL(round(sum(page_stay)/count(distinct t4.cl_id)/60,2),0) as avg_page_stay,
        NVL(sum(t4_pv.pv),0) as page_pv,
        NVL(sum(t4_pv.pv_20),0) as page_pv_20,
        NVL(sum(navbar_pv),0) as navbar_search,
        NVL(sum(highlight_pv),0) as highlight_word,
        NVL(sum(self_wel_pv),0) as self_welfare_card,
        NVL(sum(recom_wel_pv),0)-NVL(sum(self_wel_pv),0) as recommend_welfare_card,--需要排除关联的商品卡片点击
        NVL(sum(recom_content_pv),0) as recommend_content_card,
        NULL as recommend_special_card,
        NVL(sum(referral_pv),0) as transfer_card,
        NVL(sum(video_pv),0) as video_consultation,
        NVL(sum(post_pv),0) as total_post_pv,
        NVL(sum(post_click_pv),0) as post_click_pv
    
    FROM

    (
        SELECT partition_date
             ,device_os_type
             ,CASE WHEN active_type = '4'  THEN '老活'
                   WHEN active_type  IN ('1','2')  THEN '新增' END AS active_type
             ,device_id
        FROM online.ml_device_day_active_status
        WHERE partition_date={partition_day}
          AND active_type IN ('1','2','4')
          AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
            ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
            ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
            ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
            ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
            ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
            ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
          AND first_channel_source_type not LIKE 'promotion\_jf\_%'
    )t1 JOIN
    
     (--卡片,卡片id和session_id去重
     
     SELECT  partition_date,
                    cl_id
                from 
           ( SELECT  partition_date,
                    cl_id,
                    count(distinct app_session_id) as session_pv0
            FROM
            (
               SELECT partition_date,
                      cl_id,
                      case when params['card_content_type'] in ('qa','answer') then 'qa'
                           when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end as card_content_type,
                      CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
                           when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计')
                           WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计')
                           when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片')
                           when params['transaction_type'] in ('newdata') then array('保量卡片')
                           when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计')
                           when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计')
                           when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计')
                           end AS recommend_type,
                      params['card_id'] as card_id,
                      app_session_id
               from online.bl_hdfs_maidian_updates
               WHERE partition_date={partition_day}
               AND action='on_click_card'
               AND params['page_name'] ='home'
               AND params['tab_name'] = '精选'
               AND (params['transaction_type'] in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO')
                    or params['transaction_type'] like '%ctr' or params['transaction_type'] like '%cvr' or params['transaction_type'] like 'deeplink%')
               AND params['card_content_type'] in ('qa','diary','user_post','answer','special_pool')
               GROUP BY partition_date,
                      cl_id,
                      case when params['card_content_type'] in ('qa','answer') then 'qa'
                           when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end,
                      CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
                           when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计')
                           WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计')
                           when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片')
                           when params['transaction_type'] in ('newdata') then array('保量卡片')
                           when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计')
                           when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计')
                           when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end,
                      params['card_id'],
                      app_session_id
            )a
            LATERAL VIEW explode (a.recommend_type) v as recommend_type
            group by partition_date,card_content_type,cl_id,v.recommend_type,card_id having session_pv0 >0
        
          UNION 
     SELECT  partition_date,            
                    cl_id,
                    count(distinct card_id) as session_pv0
            FROM
                     (SELECT partition_date,
                      cl_id,
                      case when card_content_type in ('qa','answer') then 'qa'
                           when card_content_type in ('special_pool') then 'special' else card_content_type end as card_content_type,
                      CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr') ) THEN array('ctr预估','合计')
                           when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN transaction_type like '%cvr' THEN array('cvr预估','合计')
                           WHEN transaction_type in ('-1','smr') THEN array('smr','合计')
                           when transaction_type in ('pgc','hotspot') then array('热点卡片')
                           when transaction_type in ('newdata') then array('保量卡片')
                           when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计')
                           when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计')
                           when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计')
                           when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when transaction_type like 'deeplink%' then array('deeplink策略','合计')
                           end AS recommend_type,
                      card_id,
                      app_session_id
               from online.ml_community_precise_exposure_detail
               WHERE partition_date={partition_day}
               AND action in ('page_precise_exposure','home_choiceness_card_exposure') --7745版本action改为page_precise_exposure
               AND is_exposure = '1'  ----精准曝光
               AND page_name ='home'
               AND tab_name = '精选'
               AND (transaction_type in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO')
                    or transaction_type like '%ctr' or transaction_type like '%cvr' or transaction_type like 'deeplink%')
               AND card_content_type in ('qa','diary','user_post','answer','special_pool')
               group by partition_date,
                      case when card_content_type in ('qa','answer') then 'qa'
                           when card_content_type in ('special_pool') then 'special' else card_content_type end,
                      cl_id,
                      CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
                           when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN transaction_type like '%cvr' THEN array('cvr预估','合计')
                           WHEN transaction_type in ('-1','smr') THEN array('smr','合计')
                           when transaction_type in ('pgc','hotspot') then array('热点卡片')
                           when transaction_type in ('newdata') then array('保量卡片')
                           when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计')
                           when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计')
                           when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计')
                           when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when transaction_type like 'deeplink%' then array('deeplink策略','合计') end,
                      card_id,
                      app_session_id
            )a
            LATERAL VIEW explode (a.recommend_type) v as recommend_type
            group by partition_date,cl_id having  session_pv0 >= 4) group by partition_date,cl_id 
    ) t0  on t1.device_id = t0.cl_id 
 LEFT JOIN
        (--精准曝光,卡片id和session_id去重
            SELECT  partition_date,
                    card_content_type,
                    cl_id,
                    v.recommend_type,
                    card_id,
                    count(distinct app_session_id) as session_pv
            FROM
            (
               SELECT partition_date,
                      cl_id,
                      case when card_content_type in ('qa','answer') then 'qa'
                           when card_content_type in ('special_pool') then 'special' else card_content_type end as card_content_type,
                      CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr') ) THEN array('ctr预估','合计')
                           when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN transaction_type like '%cvr' THEN array('cvr预估','合计')
                           WHEN transaction_type in ('-1','smr') THEN array('smr','合计')
                           when transaction_type in ('pgc','hotspot') then array('热点卡片')
                           when transaction_type in ('newdata') then array('保量卡片')
                           when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计')
                           when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计')
                           when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计')
                           when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when transaction_type like 'deeplink%' then array('deeplink策略','合计')
                           end AS recommend_type,
                      card_id,
                      app_session_id
               from online.ml_community_precise_exposure_detail
               WHERE partition_date={partition_day}
               AND action in ('page_precise_exposure','home_choiceness_card_exposure') --7745版本action改为page_precise_exposure
               AND is_exposure = '1'  ----精准曝光
               AND page_name ='home'
               AND tab_name = '精选'
               AND (transaction_type in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO')
                    or transaction_type like '%ctr' or transaction_type like '%cvr' or transaction_type like 'deeplink%')
               AND card_content_type in ('qa','diary','user_post','answer','special_pool')
               group by partition_date,
                      case when card_content_type in ('qa','answer') then 'qa'
                           when card_content_type in ('special_pool') then 'special' else card_content_type end,
                      cl_id,
                      CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
                           when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN transaction_type like '%cvr' THEN array('cvr预估','合计')
                           WHEN transaction_type in ('-1','smr') THEN array('smr','合计')
                           when transaction_type in ('pgc','hotspot') then array('热点卡片')
                           when transaction_type in ('newdata') then array('保量卡片')
                           when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计')
                           when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计')
                           when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计')
                           when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when transaction_type like 'deeplink%' then array('deeplink策略','合计') end,
                      card_id,
                      app_session_id
            )a
            LATERAL VIEW explode (a.recommend_type) v as recommend_type
            group by partition_date,card_content_type,cl_id,v.recommend_type,card_id
          )t2  
        on t0.cl_id=t2.cl_id and t0.partition_date=t2.partition_date 
        LEFT JOIN
        (--卡片,卡片id和session_id去重
            SELECT  partition_date,
                    card_content_type,
                    cl_id,
                    v.recommend_type,
                    card_id,
                    count(distinct app_session_id) as session_pv
            FROM
            (
               SELECT partition_date,
                      cl_id,
                      case when params['card_content_type'] in ('qa','answer') then 'qa'
                           when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end as card_content_type,
                      CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
                           when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计')
                           WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计')
                           when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片')
                           when params['transaction_type'] in ('newdata') then array('保量卡片')
                           when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计')
                           when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计')
                           when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计')
                           end AS recommend_type,
                      params['card_id'] as card_id,
                      app_session_id
               from online.bl_hdfs_maidian_updates
               WHERE partition_date={partition_day}
               AND action='on_click_card'
               AND params['page_name'] ='home'
               AND params['tab_name'] = '精选'
               AND (params['transaction_type'] in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO')
                    or params['transaction_type'] like '%ctr' or params['transaction_type'] like '%cvr' or params['transaction_type'] like 'deeplink%')
               AND params['card_content_type'] in ('qa','diary','user_post','answer','special_pool')
               GROUP BY partition_date,
                      cl_id,
                      case when params['card_content_type'] in ('qa','answer') then 'qa'
                           when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end,
                      CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计')
                           when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
                           WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
                           when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计')
                           WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计')
                           WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计')
                           when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片')
                           when params['transaction_type'] in ('newdata') then array('保量卡片')
                           when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计')
                           when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计')
                           when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
                           when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
                           when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end,
                      params['card_id'],
                      app_session_id
            )a
            LATERAL VIEW explode (a.recommend_type) v as recommend_type
            group by partition_date,card_content_type,cl_id,v.recommend_type,card_id
          )t3
        on t2.partition_date=t3.partition_date
            and t2.cl_id=t3.cl_id
            and t2.card_id=t3.card_id
            and t2.card_content_type=t3.card_content_type
            and t2.recommend_type=t3.recommend_type
        LEFT JOIN
        (--页面浏览时长
            select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name
                    ,sum(page_stay) as page_stay
            from
            (
              SELECT  partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                      page_stay,time_str
              FROM online.bl_hdfs_maidian_updates
              WHERE partition_date={partition_day}
              AND action='page_view'
              AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
              AND referrer='home'
              AND page_stay>=0 AND page_stay<1000
              GROUP BY partition_date,cl_id,business_id,
                        case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end,page_stay,time_str
            )a
            left join
            (
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
            group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
        )t4
        on t4.partition_date=t3.partition_date
            and t4.cl_id=t3.cl_id
            and t4.business_id=t3.card_id
            and t4.page_name=t3.card_content_type
        LEFT JOIN
        (--页面浏览时长
            select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name
                   ,count(1) as pv
                   ,count(case when page_stay>=20 then 1 end) as pv_20
            from
            (
              SELECT  partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,time_str,page_stay
              FROM online.bl_hdfs_maidian_updates
              WHERE partition_date={partition_day}
              AND action='page_view'
              AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
              AND referrer='home'
              GROUP BY partition_date,cl_id,business_id,
                        case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end,time_str,page_stay
            )a
            left join
            (--在8月份新增了内容专题卡片,需要转换下id
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
            group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
        )t4_pv
        on t4_pv.partition_date=t3.partition_date
            and t4_pv.cl_id=t3.cl_id
            and t4_pv.business_id=t3.card_id
            and t4_pv.page_name=t3.card_content_type
        LEFT JOIN
        (--搜索框和点击行为
            select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,navbar_pv
            from
            (
              SELECT partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                     count(1) as navbar_pv
              FROM online.bl_hdfs_maidian_updates
              WHERE partition_date={partition_day}
              AND action in ('on_click_navbar_search','do_search')
              AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
              AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
              group by partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
            )a
            left join
            (
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
          )t5
        on t5.partition_date=t3.partition_date
            and t5.cl_id=t3.cl_id
            and t5.business_id=t3.card_id
            and t5.page_name=t3.card_content_type
        LEFT JOIN
        (--点击高亮词
            select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,highlight_pv
            from
            (
              SELECT partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                     count(1) as highlight_pv
              FROM online.bl_hdfs_maidian_updates
              WHERE partition_date={partition_day}
              AND action='on_click_card'
              and params['card_type']='highlight_word'
              AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
              AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
              group by partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
            )a
            left join
            (
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
        )t6
        on t6.partition_date=t3.partition_date
            and t6.cl_id=t3.cl_id
            and t6.business_id=t3.card_id
            and t6.page_name=t3.card_content_type
        LEFT JOIN
        (--关联的美购卡片
            SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as self_wel_pv
            FROM
            (
                SELECT  partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id,
                         case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                        count(1) as pv
                FROM online.bl_hdfs_maidian_updates
                WHERE partition_date={partition_day}
                AND (get_json_object(params['extra_param'], '$.type')='交互栏'
               or get_json_object(params['extra_param'], '$.jump_from')='msg_link'
               or params['in_page_pos']='top'
               or (params['in_page_pos']='bottom' and params['position'] is null and cl_type='android')
            or (params['in_page_pos']='bottom' and params['card_but_pos'] is not null and cl_type='ios'))
                AND action='on_click_card'
                and params['card_content_type']='service'
                AND page_name IN ('diary_detail','topic_detail')
                AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
                group by partition_date,cl_id,business_id,app_session_id,params['card_id'],
                         case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
            )a
            left join
            (
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
            group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
          )t7
        on t7.partition_date=t3.partition_date
            and t7.cl_id=t3.cl_id
            and t7.business_id=t3.card_id
            and t7.page_name=t3.card_content_type
        LEFT JOIN
        (--推荐的美购卡片(需要排除作者消费的美购)
            SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as recom_wel_pv
            FROM
            (
                SELECT  partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id,
                         case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                        count(1) as service_pv
                FROM online.bl_hdfs_maidian_updates
                WHERE partition_date={partition_day}
                AND (action='on_click_card'and  params['card_content_type']='service'
                       or action='on_click_button' and params['button_name']='unfold' and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail')
                       or action='on_click_button' and params['button_name'] = 'more_recommendations')
                AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
                AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
                group by partition_date,cl_id,business_id,app_session_id,params['card_id'],
                         case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
            )a
            left join
            (
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
            group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
        )t8
        on t8.partition_date=t3.partition_date
            and t8.cl_id=t3.cl_id
            and t8.business_id=t3.card_id
            and t8.page_name=t3.card_content_type
        LEFT JOIN
        (--推荐的内容卡片
            SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as recom_content_pv
            FROM
            (
                SELECT  partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id,
                         case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                        count(1) as service_pv
                FROM online.bl_hdfs_maidian_updates
                WHERE partition_date={partition_day}
                AND action='on_click_card'
                and params['card_content_type'] in ('qa','diary','user_post','answer')
                AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
                AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
                group by partition_date,cl_id,business_id,app_session_id,params['card_id'],
                         case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
            )a
            left join
            (
              select id,visual_page_id,'special' as page_name
              from tl.tl_zx_api_special_pool
              where partition_day ={partition_day}
              group by id,visual_page_id
            )b
            on a.business_id=b.visual_page_id and a.page_name=b.page_name
            group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
          )t9
        on t9.partition_date=t3.partition_date
            and t9.cl_id=t3.cl_id
            and t9.business_id=t3.card_id
            and t9.page_name=t3.card_content_type
        LEFT JOIN
        (--视频面诊点击
          select  partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,video_pv
          from
          (
            SELECT partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                     count(1) as video_pv
              FROM online.bl_hdfs_maidian_updates
              WHERE partition_date={partition_day}
              AND action='on_click_button'
              and params['button_name']='video_interview'
              AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
              AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
              group by partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
          )a
          left join
          (
            select id,visual_page_id,'special' as page_name
            from tl.tl_zx_api_special_pool
            where partition_day ={partition_day}
            group by id,visual_page_id
          )b
          on a.business_id=b.visual_page_id and a.page_name=b.page_name
        )t10
        on t10.partition_date=t3.partition_date
            and t10.cl_id=t3.cl_id
            and t10.business_id=t3.card_id
            and t10.page_name=t3.card_content_type
        LEFT JOIN
        (--转诊按钮点击
          select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,referral_pv
          from
          (
            SELECT partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end as page_name,
                     count(1) as referral_pv
              FROM online.bl_hdfs_maidian_updates
              WHERE partition_date={partition_day}
              AND action='on_click_button'
              and params['button_name']='referral'
              AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
              AND (referrer='home' or
                      (params['referrer_link'] like '%[%' and
                       json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
              group by partition_date,cl_id,business_id,
                      case when page_name in ('diary_detail','topic_detail') then 'diary'
                           when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                           when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
                           when page_name in ('custom_special') then 'special' else null end
          )a
          left join
          (
            select id,visual_page_id,'special' as page_name
            from tl.tl_zx_api_special_pool
            where partition_day ={partition_day}
            group by id,visual_page_id
          )b
          on a.business_id=b.visual_page_id and a.page_name=b.page_name
        )t11
        on t11.partition_date=t3.partition_date
            and t11.cl_id=t3.cl_id
            and t11.business_id=t3.card_id
            and t11.page_name=t3.card_content_type
        LEFT JOIN
        (--从帖子页到帖子页
            SELECT partition_date,cl_id,params['referrer_id'] as business_id,
                    case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                         else null end as page_name,
                   count(distinct params['business_id'],app_session_id) as post_pv
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date={partition_day}
            AND action='page_view'
            AND page_name IN ('post_detail','user_post_detail','doctor_post_detail','custom_special')
            AND (json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]IN ('post_detail','user_post_detail','doctor_post_detail')
                  and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-2]='home')
            group by partition_date,cl_id,params['referrer_id'],
                    case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                         else null end
        )t12
        on t12.partition_date=t3.partition_date
            and t12.cl_id=t3.cl_id
            and t12.business_id=t3.card_id
            and t12.page_name=t3.card_content_type
        left join
        (--在帖子页点击帖子
            SELECT partition_date,cl_id,params['business_id'] as business_id,
                    case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                         else null end as page_name,
                   count(distinct params['card_id'],app_session_id) as post_click_pv
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date={partition_day}
            AND action='on_click_card'
            and params['card_content_type'] in ('user_post')
            AND page_name IN ('post_detail','user_post_detail','doctor_post_detail')
            AND (referrer='home' or
                  (params['referrer_link'] like '%[%' and
                   json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
            group by partition_date,cl_id,params['business_id'],
                    case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
                         else null end
        )t13
        on t13.partition_date=t3.partition_date
            and t13.cl_id=t3.cl_id
            and t13.business_id=t3.card_id
            and t13.page_name=t3.card_content_type
        LEFT JOIN
        (
    select distinct device_id
    from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
    where PARTITION_DAY = '{partition_day}'
    AND is_abnormal_device = 'true'
)dev
        on  t2.cl_id=dev.device_id
    WHERE dev.device_id is null
    GROUP BY t1.partition_date,t1.device_os_type,t1.active_type,t2.card_content_type,t2.recommend_type
    order by day_id,device_os_type,active_type,card_content_type,recommend_type
    """.format(partition_day=today_str)
    ctr_df = spark.sql(ctr_sql)
    ctr_df.createOrReplaceTempView("temp_ctr")

    grey_select_sql = """SELECT *,
        NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_exposure,5),0) as recommend_ctr,
        NVL(ROUND(card_click/card_exposure,5),0) as click_ctr,
        NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_click,5),0) as second_jump_ctr ,
        NVL(ROUND(page_pv_20/page_pv,5),0) as page_pv_20_percent 
        FROM temp_ctr"""
    device_df = spark.sql(grey_select_sql)

    device_df.show(1, False)
    sql_res = device_df.collect()

    print("-----------------------------------------------------------------------------")
    for res in sql_res:
        print(res)
        day_id = res.day_id
        device_os_type = res.device_os_type
        active_type = res.active_type
        card_content_type = res.card_content_type
        recommend_type = res.recommend_type
        card_click = res.card_click
        card_exposure = res.card_exposure
        avg_page_stay = res.avg_page_stay
        navbar_search = res.navbar_search
        highlight_word = res.highlight_word
        self_welfare_card = res.self_welfare_card
        page_pv_20 = res.page_pv_20
        page_pv_20_percent = res.page_pv_20_percent
        if not card_content_type or not recommend_type:
            continue

        recommend_welfare_card = res.recommend_welfare_card
        recommend_content_card = res.recommend_content_card
        if not recommend_content_card:
            recommend_content_card = 0
        recommend_special_card = res.recommend_special_card
        if not recommend_special_card:
            recommend_special_card = 0
        transfer_card = res.transfer_card
        video_consultation = res.video_consultation
        partition_day = today_str
        recommend_ctr = res.recommend_ctr
        second_jump_ctr = res.second_jump_ctr
        click_ctr = res.click_ctr
        pid = hashlib.md5((day_id + device_os_type + active_type + card_content_type + recommend_type).encode("utf8")).hexdigest()

        instert_sql = """replace into recommend_strategy_d_fix(
        day_id,device_os_type,active_type,card_content_type,recommend_type,card_click,card_exposure,avg_page_stay,navbar_search,
        highlight_word,self_welfare_card,recommend_welfare_card,recommend_content_card,recommend_special_card,transfer_card,video_consultation,
        partition_day,pid,recommend_ctr,second_jump_ctr,click_ctr,page_pv_20_percent
        ) VALUES('{day_id}','{device_os_type}','{active_type}','{card_content_type}','{recommend_type}',{card_click},{card_exposure},
        {avg_page_stay},{navbar_search},{highlight_word},{self_welfare_card},{recommend_welfare_card},{recommend_content_card},{recommend_special_card},
        {transfer_card},{video_consultation},'{partition_day}','{pid}',{recommend_ctr},{second_jump_ctr},{click_ctr},{page_pv_20_percent});""".format(
            day_id=day_id, device_os_type=device_os_type, active_type=active_type, card_content_type=card_content_type,
            card_click=card_click, recommend_type=recommend_type, card_exposure=card_exposure, avg_page_stay=avg_page_stay,
            navbar_search=navbar_search, self_welfare_card=self_welfare_card, recommend_welfare_card=recommend_welfare_card,
            recommend_content_card=recommend_content_card, recommend_special_card=recommend_special_card,page_pv_20_percent=page_pv_20_percent,
            transfer_card=transfer_card,
            video_consultation=video_consultation, partition_day=partition_day, pid=pid, recommend_ctr=recommend_ctr,
            second_jump_ctr=second_jump_ctr, click_ctr=click_ctr,highlight_word=highlight_word
        )
        print(instert_sql)
        # cursor.execute("set names 'UTF8'")
        res = cursor.execute(instert_sql)
        db.commit()
        print(res)
# cursor.executemany()
db.close()