Commit a49389f2 authored by litaolemo's avatar litaolemo

update

parent 7961f94b
# -*- coding:UTF-8 -*-
# @Time : 2020/11/6 10:00
# @File : recommend_strategy_fix.py
# @email : litao@igengmei.com
# @author : litao
# -*- coding:UTF-8 -*-
# @Time : 2020/8/20 9:42
# @File : recommend_strategy_d.py
# @email : litao@igengmei.com
# @author : litao
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
def con_sql(sql):
# 从数据库的表里获取数据
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
"jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
"jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
"jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName(
"LR PYSPARK TEST").enableHiveSupport().getOrCreate())
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_days = 3
for t in range(1, task_days):
day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
ctr_sql = """
SELECT
t1.partition_date as day_id,
t1.device_os_type as device_os_type,
t1.active_type as active_type,
t2.card_content_type as card_content_type,
t2.recommend_type as recommend_type,
NVL(sum(t3.session_pv),0) as card_click,
NVL(sum(t2.session_pv),0) as card_exposure,
NVL(round(sum(page_stay)/count(distinct t4.cl_id)/60,2),0) as avg_page_stay,
NVL(sum(t4_pv.pv),0) as page_pv,
NVL(sum(t4_pv.pv_20),0) as page_pv_20,
NVL(sum(navbar_pv),0) as navbar_search,
NVL(sum(highlight_pv),0) as highlight_word,
NVL(sum(self_wel_pv),0) as self_welfare_card,
NVL(sum(recom_wel_pv),0)-NVL(sum(self_wel_pv),0) as recommend_welfare_card,--需要排除关联的商品卡片点击
NVL(sum(recom_content_pv),0) as recommend_content_card,
NULL as recommend_special_card,
NVL(sum(referral_pv),0) as transfer_card,
NVL(sum(video_pv),0) as video_consultation,
NVL(sum(post_pv),0) as total_post_pv,
NVL(sum(post_click_pv),0) as post_click_pv
FROM
(
SELECT partition_date
,device_os_type
,CASE WHEN active_type = '4' THEN '老活'
WHEN active_type IN ('1','2') THEN '新增' END AS active_type
,device_id
FROM online.ml_device_day_active_status
WHERE partition_date={partition_day}
AND active_type IN ('1','2','4')
AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
AND first_channel_source_type not LIKE 'promotion\_jf\_%'
)t1
JOIN
(--精准曝光,卡片id和session_id去重
SELECT partition_date,
card_content_type,
cl_id,
v.recommend_type,
card_id,
count(distinct app_session_id) as session_pv
FROM
(
SELECT partition_date,
cl_id,
case when card_content_type in ('qa','answer') then 'qa'
when card_content_type in ('special_pool') then 'special' else card_content_type end as card_content_type,
CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计')
when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr') ) THEN array('ctr预估','合计')
when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计')
WHEN transaction_type like '%cvr' THEN array('cvr预估','合计')
WHEN transaction_type in ('-1','smr') THEN array('smr','合计')
when transaction_type in ('pgc','hotspot') then array('热点卡片')
when transaction_type in ('newdata') then array('保量卡片')
when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计')
when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计')
when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计')
when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
when transaction_type like 'deeplink%' then array('deeplink策略','合计')
end AS recommend_type,
card_id,
app_session_id
from online.ml_community_precise_exposure_detail
WHERE partition_date={partition_day}
AND action in ('page_precise_exposure','home_choiceness_card_exposure') --7745版本action改为page_precise_exposure
AND is_exposure = '1' ----精准曝光
AND page_name ='home'
AND tab_name = '精选'
AND (transaction_type in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO')
or transaction_type like '%ctr' or transaction_type like '%cvr' or transaction_type like 'deeplink%')
AND card_content_type in ('qa','diary','user_post','answer','special_pool')
group by partition_date,
case when card_content_type in ('qa','answer') then 'qa'
when card_content_type in ('special_pool') then 'special' else card_content_type end,
cl_id,
CASE when transaction_type in ('fmctr','samecity_fmctr') then array('fmctr','合计')
when transaction_type in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
WHEN (transaction_type like '%ctr' and transaction_type not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
when transaction_type in ('high_quality_ctr') then array('high_quality_ctr','合计')
WHEN transaction_type like '%cvr' THEN array('cvr预估','合计')
WHEN transaction_type in ('-1','smr') THEN array('smr','合计')
when transaction_type in ('pgc','hotspot') then array('热点卡片')
when transaction_type in ('newdata') then array('保量卡片')
when transaction_type in ('hotspot_feed') then array('hotspot_feed','合计')
when transaction_type in ('aistragegy') then array('新用户AI帖优先','合计')
when transaction_type in ('excestragegy') then array('新用户精华帖优先','合计')
when transaction_type in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
when transaction_type in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
when transaction_type like 'deeplink%' then array('deeplink策略','合计') end,
card_id,
app_session_id
)a
LATERAL VIEW explode (a.recommend_type) v as recommend_type
group by partition_date,card_content_type,cl_id,v.recommend_type,card_id
)t2
on t1.device_id=t2.cl_id and t1.partition_date=t2.partition_date where t2.session_pv >= 4
LEFT JOIN
(--卡片,卡片id和session_id去重
SELECT partition_date,
card_content_type,
cl_id,
v.recommend_type,
card_id,
count(distinct app_session_id) as session_pv
FROM
(
SELECT partition_date,
cl_id,
case when params['card_content_type'] in ('qa','answer') then 'qa'
when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end as card_content_type,
CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计')
when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计')
WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计')
WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计')
when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片')
when params['transaction_type'] in ('newdata') then array('保量卡片')
when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计')
when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计')
when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计')
when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计')
end AS recommend_type,
params['card_id'] as card_id,
app_session_id
from online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='on_click_card'
AND params['page_name'] ='home'
AND params['tab_name'] = '精选'
AND (params['transaction_type'] in ('-1','smr','hotspot','pgc','newdata','hotspot_feed','aistragegy','excestragegy','FIXEDSTRATEGY','FIXEDSTRATEGY_VIDEO')
or params['transaction_type'] like '%ctr' or params['transaction_type'] like '%cvr' or params['transaction_type'] like 'deeplink%')
AND params['card_content_type'] in ('qa','diary','user_post','answer','special_pool')
GROUP BY partition_date,
cl_id,
case when params['card_content_type'] in ('qa','answer') then 'qa'
when params['card_content_type'] in ('special_pool') then 'special' else params['card_content_type'] end,
CASE when params['transaction_type'] in ('fmctr','samecity_fmctr') then array('fmctr','合计')
when params['transaction_type'] in ('high_quality_fmctr') then array('high_quality_fmctr','合计')
WHEN (params['transaction_type'] like '%ctr' and params['transaction_type'] not in ('high_quality_ctr','high_quality_fmctr','fmctr','samecity_fmctr')) THEN array('ctr预估','合计')
when params['transaction_type'] in ('high_quality_ctr') then array('high_quality_ctr','合计')
WHEN params['transaction_type'] like '%cvr' THEN array('cvr预估','合计')
WHEN params['transaction_type'] in ('-1','smr') THEN array('smr','合计')
when params['transaction_type'] in ('pgc','hotspot') then array('热点卡片')
when params['transaction_type'] in ('newdata') then array('保量卡片')
when params['transaction_type'] in ('hotspot_feed') then array('hotspot_feed','合计')
when params['transaction_type'] in ('aistragegy') then array('新用户AI帖优先','合计')
when params['transaction_type'] in ('excestragegy') then array('新用户精华帖优先','合计')
when params['transaction_type'] in ('FIXEDSTRATEGY') then array('新氧新用户策略一','合计')
when params['transaction_type'] in ('FIXEDSTRATEGY_VIDEO') then array('新氧新用户策略二','合计')
when params['transaction_type'] like 'deeplink%' then array('deeplink策略','合计') end,
params['card_id'],
app_session_id
)a
LATERAL VIEW explode (a.recommend_type) v as recommend_type
group by partition_date,card_content_type,cl_id,v.recommend_type,card_id
)t3
on t2.partition_date=t3.partition_date
and t2.cl_id=t3.cl_id
and t2.card_id=t3.card_id
and t2.card_content_type=t3.card_content_type
and t2.recommend_type=t3.recommend_type
LEFT JOIN
(--页面浏览时长
select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name
,sum(page_stay) as page_stay
from
(
SELECT partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
page_stay,time_str
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='page_view'
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND referrer='home'
AND page_stay>=0 AND page_stay<1000
GROUP BY partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end,page_stay,time_str
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
)t4
on t4.partition_date=t3.partition_date
and t4.cl_id=t3.cl_id
and t4.business_id=t3.card_id
and t4.page_name=t3.card_content_type
LEFT JOIN
(--页面浏览时长
select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name
,count(1) as pv
,count(case when page_stay>=20 then 1 end) as pv_20
from
(
SELECT partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,time_str,page_stay
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='page_view'
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND referrer='home'
GROUP BY partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end,time_str,page_stay
)a
left join
(--在8月份新增了内容专题卡片,需要转换下id
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
)t4_pv
on t4_pv.partition_date=t3.partition_date
and t4_pv.cl_id=t3.cl_id
and t4_pv.business_id=t3.card_id
and t4_pv.page_name=t3.card_content_type
LEFT JOIN
(--搜索框和点击行为
select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,navbar_pv
from
(
SELECT partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as navbar_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action in ('on_click_navbar_search','do_search')
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
)t5
on t5.partition_date=t3.partition_date
and t5.cl_id=t3.cl_id
and t5.business_id=t3.card_id
and t5.page_name=t3.card_content_type
LEFT JOIN
(--点击高亮词
select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,highlight_pv
from
(
SELECT partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as highlight_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='on_click_card'
and params['card_type']='highlight_word'
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
)t6
on t6.partition_date=t3.partition_date
and t6.cl_id=t3.cl_id
and t6.business_id=t3.card_id
and t6.page_name=t3.card_content_type
LEFT JOIN
(--关联的美购卡片
SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as self_wel_pv
FROM
(
SELECT partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND (get_json_object(params['extra_param'], '$.type')='交互栏'
or get_json_object(params['extra_param'], '$.jump_from')='msg_link'
or params['in_page_pos']='top'
or (params['in_page_pos']='bottom' and params['position'] is null and cl_type='android')
or (params['in_page_pos']='bottom' and params['card_but_pos'] is not null and cl_type='ios'))
AND action='on_click_card'
and params['card_content_type']='service'
AND page_name IN ('diary_detail','topic_detail')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,app_session_id,params['card_id'],
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
)t7
on t7.partition_date=t3.partition_date
and t7.cl_id=t3.cl_id
and t7.business_id=t3.card_id
and t7.page_name=t3.card_content_type
LEFT JOIN
(--推荐的美购卡片(需要排除作者消费的美购)
SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as recom_wel_pv
FROM
(
SELECT partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as service_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND (action='on_click_card'and params['card_content_type']='service'
or action='on_click_button' and params['button_name']='unfold' and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail')
or action='on_click_button' and params['button_name'] = 'more_recommendations')
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,app_session_id,params['card_id'],
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
)t8
on t8.partition_date=t3.partition_date
and t8.cl_id=t3.cl_id
and t8.business_id=t3.card_id
and t8.page_name=t3.card_content_type
LEFT JOIN
(--推荐的内容卡片
SELECT partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,count(1) as recom_content_pv
FROM
(
SELECT partition_date,cl_id,business_id,app_session_id,params['card_id'] as card_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as service_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='on_click_card'
and params['card_content_type'] in ('qa','diary','user_post','answer')
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,app_session_id,params['card_id'],
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
group by partition_date,cl_id,coalesce(b.id,a.business_id),a.page_name
)t9
on t9.partition_date=t3.partition_date
and t9.cl_id=t3.cl_id
and t9.business_id=t3.card_id
and t9.page_name=t3.card_content_type
LEFT JOIN
(--视频面诊点击
select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,video_pv
from
(
SELECT partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as video_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='on_click_button'
and params['button_name']='video_interview'
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
)t10
on t10.partition_date=t3.partition_date
and t10.cl_id=t3.cl_id
and t10.business_id=t3.card_id
and t10.page_name=t3.card_content_type
LEFT JOIN
(--转诊按钮点击
select partition_date,cl_id,coalesce(b.id,a.business_id) as business_id,a.page_name,referral_pv
from
(
SELECT partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end as page_name,
count(1) as referral_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='on_click_button'
and params['button_name']='referral'
AND page_name IN ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail','question_answer_detail','custom_special')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,business_id,
case when page_name in ('diary_detail','topic_detail') then 'diary'
when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
when page_name in ('question_detail','answer_detail','question_answer_detail') then 'qa'
when page_name in ('custom_special') then 'special' else null end
)a
left join
(
select id,visual_page_id,'special' as page_name
from tl.tl_zx_api_special_pool
where partition_day ={partition_day}
group by id,visual_page_id
)b
on a.business_id=b.visual_page_id and a.page_name=b.page_name
)t11
on t11.partition_date=t3.partition_date
and t11.cl_id=t3.cl_id
and t11.business_id=t3.card_id
and t11.page_name=t3.card_content_type
LEFT JOIN
(--从帖子页到帖子页
SELECT partition_date,cl_id,params['referrer_id'] as business_id,
case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
else null end as page_name,
count(distinct params['business_id'],app_session_id) as post_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='page_view'
AND page_name IN ('post_detail','user_post_detail','doctor_post_detail','custom_special')
AND (json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]IN ('post_detail','user_post_detail','doctor_post_detail')
and json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-2]='home')
group by partition_date,cl_id,params['referrer_id'],
case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
else null end
)t12
on t12.partition_date=t3.partition_date
and t12.cl_id=t3.cl_id
and t12.business_id=t3.card_id
and t12.page_name=t3.card_content_type
left join
(--在帖子页点击帖子
SELECT partition_date,cl_id,params['business_id'] as business_id,
case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
else null end as page_name,
count(distinct params['card_id'],app_session_id) as post_click_pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date={partition_day}
AND action='on_click_card'
and params['card_content_type'] in ('user_post')
AND page_name IN ('post_detail','user_post_detail','doctor_post_detail')
AND (referrer='home' or
(params['referrer_link'] like '%[%' and
json_split(params['referrer_link'])[size(json_split(params['referrer_link']))-1]='home'))
group by partition_date,cl_id,params['business_id'],
case when page_name in ('post_detail','user_post_detail','doctor_post_detail') then 'user_post'
else null end
)t13
on t13.partition_date=t3.partition_date
and t13.cl_id=t3.cl_id
and t13.business_id=t3.card_id
and t13.page_name=t3.card_content_type
LEFT JOIN
(
select distinct device_id
from ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
WHERE partition_day={partition_day}
union all
select distinct device_id
from dim.dim_device_user_staff --去除内网用户
)spam_pv
on spam_pv.device_id=t2.cl_id
LEFT JOIN
(
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date={partition_day}
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = {partition_day}
--马甲账号/模特用户
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = {partition_day}
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--公司内网覆盖用户
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--登陆过医生设备
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = {partition_day}
) t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = {partition_day}
AND is_login_doctor = '1'
) t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev
on t2.partition_date=dev.partition_date and t2.cl_id=dev.device_id
WHERE spam_pv.device_id IS NULL
and dev.device_id is null and dev.
GROUP BY t1.partition_date,t1.device_os_type,t1.active_type,t2.card_content_type,t2.recommend_type
order by day_id,device_os_type,active_type,card_content_type,recommend_type;
""".format(partition_day=today_str)
ctr_df = spark.sql(ctr_sql)
ctr_df.createOrReplaceTempView("temp_ctr")
grey_select_sql = """SELECT *,
NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_exposure,5),0) as recommend_ctr,
NVL(ROUND(card_click/card_exposure,5),0) as click_ctr,
NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_click,5),0) as second_jump_ctr ,
NVL(ROUND(page_pv_20/page_pv,5),0) as page_pv_20_percent
FROM temp_ctr"""
device_df = spark.sql(grey_select_sql)
device_df.show(1, False)
sql_res = device_df.collect()
print("-----------------------------------------------------------------------------")
for res in sql_res:
# print(res)
day_id = res.day_id
device_os_type = res.device_os_type
active_type = res.active_type
card_content_type = res.card_content_type
recommend_type = res.recommend_type
card_click = res.card_click
card_exposure = res.card_exposure
avg_page_stay = res.avg_page_stay
navbar_search = res.navbar_search
highlight_word = res.highlight_word
self_welfare_card = res.self_welfare_card
page_pv_20 = res.page_pv_20
page_pv_20_percent = res.page_pv_20_percent
recommend_welfare_card = res.recommend_welfare_card
recommend_content_card = res.recommend_content_card
if not recommend_content_card:
recommend_content_card = 0
recommend_special_card = res.recommend_special_card
if not recommend_special_card:
recommend_special_card = 0
transfer_card = res.transfer_card
video_consultation = res.video_consultation
partition_day = res.partition_day
recommend_ctr = res.recommend_ctr
second_jump_ctr = res.second_jump_ctr
click_ctr = res.click_ctr
pid = hashlib.md5((day_id + device_os_type + active_type + card_content_type + recommend_type).encode("utf8")).hexdigest()
instert_sql = """replace into recommend_strategy_d_fix(
day_id,device_os_type,active_type,card_content_type,recommend_type,card_click,card_exposure,avg_page_stay,navbar_search,
highlight_word,self_welfare_card,recommend_welfare_card,recommend_content_card,recommend_special_card,transfer_card,video_consultation,
partition_day,pid,recommend_ctr,second_jump_ctr,click_ctr,page_pv_20_percent
) VALUES('{day_id}','{device_os_type}','{active_type}','{card_content_type}','{recommend_type}',{card_click},{card_exposure},
{avg_page_stay},{navbar_search},{highlight_word},{self_welfare_card},{recommend_welfare_card},{recommend_content_card},{recommend_special_card},
{transfer_card},{video_consultation},'{partition_day}','{pid}',{recommend_ctr},{second_jump_ctr},{click_ctr},{page_pv_20_percent});""".format(
day_id=day_id, device_os_type=device_os_type, active_type=active_type, card_content_type=card_content_type,
card_click=card_click, recommend_type=recommend_type, card_exposure=card_exposure, avg_page_stay=avg_page_stay,
navbar_search=navbar_search, self_welfare_card=self_welfare_card, recommend_welfare_card=recommend_welfare_card,
recommend_content_card=recommend_content_card, recommend_special_card=recommend_special_card,page_pv_20_percent=page_pv_20_percent,
transfer_card=transfer_card,
video_consultation=video_consultation, partition_day=partition_day, pid=pid, recommend_ctr=recommend_ctr,
second_jump_ctr=second_jump_ctr, click_ctr=click_ctr,highlight_word=highlight_word
)
print(instert_sql)
# cursor.execute("set names 'UTF8'")
res = cursor.execute(instert_sql)
db.commit()
print(res)
# cursor.executemany()
db.close()
......@@ -449,7 +449,7 @@ select *,ROUND(cast(card_click as double) /cast(card_exposure as double),4) as c
CREATE TABLE recommend_strategy_d(
CREATE TABLE recommend_strategy_d_fix(
day_id varchar(100),
device_os_type varchar(100),
active_type varchar(100),
......@@ -467,6 +467,9 @@ CREATE TABLE recommend_strategy_d(
transfer_card BIGINT(20),
video_consultation BIGINT(20),
partition_day varchar(100),
recommend_ctr FLOAT,
second_jump_ctr FLOAT,
click_ctr FLOAT,
pid varchar(100),
PRIMARY KEY ( pid )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment