Commit b863c216 authored by 宋柯's avatar 宋柯

降配

parent 5d9a5864
# -*- coding:UTF-8 -*-
# @Time : 2021/5/14 10:13
# @File : home_feed_recommend_strategy_d.py
# @email : songke@igengmei.com
# @author : songke
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
# Module-level connection and cursor for the doris_olap MySQL database.
# In the visible part of this file they are referenced only by the
# commented-out insert loop further down; con_sql() opens its own connection.
# NOTE(review): host and credentials are hard-coded here — consider moving
# them to configuration.
db = pymysql.connect(
    host='172.16.50.175',
    port=3306,
    user='doris',
    passwd='o5gbA27hXHHm',
    db='doris_olap',
)
cursor = db.cursor()
def con_sql(sql):
    """Run *sql* against the doris_olap database and return all fetched rows.

    A new connection is opened on every call. The cursor and the connection
    are always closed before the function exits, including when executing
    the statement or fetching its rows raises.

    Args:
        sql: SQL text handed unchanged to ``cursor.execute``.

    Returns:
        The result of ``cursor.fetchall()`` for the executed statement.
    """
    # Uses its own connection rather than the module-level ``db``/``cursor``.
    conn = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
                           db='doris_olap')
    try:
        # The cursor context manager closes the cursor on exit.
        with conn.cursor() as cur:
            cur.execute(sql)
            return cur.fetchall()
    finally:
        # Close the connection even if the query failed, so it is not leaked.
        conn.close()
# Timestamp taken when the script starts.
startTime = time.time()

# Spark configuration. SparkConf.set returns the conf object itself, so the
# settings are chained here in the same order as before.
sparkConf = (
    SparkConf()
    .set("spark.sql.crossJoin.enabled", True)
    .set("spark.debug.maxToStringFields", "100")
    .set("spark.hive.mapred.supports.subdirectories", True)
    .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
)

# Hive-enabled session for this job.
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .appName("home_feed_recommend_strategy_d")
    .enableHiveSupport()
    .getOrCreate()
)

# Add the UDF jars first, then register the temporary functions they provide.
for _setup_sql in (
    "ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar",
    "ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar",
    "CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'",
    "CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'",
    "CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'",
):
    spark.sql(_setup_sql)
# First partition (yyyyMMdd) included in the report; the upper bound is always
# yesterday, computed inside the query with DATE_SUB(current_date, 1).
START_PARTITION_DATE = '20210512'

