# -*- coding:UTF-8 -*- # @Time : 2020/11/6 10:00 # @File : recommend_strategy_fix.py # @email : litao@igengmei.com # @author : litao import hashlib import json import pymysql import xlwt, datetime # import redis # from pyhive import hive # from maintenance.func_send_email_with_file import send_file_email # from typing import Dict, List # from elasticsearch_7 import Elasticsearch # from elasticsearch_7.helpers import scan # import sys import time from pyspark import SparkConf from pyspark.sql import SparkSession, DataFrame # from pyspark.sql.functions import lit # import pytispark.pytispark as pti db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_olap') cursor = db.cursor() def con_sql(sql): # 从数据库的表里获取数据 db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm', db='doris_olap') cursor = db.cursor() cursor.execute(sql) result = cursor.fetchall() db.close() return result startTime = time.time() sparkConf = SparkConf() sparkConf.set("spark.sql.crossJoin.enabled", True) sparkConf.set("spark.debug.maxToStringFields", "100") sparkConf.set("spark.tispark.plan.allow_index_double_read", False) sparkConf.set("spark.tispark.plan.allow_index_read", True) sparkConf.set("spark.hive.mapred.supports.subdirectories", True) sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True) sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("mapreduce.output.fileoutputformat.compress", False) sparkConf.set("mapreduce.map.output.compress", False) sparkConf.set("prod.gold.jdbcuri", "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true") sparkConf.set("prod.mimas.jdbcuri", "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true") sparkConf.set("prod.gaia.jdbcuri", 
"jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true") sparkConf.set("prod.tidb.jdbcuri", "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true") # sparkConf.set("prod.jerry.jdbcuri", # "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000") # sparkConf.set("prod.tidb.database", "jerry_prod") spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") .config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName( "LR PYSPARK TEST").enableHiveSupport().getOrCreate()) spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar") spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar") spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'") spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") for t in range(0, 3): day_num = 0 - t now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d") today_str = now.strftime("%Y%m%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") ctr_sql = """ SELECT t1.partition_date as day_id, t1.device_os_type as device_os_type, t1.active_type as active_type, t2.card_content_type as card_content_type, t2.recommend_type as recommend_type, NVL(sum(t3.session_pv),0) as card_click, NVL(sum(t2.session_pv),0) as card_exposure, NVL(round(sum(page_stay)/count(distinct t4.cl_id)/60,2),0) as 
avg_page_stay, NVL(sum(t4_pv.pv),0) as page_pv, NVL(sum(t4_pv.pv_20),0) as page_pv_20, NVL(sum(navbar_pv),0) as navbar_search, NVL(sum(highlight_pv),0) as highlight_word, NVL(sum(self_wel_pv),0) as self_welfare_card, NVL(sum(recom_wel_pv),0)-NVL(sum(self_wel_pv),0) as recommend_welfare_card,--需要排除关联的商品卡片点击 NVL(sum(recom_content_pv),0) as recommend_content_card, NULL as recommend_special_card, NVL(sum(referral_pv),0) as transfer_card, NVL(sum(video_pv),0) as video_consultation, NVL(sum(post_pv),0) as total_post_pv, NVL(sum(post_click_pv),0) as post_click_pv FROM ( SELECT partition_date ,device_os_type ,CASE WHEN active_type = '4' THEN '老活' WHEN active_type IN ('1','2') THEN '新增' END AS active_type ,device_id FROM online.ml_device_day_active_status WHERE partition_date={partition_day} AND active_type IN ('1','2','4') AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' ,'promotion_shike','promotion_julang_jl03','promotion_zuimei') AND first_channel_source_type not LIKE 'promotion\_jf\_%' )t1 JOIN (--卡片,卡片id和session_id去重 SELECT partition_date, cl_id from ( SELECT partition_date, cl_id, count(distinct app_session_id) as session_pv0 FROM ( SELECT partition_date, cl_id, case when params['card_content_type'] in ('qa','answer') then 'qa' when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end as card_content_type, CASE when 
params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计') when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计') when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计') WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计') when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片') when params['transaction_type'] in ('newdata') then array('保量卡片') when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计') when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计') when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计') when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end AS recommend_type, params['card_id'] as card_id, app_session_id from online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_card' AND params['page_name'] ='home' AND params['tab_name'] = '精选' AND (params['transaction_type'] in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO') or params['transaction_type'] like '%ctr' or params['transaction_type'] like '%cvr' or params['transaction_type'] like 'deeplink%') AND params['card_content_type'] in ('qa','diary','user_post','answer','special_pool') GROUP BY partition_date, cl_id, case when params['card_content_type'] in ('qa','answer') then 'qa' when params['card_content_type'] in ('special_pool') then 'special' else 
params['card_content_type'] end, CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计') when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计') when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计') WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计') when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片') when params['transaction_type'] in ('newdata') then array('保量卡片') when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计') when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计') when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计') when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end, params['card_id'], app_session_id )a LATERAL VIEW explode (a.recommend_type) v as recommend_type group by partition_date,card_content_type,cl_id,v.recommend_type,card_id having session_pv0 >0 UNION SELECT partition_date, cl_id, count(distinct card_id) as session_pv0 FROM (SELECT partition_date, cl_id, case when card_content_type in ('qa','answer') then 'qa' when card_content_type in ('special_pool') then 'special' else card_content_type end as card_content_type, CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计') when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (transaction_type like '%ctr' and transaction_type not in 
('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr') ) THEN array('ctr预估','合计') when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN transaction_type like '%cvr' THEN array('cvr预估','合计') WHEN transaction_type in ('-1','smr') THEN array('smr','合计') when transaction_type in ('pgc','hotspot') then array('热点卡片') when transaction_type in ('newdata') then array('保量卡片') when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计') when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计') when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计') when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when transaction_type like 'deeplink%' then array('deeplink策略','合计') end AS recommend_type, card_id, app_session_id from online.ml_community_precise_exposure_detail WHERE partition_date={partition_day} AND action in ('page_precise_exposure','home_choiceness_card_exposure') --7745版本action改为page_precise_exposure AND is_exposure = '1' ----精准曝光 AND page_name ='home' AND tab_name = '精选' AND (transaction_type in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO') or transaction_type like '%ctr' or transaction_type like '%cvr' or transaction_type like 'deeplink%') AND card_content_type in ('qa','diary','user_post','answer','special_pool') group by partition_date, case when card_content_type in ('qa','answer') then 'qa' when card_content_type in ('special_pool') then 'special' else card_content_type end, cl_id, CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计') when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计') when 
transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN transaction_type like '%cvr' THEN array('cvr预估','合计') WHEN transaction_type in ('-1','smr') THEN array('smr','合计') when transaction_type in ('pgc','hotspot') then array('热点卡片') when transaction_type in ('newdata') then array('保量卡片') when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计') when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计') when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计') when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when transaction_type like 'deeplink%' then array('deeplink策略','合计') end, card_id, app_session_id )a LATERAL VIEW explode (a.recommend_type) v as recommend_type group by partition_date,cl_id having session_pv0 >= 4) group by partition_date,cl_id ) t0 on t1.device_id = t0.cl_id LEFT JOIN (--精准曝光,卡片id和session_id去重 SELECT partition_date, card_content_type, cl_id, v.recommend_type, card_id, count(distinct app_session_id) as session_pv FROM ( SELECT partition_date, cl_id, case when card_content_type in ('qa','answer') then 'qa' when card_content_type in ('special_pool') then 'special' else card_content_type end as card_content_type, CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计') when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr') ) THEN array('ctr预估','合计') when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN transaction_type like '%cvr' THEN array('cvr预估','合计') WHEN transaction_type in ('-1','smr') THEN array('smr','合计') when transaction_type in ('pgc','hotspot') then array('热点卡片') when transaction_type in ('newdata') then array('保量卡片') when transaction_type in ('hotspot_feed') 
then array('hotspot_feed','合计') when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计') when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计') when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when transaction_type like 'deeplink%' then array('deeplink策略','合计') end AS recommend_type, card_id, app_session_id from online.ml_community_precise_exposure_detail WHERE partition_date={partition_day} AND action in ('page_precise_exposure','home_choiceness_card_exposure') --7745版本action改为page_precise_exposure AND is_exposure = '1' ----精准曝光 AND page_name ='home' AND tab_name = '精选' AND (transaction_type in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO') or transaction_type like '%ctr' or transaction_type like '%cvr' or transaction_type like 'deeplink%') AND card_content_type in ('qa','diary','user_post','answer','special_pool') group by partition_date, case when card_content_type in ('qa','answer') then 'qa' when card_content_type in ('special_pool') then 'special' else card_content_type end, cl_id, CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计') when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计') when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN transaction_type like '%cvr' THEN array('cvr预估','合计') WHEN transaction_type in ('-1','smr') THEN array('smr','合计') when transaction_type in ('pgc','hotspot') then array('热点卡片') when transaction_type in ('newdata') then array('保量卡片') when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计') when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计') when 
transaction_type in ('excestragegy') then array('新用户精华帖优先','合计') when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when transaction_type like 'deeplink%' then array('deeplink策略','合计') end, card_id, app_session_id )a LATERAL VIEW explode (a.recommend_type) v as recommend_type group by partition_date,card_content_type,cl_id,v.recommend_type,card_id )t2 on t0.cl_id=t2.cl_id and t0.partition_date=t2.partition_date LEFT JOIN (--卡片,卡片id和session_id去重 SELECT partition_date, card_content_type, cl_id, v.recommend_type, card_id, count(distinct app_session_id) as session_pv FROM ( SELECT partition_date, cl_id, case when params['card_content_type'] in ('qa','answer') then 'qa' when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end as card_content_type, CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计') when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计') when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计') WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计') when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片') when params['transaction_type'] in ('newdata') then array('保量卡片') when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计') when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计') when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计') when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') when params['transaction_type'] in 
('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end AS recommend_type, params['card_id'] as card_id, app_session_id from online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_card' AND params['page_name'] ='home' AND params['tab_name'] = '精选' AND (params['transaction_type'] in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO') or params['transaction_type'] like '%ctr' or params['transaction_type'] like '%cvr' or params['transaction_type'] like 'deeplink%') AND params['card_content_type'] in ('qa','diary','user_post','answer','special_pool') GROUP BY partition_date, cl_id, case when params['card_content_type'] in ('qa','answer') then 'qa' when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end, CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计') when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计') WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计') when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计') WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计') WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计') when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片') when params['transaction_type'] in ('newdata') then array('保量卡片') when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计') when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计') when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计') when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计') 
when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计') when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end, params['card_id'], app_session_id )a LATERAL VIEW explode (a.recommend_type) v as recommend_type group by partition_date,card_content_type,cl_id,v.recommend_type,card_id )t3 on t2.partition_date=t3.partition_date and t2.cl_id=t3.cl_id and t2.card_id=t3.card_id and t2.card_content_type=t3.card_content_type and t2.recommend_type=t3.recommend_type LEFT JOIN (--页面浏览时长 select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name ,sum(page_stay) as page_stay from ( SELECT partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, page_stay,time_str FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='page_view' AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND referrer='home' AND page_stay>=0 AND page_stay<1000 GROUP BY partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end,page_stay,time_str )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name group by 
partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name )t4 on t4.partition_date=t3.partition_date and t4.cl_id=t3.cl_id and t4.business_id=t3.card_id and t4.page_name=t3.card_content_type LEFT JOIN (--页面浏览时长 select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name ,count(1) as pv ,count(case when page_stay>=20 then 1 end) as pv_20 from ( SELECT partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name,time_str,page_stay FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='page_view' AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND referrer='home' GROUP BY partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end,time_str,page_stay )a left join (--在8月份新增了内容专题卡片,需要转换下id select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name )t4_pv on t4_pv.partition_date=t3.partition_date and t4_pv.cl_id=t3.cl_id and t4_pv.business_id=t3.card_id and t4_pv.page_name=t3.card_content_type LEFT JOIN (--搜索框和点击行为 select partition_date,cl_id,coalesce(b.id,a.business_id) as 
business_id,a.page_name,navbar_pv from ( SELECT partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, count(1) as navbar_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action in ('on_click_navbar_search','do_search') AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name )t5 on t5.partition_date=t3.partition_date and t5.cl_id=t3.cl_id and t5.business_id=t3.card_id and t5.page_name=t3.card_content_type LEFT JOIN (--点击高亮词 select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,highlight_pv from ( SELECT partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in 
('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, count(1) as highlight_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_card' and params['card_type']='highlight_word' AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name )t6 on t6.partition_date=t3.partition_date and t6.cl_id=t3.cl_id and t6.business_id=t3.card_id and t6.page_name=t3.card_content_type LEFT JOIN (--关联的美购卡片 SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as self_wel_pv FROM ( SELECT partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, count(1) as pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} 
AND (get_json_object(params['extra_param'], '$.type')='交互栏' or get_json_object(params['extra_param'], '$.jump_from')='msg_link' or params['in_page_pos']='top' or (params['in_page_pos']='bottom' and params['position'] is null and cl_type='android') or (params['in_page_pos']='bottom' and params['card_but_pos'] is not null and cl_type='ios')) AND action='on_click_card' and params['card_content_type']='service' AND page_name IN ('diary_detail','topic_detail') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id,app_session_id,params['card_id'], case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name )t7 on t7.partition_date=t3.partition_date and t7.cl_id=t3.cl_id and t7.business_id=t3.card_id and t7.page_name=t3.card_content_type LEFT JOIN (--推荐的美购卡片(需要排除作者消费的美购) SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as recom_wel_pv FROM ( SELECT partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, 
count(1) as service_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND (action='on_click_card'and params['card_content_type']='service' or action='on_click_button' and params['button_name']='unfold' and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail') or action='on_click_button' and params['button_name'] = 'more_recommendations') AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id,app_session_id,params['card_id'], case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name )t8 on t8.partition_date=t3.partition_date and t8.cl_id=t3.cl_id and t8.business_id=t3.card_id and t8.page_name=t3.card_content_type LEFT JOIN (--推荐的内容卡片 SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as recom_content_pv FROM ( SELECT partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in 
('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, count(1) as service_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_card' and params['card_content_type'] in ('qa','diary','user_post','answer') AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id,app_session_id,params['card_id'], case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name )t9 on t9.partition_date=t3.partition_date and t9.cl_id=t3.cl_id and t9.business_id=t3.card_id and t9.page_name=t3.card_content_type LEFT JOIN (--视频面诊点击 select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,video_pv from ( SELECT partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, count(1) as 
video_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_button' and params['button_name']='video_interview' AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name )t10 on t10.partition_date=t3.partition_date and t10.cl_id=t3.cl_id and t10.business_id=t3.card_id and t10.page_name=t3.card_content_type LEFT JOIN (--转诊按钮点击 select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,referral_pv from ( SELECT partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end as page_name, count(1) as referral_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_button' and params['button_name']='referral' AND page_name IN 
('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,business_id, case when page_name in ('diary_detail','topic_detail') then 'diary' when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa' when page_name in ('custom_special') then 'special' else null end )a left join ( select id,visual_page_id,'special' as page_name from tl.tl_zx_api_special_pool where partition_day ={partition_day} group by id,visual_page_id )b on a.business_id=b.visual_page_id and a.page_name=b.page_name )t11 on t11.partition_date=t3.partition_date and t11.cl_id=t3.cl_id and t11.business_id=t3.card_id and t11.page_name=t3.card_content_type LEFT JOIN (--从帖子页到帖子页 SELECT partition_date,cl_id,params['referrer_id'] as business_id, case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' else null end as page_name, count(distinct params['business_id'],app_session_id) as post_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='page_view' AND page_name IN ('post_detail','user_post_detail','doctor_post_detail','custom_special') AND (json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]IN ('post_detail','user_post_detail','doctor_post_detail') and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-2]='home') group by partition_date,cl_id,params['referrer_id'], case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' else null end )t12 on t12.partition_date=t3.partition_date and t12.cl_id=t3.cl_id and t12.business_id=t3.card_id and 
t12.page_name=t3.card_content_type left join (--在帖子页点击帖子 SELECT partition_date,cl_id,params['business_id'] as business_id, case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' else null end as page_name, count(distinct params['card_id'],app_session_id) as post_click_pv FROM online.bl_hdfs_maidian_updates WHERE partition_date={partition_day} AND action='on_click_card' and params['card_content_type'] in ('user_post') AND page_name IN ('post_detail','user_post_detail','doctor_post_detail') AND (referrer='home' or (params['referrer_link'] like '%[%' and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home')) group by partition_date,cl_id,params['business_id'], case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post' else null end )t13 on t13.partition_date=t3.partition_date and t13.cl_id=t3.cl_id and t13.business_id=t3.card_id and t13.page_name=t3.card_content_type LEFT JOIN ( select distinct device_id from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D where PARTITION_DAY = '{partition_day}' AND is_abnormal_device = 'true' )dev on t2.cl_id=dev.device_id WHERE dev.device_id is null GROUP BY t1.partition_date,t1.device_os_type,t1.active_type,t2.card_content_type,t2.recommend_type order by day_id,device_os_type,active_type,card_content_type,recommend_type """.format(partition_day=today_str) ctr_df = spark.sql(ctr_sql) ctr_df.createOrReplaceTempView("temp_ctr") grey_select_sql = """SELECT *, NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_exposure,5),0) as recommend_ctr, NVL(ROUND(card_click/card_exposure,5),0) as click_ctr, NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_click,5),0) as second_jump_ctr , NVL(ROUND(page_pv_20/page_pv,5),0) as page_pv_20_percent FROM temp_ctr""" 
# Run the ratio query over the `temp_ctr` view, show one sample row, and pull
# the whole result back to the driver so it can be written through pymysql.
device_df = spark.sql(grey_select_sql)
device_df.show(1, False)
sql_res = device_df.collect()
print("-----------------------------------------------------------------------------")

# Target columns of recommend_strategy_d_fix, in the exact order the values
# are bound below.
insert_columns = (
    "day_id", "device_os_type", "active_type", "card_content_type",
    "recommend_type", "card_click", "card_exposure", "avg_page_stay",
    "navbar_search", "highlight_word", "self_welfare_card",
    "recommend_welfare_card", "recommend_content_card",
    "recommend_special_card", "transfer_card", "video_consultation",
    "partition_day", "pid", "recommend_ctr", "second_jump_ctr", "click_ctr",
    "page_pv_20_percent",
)
# Columns whose value is computed here instead of being read from the row.
derived_columns = ("partition_day", "pid")

# Use driver-side `%s` placeholders rather than str.format(): the driver then
# quotes/escapes every value, so a quote or backslash inside a string
# dimension cannot break the statement, and a SQL NULL metric is sent as NULL
# instead of the invalid bare token `None`.
insert_sql = "replace into recommend_strategy_d_fix({columns}) VALUES({placeholders});".format(
    columns=",".join(insert_columns),
    placeholders=",".join(["%s"] * len(insert_columns)),
)

for row in sql_res:
    print(row)

    # Rows lacking either of these two dimensions are not written.
    if not row.card_content_type or not row.recommend_type:
        continue

    values = {
        name: getattr(row, name)
        for name in insert_columns
        if name not in derived_columns
    }
    # These two counters are stored as 0 when the query yields NULL/zero.
    values["recommend_content_card"] = values["recommend_content_card"] or 0
    values["recommend_special_card"] = values["recommend_special_card"] or 0
    values["partition_day"] = today_str
    # `pid` is the MD5 of the five concatenated dimensions. Together with
    # `replace into` this presumably makes re-runs overwrite the same logical
    # row -- TODO confirm `pid` is the table's unique key.
    # A NULL dimension raises TypeError here.
    values["pid"] = hashlib.md5(
        (
            row.day_id
            + row.device_os_type
            + row.active_type
            + row.card_content_type
            + row.recommend_type
        ).encode("utf8")
    ).hexdigest()

    params = tuple(values[name] for name in insert_columns)
    # Log the statement exactly as the driver will send it.
    print(cursor.mogrify(insert_sql, params))
    affected_rows = cursor.execute(insert_sql, params)
    db.commit()
    print(affected_rows)

# Release the shared module-level connection.
# NOTE(review): the original indentation is lost in this view. `cursor` is
# reused for every processed day, so this close must run once, after the
# outer per-day loop has finished -- confirm its nesting when re-indenting.
db.close()