# Home-feed card report for page 'new_community_home', tab '精选'.
#
# Output: summed card exposure pv and card click pv per date, device OS type,
# active type, card type and is_cpc flag. Every dimension is wrapped in
# array(value, '合计') and exploded, so each dimension also yields a '合计'
# (total) row.
#
# Sub-queries:
#   t1 - active devices from online.ml_device_day_active_status, excluding the
#        listed first_channel_source_type values.
#   t2 - card exposures from online.ml_community_precise_exposure_detail.
#   t3 - card clicks from online.bl_hdfs_maidian_updates, LEFT JOINed to t2.
#   t5 - abnormal devices from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D, removed by
#        an anti-join (LEFT JOIN ... WHERE t5.device_id is null).
#
# Fixes relative to the previous version:
#   * The t5 anti-join is keyed on t1.device_id. It used to be keyed on
#     t3.cl_id, which is NULL for every exposure row without a click (t3 is
#     LEFT JOINed), so abnormal devices were never filtered on those rows.
#   * The backslashes in the LIKE pattern are written as '\\' so Python no
#     longer sees the invalid escape sequence '\_'; the SQL text sent to
#     Spark is unchanged.
#
# The three '{}' placeholders are all filled with START_PARTITION_DATE.
HIVE_SQL = """
SELECT partition_date as `日期`
,a.device_os_type as `设备类型`
,b.active_type as `活跃类型`
,d.card_content_type as `卡片类型`
,e.is_cpc as `是否cpc`
,sum(exp_pv) as `卡片曝光pv`
,sum(click_pv) as `卡片点击pv`
FROM
( SELECT t1.partition_date
,array(device_os_type,'合计') as device_os_type
,array(active_type,'合计') as active_type
,array(t2.card_content_type,'合计') as card_content_type
,array(t2.is_cpc,'合计') as is_cpc
,t1.device_id
,sum(exp_pv) as exp_pv
,sum(click_pv) as click_pv
FROM
( --4.10日前灰度比例30%,4.10改为70%
SELECT concat_ws('-',substr(partition_date,1,4),substr(partition_date,5,2),substr(partition_date,7,2)) as partition_date
,device_id
,device_os_type
,active_type
FROM
( --渠道,新老
SELECT partition_date
,device_id
,device_os_type
,CASE WHEN active_type = '4' THEN '老活' when active_type in ('1','2') then '新增' END as active_type
FROM online.ml_device_day_active_status
where partition_date >='{}' and partition_date <=regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND active_type in ('1','2','4')
and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
AND first_channel_source_type not like 'promotion\\_jf\\_%'
) a
)t1
JOIN
( --新首页卡片曝光
SELECT concat_ws('-',substr(partition_date,1,4),substr(partition_date,5,2),substr(partition_date,7,2)) as partition_date
,cl_id
,case when card_content_type in ('diary','topic') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '日记'
when card_content_type in ('question','answer','qa') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '问答'
when card_content_type in ('user_post','doctor_post') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '帖子'
when card_content_type in ('service') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '商品'
when get_json_object(exposure_card,'$.redirect_content_type')='service' then '商品评价' END as card_content_type
,case when get_json_object(exposure_card,'$.is_cpc')=1 then '是' else '否' end as is_cpc
,case when get_json_object(exposure_card,'$.redirect_content_type')='service' then get_json_object(exposure_card,'$.redirect_id') else card_id end as card_id
,count(distinct app_session_id) as exp_pv
FROM online.ml_community_precise_exposure_detail
where partition_date>='{}' and partition_date<=regexp_replace(DATE_SUB(current_date,1) ,'-','')
and action in ('page_precise_exposure','home_choiceness_card_exposure')
and is_exposure='1'
and page_name='new_community_home'
and tab_name='精选'
and card_type in ('card','video')
and card_content_type in ('diary','question','answer','qa','topic','user_post','doctor_post','service')
group by concat_ws('-',substr(partition_date,1,4),substr(partition_date,5,2),substr(partition_date,7,2))
,cl_id
,case when card_content_type in ('diary','topic') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '日记'
when card_content_type in ('question','answer','qa') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '问答'
when card_content_type in ('user_post','doctor_post') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '帖子'
when card_content_type in ('service') and (get_json_object(exposure_card,'$.redirect_content_type') not in ('service') or get_json_object(exposure_card,'$.redirect_content_type') is null) then '商品'
when get_json_object(exposure_card,'$.redirect_content_type')='service' then '商品评价' END
,case when get_json_object(exposure_card,'$.is_cpc')=1 then '是' else '否' end
,case when get_json_object(exposure_card,'$.redirect_content_type')='service' then get_json_object(exposure_card,'$.redirect_id') else card_id end
)t2
on t2.partition_date=t1.partition_date and t2.cl_id=t1.device_id
LEFT JOIN
( --首页卡片点击
SELECT concat_ws('-',substr(partition_date,1,4),substr(partition_date,5,2),substr(partition_date,7,2)) as partition_date
,cl_id
,case when params['card_content_type'] in ('diary','topic') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '日记'
when params['card_content_type'] in ('question','answer','qa') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '问答'
when params['card_content_type'] in ('user_post','doctor_post') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '帖子'
when params['card_content_type'] in ('service') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '商品'
when params['redirect_content_type']='service' then '商品评价' END as card_content_type
,case when params['is_cpc']=1 then '是' else '否' end as is_cpc
,case when params['redirect_content_type']='service' then params['redirect_id'] else params['card_id'] end as card_id
,count(distinct time_str) as click_pv
FROM online.bl_hdfs_maidian_updates
where partition_date>='{}' and partition_date<=regexp_replace(DATE_SUB(current_date,1) ,'-','')
and action='on_click_card'
and page_name='new_community_home'
and params['tab_name']='精选'
and params['card_type'] in ('card','video')
and params['card_content_type'] in ('diary','question','answer','qa','topic','user_post','doctor_post','service')
group by concat_ws('-',substr(partition_date,1,4),substr(partition_date,5,2),substr(partition_date,7,2))
,cl_id
,case when params['card_content_type'] in ('diary','topic') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '日记'
when params['card_content_type'] in ('question','answer','qa') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '问答'
when params['card_content_type'] in ('user_post','doctor_post') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '帖子'
when params['card_content_type'] in ('service') and (params['redirect_content_type'] not in ('service') or params['redirect_content_type'] is null) then '商品'
when params['redirect_content_type']='service' then '商品评价' END
,case when params['is_cpc']=1 then '是' else '否' end
,case when params['redirect_content_type']='service' then params['redirect_id'] else params['card_id'] end
)t3
on t3.partition_date=t2.partition_date and t3.cl_id=t2.cl_id and t3.card_content_type=t2.card_content_type and t3.card_id=t2.card_id
LEFT JOIN
( --去除黑名单
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t5
on t5.device_id=t1.device_id
WHERE t5.device_id is null
group by t1.partition_date
,array(device_os_type,'合计')
,array(active_type,'合计')
,array(t2.card_content_type,'合计')
,array(t2.is_cpc,'合计')
,t1.device_id
)t
lateral view explode(t.device_os_type ) a as device_os_type
lateral view explode(t.active_type ) b as active_type
lateral view explode(t.card_content_type ) d as card_content_type
lateral view explode(t.is_cpc ) e as is_cpc
group by partition_date
,a.device_os_type
,b.active_type
,d.card_content_type
,e.is_cpc
""".format(START_PARTITION_DATE,START_PARTITION_DATE,START_PARTITION_DATE)

# Run the report and print up to 100 rows without truncating column values.
spark.sql(HIVE_SQL).show(100,False)
# t = 1
# day_num = 0 - t
# now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
# last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d")
# today_str = now.strftime("%Y%m%d")
# today_str_format = now.strftime("%Y-%m-%d")
# yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
# yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
# one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
# grey_select_sql = """SELECT *,
# NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_exposure,5),0) as recommend_ctr,
# NVL(ROUND(card_click/card_exposure,5),0) as click_ctr,
# NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_click,5),0) as second_jump_ctr ,
# NVL(ROUND(page_pv_20/page_pv,5),0) as page_pv_20_percent
# FROM pm.tl_pm_recommend_strategy_d"""
# device_df = spark.sql(grey_select_sql)
#
# device_df.show(1, False)
# sql_res = device_df.collect()
# print("-----------------------------------------------------------------------------")
# for res in sql_res:
# # print(res)
# day_id = res.day_id
# device_os_type = res.device_os_type
# active_type = res.active_type
# card_content_type = res.card_content_type
# recommend_type = res.recommend_type
# card_click = res.card_click
# card_exposure = res.card_exposure
# avg_page_stay = res.avg_page_stay
# navbar_search = res.navbar_search
# highlight_word = res.highlight_word
# self_welfare_card = res.self_welfare_card
# page_pv_20 = res.page_pv_20
# page_pv_20_percent = res.page_pv_20_percent
#
#
# recommend_welfare_card = res.recommend_welfare_card
# recommend_content_card = res.recommend_content_card
# if not recommend_content_card:
# recommend_content_card = 0
# recommend_special_card = res.recommend_special_card
# if not recommend_special_card:
# recommend_special_card = 0
# transfer_card = res.transfer_card
# video_consultation = res.video_consultation
# partition_day = res.partition_day
# recommend_ctr = res.recommend_ctr
# second_jump_ctr = res.second_jump_ctr
# click_ctr = res.click_ctr
# pid = hashlib.md5((day_id + device_os_type + active_type + card_content_type + recommend_type).encode("utf8")).hexdigest()
# instert_sql = """replace into recommend_strategy_d(
# day_id,device_os_type,active_type,card_content_type,recommend_type,card_click,card_exposure,avg_page_stay,navbar_search,
# highlight_word,self_welfare_card,recommend_welfare_card,recommend_content_card,recommend_special_card,transfer_card,video_consultation,
# partition_day,pid,recommend_ctr,second_jump_ctr,click_ctr,page_pv_20_percent
# ) VALUES('{day_id}','{device_os_type}','{active_type}','{card_content_type}','{recommend_type}',{card_click},{card_exposure},
# {avg_page_stay},{navbar_search},{highlight_word},{self_welfare_card},{recommend_welfare_card},{recommend_content_card},{recommend_special_card},
# {transfer_card},{video_consultation},'{partition_day}','{pid}',{recommend_ctr},{second_jump_ctr},{click_ctr},{page_pv_20_percent});""".format(
# day_id=day_id, device_os_type=device_os_type, active_type=active_type, card_content_type=card_content_type,
# card_click=card_click, recommend_type=recommend_type, card_exposure=card_exposure, avg_page_stay=avg_page_stay,
# navbar_search=navbar_search, self_welfare_card=self_welfare_card, recommend_welfare_card=recommend_welfare_card,
# recommend_content_card=recommend_content_card, recommend_special_card=recommend_special_card,page_pv_20_percent=page_pv_20_percent,
# transfer_card=transfer_card,
# video_consultation=video_consultation, partition_day=partition_day, pid=pid, recommend_ctr=recommend_ctr,
# second_jump_ctr=second_jump_ctr, click_ctr=click_ctr,highlight_word=highlight_word
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
# # cursor.executemany()
# db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment