Commit f8cfa2b2 authored by litaolemo

update

parent db1ff715
@@ -89,6 +89,83 @@ for t in range(1, task_days):
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
#quanzhong_dau
quanzhong_dau_sql = """
--quanzhong_dau
SELECT mas.partition_date
,round(count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'android' AND channel_type = 'AI' THEN device_id END)*0.14
+count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'android' AND channel_type = '医美' THEN device_id END)*0.64
+count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'android' AND channel_type = 'AI' THEN device_id END)*0.08
+count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'android' AND channel_type = '医美' THEN device_id END)*0.19
+count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'ios' AND channel_type = 'AI' THEN device_id END)*0.32
+count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'ios' AND channel_type = '积分墙' THEN device_id END)*0.28
+count(DISTINCT CASE WHEN device_type = '老活' AND device_os_type = 'ios' AND channel_type = '医美' THEN device_id END)*1.00
+count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'ios' AND channel_type = 'AI' THEN device_id END)*0.19
+count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'ios' AND channel_type = '积分墙' THEN device_id END)*0.03
+count(DISTINCT CASE WHEN device_type = '新增' AND device_os_type = 'ios' AND channel_type = '医美' THEN device_id END)*0.57,0) as quanzhong_dau
FROM
(
SELECT
partition_date,m.device_id,device_os_type
,case WHEN active_type = '4' THEN '老活'
WHEN active_type in ('1','2') then '新增' END as device_type
,CASE WHEN is_ai_channel = 'true' THEN 'AI'
WHEN first_channel_source_type in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown') then '积分墙' ELSE '医美' END as channel_type
FROM online.ml_device_day_active_status m
LEFT JOIN
(SELECT code,is_ai_channel,partition_day
FROM DIM.DIM_AI_CHANNEL_ZP_NEW
WHERE partition_day>= '{start_date}' AND partition_day < '{end_date}' ) tmp
ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
where partition_date >= '{start_date}'
AND partition_date < '{end_date}'
AND active_type in ('1','2','4')
) mas
GROUP BY mas.partition_date
""".format(start_date=yesterday_str,end_date=today_str)
print(quanzhong_dau_sql)
quanzhong_dau_df = spark.sql(quanzhong_dau_sql)
quanzhong_dau_df.createOrReplaceTempView("quanzhong_dau_view")
# quanzhong_dau_df.show(1)
# sql_res = quanzhong_dau_df.collect()
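# The quanzhong_dau (weighted DAU) query above multiplies the distinct-device
# count of each (active type, OS, channel) segment by a fixed coefficient and
# sums the products. A minimal sketch of the same arithmetic in plain Python,
# with the coefficients copied from the SQL and made-up segment counts
# (illustrative only, not real data):
QUANZHONG_WEIGHTS = {
    ('老活', 'android', 'AI'): 0.14,
    ('老活', 'android', '医美'): 0.64,
    ('新增', 'android', 'AI'): 0.08,
    ('新增', 'android', '医美'): 0.19,
    ('老活', 'ios', 'AI'): 0.32,
    ('老活', 'ios', '积分墙'): 0.28,
    ('老活', 'ios', '医美'): 1.00,
    ('新增', 'ios', 'AI'): 0.19,
    ('新增', 'ios', '积分墙'): 0.03,
    ('新增', 'ios', '医美'): 0.57,
}

def weighted_dau(segment_counts):
    # segment_counts: {(device_type, device_os_type, channel_type): distinct device count}
    return round(sum(QUANZHONG_WEIGHTS.get(seg, 0.0) * cnt
                     for seg, cnt in segment_counts.items()), 0)

# Hypothetical counts for one partition_date:
# weighted_dau({('老活', 'ios', '医美'): 1200, ('新增', 'android', 'AI'): 300})
# -> 1200*1.00 + 300*0.08 = 1224.0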
# DAU
DAU_sql = """
SELECT mas.partition_date,count(DISTINCT mas.device_id) as dau
FROM
(
SELECT
partition_date,m.device_id
,array(device_os_type ,'合计') as device_os_type
,array(case WHEN active_type = '4' THEN '老活'
WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
FROM online.ml_device_day_active_status m
LEFT JOIN
(SELECT code,is_ai_channel,partition_day
FROM DIM.DIM_AI_CHANNEL_ZP_NEW
WHERE partition_day>= '{start_date}' AND partition_day < '{end_date}' ) tmp
ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
where partition_date >= '{start_date}'
AND partition_date < '{end_date}'
AND active_type in ('1','2','4')
) mas
LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
GROUP BY mas.partition_date
""".format(start_date=yesterday_str,end_date=today_str)
print(DAU_sql)
dau_df = spark.sql(DAU_sql)
dau_df.createOrReplaceTempView("dau_view")
# dau_df.show(1)
# sql_res = dau_df.collect()
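# The DAU query wraps each dimension in array(value, '合计') and explodes the
# arrays with LATERAL VIEW, so every device row is counted once under its own
# segment and once under the '合计' (total) bucket by a single GROUP BY.
# A minimal, self-contained PySpark sketch of that trick, using made-up rows
# and a local session (in this script a `spark` session already exists):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

_demo_spark = SparkSession.builder.master("local[1]").appName("explode_total_demo").getOrCreate()
_demo = _demo_spark.createDataFrame(
    [("d1", "ios"), ("d2", "ios"), ("d3", "android")],
    ["device_id", "device_os_type"],
)
_demo = _demo.withColumn(
    "device_os_type",
    F.explode(F.array(F.col("device_os_type"), F.lit("合计"))),
)
_demo.groupBy("device_os_type").agg(F.countDistinct("device_id").alias("dau")).show()
# Expected rows: ios -> 2, android -> 1, 合计 -> 3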
# CPT average daily clicks
cpc_daily_click_sql = r"""
SELECT partition_date,count(1) as pv
@@ -101,44 +178,86 @@ OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_home_list_item'
OR (ACTION = 'goto_welfare_detail' AND PARAMS['from'] = 'welfare_list' AND PARAMS['transaction_type'] = 'advertise')
OR (ACTION = 'on_click_card' AND PARAMS['card_content_type'] = 'service' AND PARAMS['page_name'] IN ('new_sign','search_result_welfare','category','welfare_home_list_item','welfare_list') AND PARAMS['transaction_type'] = 'advertise'))
group BY partition_date
""".format(partition_day=yesterday_str, end_date=today_str)
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
print(cpc_daily_click_sql)
cpc_daily_click_df = spark.sql(cpc_daily_click_sql)
cpc_daily_click_df.createOrReplaceTempView("cpc_daily_click")
cpc_daily_click_df.show(1)
sql_res = cpc_daily_click_df.collect()
# cpc_daily_click_df.show(1)
# sql_res = cpc_daily_click_df.collect()
# service (welfare) detail page PV
bus_detail_sql = r"""
SELECT
partition_date,count(1) welfare_pv
FROM
(
SELECT cl_id,partition_date
FROM bl_hdfs_maidian_updates
WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
--page view PV/UV
SELECT
page.partition_date as partition_date
,count(case when page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') then page.cl_id else NULL end) as search_home_pv
,count(distinct case when page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor') then page.cl_id else NULL end) as search_home_uv
,count(CASE when referrer in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') and page_name in ('welfare_detail','organization_detail','expert_detail') THEN page.cl_id else NULL END) as referrer_search_hexin_pv
,count(CASE when referrer in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') and page_name in ('welfare_detail') THEN page.cl_id else NULL END) as referrer_search_welfare_pv
,count(CASE when referrer in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
,'question_answer_detail','article_detail') THEN page.cl_id else NULL END) as referrer_search_neirong_pv
,count(DISTINCT CASE WHEN referrer in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
,'question_answer_detail','article_detail') and page_stay >= '0' and page_stay < '1000' THEN page.cl_id else NULL END) as referrer_search_neirong_uv_1000
,sum(CASE WHEN referrer in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') and page_name in ('diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
,'question_answer_detail','article_detail') and page_stay >= '0' and page_stay < '1000' THEN page.page_stay else NULL END) as referrer_search_neirong_pagestay
FROM
(
SELECT cl_id,partition_date,page_name,params['referrer'] as referrer,page_stay
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= '{start_date}'
AND partition_date < '{end_date}'
AND action='page_view'
AND params['page_name'] = 'welfare_detail'
)a1
JOIN
(
SELECT device_id,partition_date
from online.ml_device_day_active_status
WHERE partition_date >='{start_date}'and partition_date < '{end_date}'
AND page_name in ('search_home','search_home_more','search_home_welfare','search_home_diary','search_home_wiki','search_home_post','search_home_hospital','search_home_doctor'
,'diary_detail','topic_detail','post_detail','user_post_detail','doctor_post_detail','question_detail','answer_detail'
,'question_answer_detail','article_detail','welfare_detail','organization_detail','expert_detail','level_one_plan_detail')
)page
JOIN
(
SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
FROM
(
SELECT
partition_date,m.device_id
,array(device_os_type ,'合计') as device_os_type
,array(case WHEN active_type = '4' THEN '老活'
WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
FROM online.ml_device_day_active_status m
LEFT JOIN
(SELECT code,is_ai_channel,partition_day
FROM DIM.DIM_AI_CHANNEL_ZP_NEW
WHERE partition_day>= '{start_date}' AND partition_day < '{end_date}' ) tmp
ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
where partition_date >= '{start_date}'
AND partition_date < '{end_date}'
AND active_type in ('1','2','4')
)a2
on a2.device_id = a1.cl_id
AND a2.partition_date=a1.partition_date
group by partition_date
""".format(partition_day=yesterday_str, end_date=today_str)
) mas
LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
)dev_channel
on dev_channel.device_id = page.cl_id
AND dev_channel.partition_date = page.partition_date
GROUP BY page.partition_date
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
print(bus_detail_sql)
bus_detail_df = spark.sql(bus_detail_sql)
bus_detail_df.createOrReplaceTempView("bus_detail")
bus_detail_df.show(1)
sql_res = bus_detail_df.collect()
# bus_detail_df.show(1)
# sql_res = bus_detail_df.collect()
# -- CPC same-day budget (effective definition)
@@ -192,18 +311,19 @@ FROM
AND T1.merchant_doctor_id=T2.merchant_doctor_id
)T
GROUP BY day_id
""".format(partition_day=yesterday_str, end_date=today_str)
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
print(cpc_budget_sql)
cpc_budget_df = spark.sql(cpc_budget_sql)
cpc_budget_df.createOrReplaceTempView("cpc_budget")
cpc_budget_df.show(1)
sql_res = cpc_budget_df.collect()
# cpc_budget_df.show(1)
# sql_res = cpc_budget_df.collect()
# CPC revenue and total ad spend
cpc_income_sql = r"""
select partition_day,
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_log' then cpc_click_num end) cpc_click_num,--- CPC service clicks for the day
sum(case when advertise_type = 'cpt' AND advertise_business_type in('service') and advertise_calculate_type='cpt_schedule' then cpc_click_num end) cpt_click_num,--- CPT service clicks for the day
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_amount end) cpc_proportion_expend_amount,--- total CPC revenue for the day (incl. rebate)
sum(case when advertise_type = 'cpc' AND advertise_business_type in('service') and advertise_calculate_type='cpc_flownext' then proportion_expend_recharge_amount end) cpc_proportion_expend_recharge_amount,--- CPC revenue for the day (excl. rebate)
SUM(CASE
@@ -222,32 +342,45 @@ SUM(CASE
from ml.ml_c_ct_mc_merchantadclassify_indic_d
where partition_day>='{start_date}' AND partition_day <'{end_date}'
group by partition_day
""".format(partition_day=yesterday_str, end_date=today_str)
""".format(partition_day=yesterday_str, end_date=today_str,start_date=yesterday_str)
print(cpc_income_sql)
cpc_income_df = spark.sql(cpc_income_sql)
cpc_income_df.createOrReplaceTempView("cpc_income")
cpc_income_df.show(1)
sql_res = cpc_income_df.collect()
# cpc_income_df.show(1)
# sql_res = cpc_income_df.collect()
out_put_sql = """
SELECT bus_detail.referrer_search_welfare_pv / dau_view.dau as pv_div_dau,
bus_detail.referrer_search_welfare_pv / quanzhong_dau_view.quanzhong_dau as pv_div_quanzhong_dau,
(cpc_income.cpt_click_num + cpc_income.cpc_click_num) / bus_detail.referrer_search_welfare_pv as ad_flow_rat,
cpc_income.cpc_proportion_expend_amount/cpc_budget.budget as budget_consumption_rate,
cpc_income.cpc_proportion_expend_recharge_amount/cpc_income.cpc_click_num as cpc_item_pricing,
cpc_income.tol_proportion_expend_amount as tol_proportion_expend_amount
"""
out_df = spark.sql(out_put_sql)
# out_df.createOrReplaceTempView("out_df")
out_df.show(1)
sql_res = out_df.collect()
for res in sql_res:
print(res)
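# Note: out_put_sql above has no FROM clause as committed, so spark.sql() will
# fail to resolve the view columns. A hedged sketch of one possible FROM/JOIN
# block, assuming every registered view can be matched on its date column
# (dau_view.partition_date, quanzhong_dau_view.partition_date,
# bus_detail.partition_date, cpc_budget.day_id, cpc_income.partition_day);
# these join keys are an assumption, not something this commit confirms.
out_put_from_sketch = """
FROM bus_detail
JOIN dau_view           ON dau_view.partition_date = bus_detail.partition_date
JOIN quanzhong_dau_view ON quanzhong_dau_view.partition_date = bus_detail.partition_date
JOIN cpc_budget         ON cpc_budget.day_id = bus_detail.partition_date
JOIN cpc_income         ON cpc_income.partition_day = bus_detail.partition_date
"""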
for active_type in res_dict:
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
partition_date = yesterday_str
pid = hashlib.md5((partition_date + device_os_type + active_type).encode("utf8")).hexdigest()
cpc_daily_click_sql = """replace into search_diary_ctr(
partition_date,device_os_type,active_type,pid,click_num,exposure,search_ctr) VALUES('{partition_date}','{device_os_type}','{active_type}','{pid}',{click_num},{exposure},{search_ctr});""".format(
partition_date=partition_date, device_os_type=device_os_type, active_type=active_type, pid=pid, click_num=click_num,
exposure=exposure, search_ctr=search_ctr
)
print(cpc_daily_click_sql)
# cursor.execute("set names 'UTF8'")
res = cursor.execute(cpc_daily_click_sql)
db.commit()
print(res)
# cursor.executemany()
db.close()
# for active_type in res_dict:
# db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
# db='jerry_prod')
# cursor = db.cursor()
# partition_date = yesterday_str
# pid = hashlib.md5((partition_date + device_os_type + active_type).encode("utf8")).hexdigest()
# cpc_daily_click_sql = """replace into search_diary_ctr(
# partition_date,device_os_type,active_type,pid,click_num,exposure,search_ctr) VALUES('{partition_date}','{device_os_type}','{active_type}','{pid}',{click_num},{exposure},{search_ctr});""".format(
# partition_date=partition_date, device_os_type=device_os_type, active_type=active_type, pid=pid, click_num=click_num,
# exposure=exposure, search_ctr=search_ctr
# )
# print(cpc_daily_click_sql)
# # cursor.execute("set names 'UTF8'")
# res = cursor.execute(cpc_daily_click_sql)
# db.commit()
# print(res)
# # cursor.executemany()
# db.close()
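# The (now disabled) write-back above builds the REPLACE INTO statement with
# str.format. A minimal sketch of the same write using pymysql parameter
# binding instead of string interpolation; connection settings are copied from
# the block above, and the metric values in the usage line are hypothetical.
import hashlib
import pymysql

def write_search_diary_ctr(partition_date, device_os_type, active_type,
                           click_num, exposure, search_ctr):
    pid = hashlib.md5((partition_date + device_os_type + active_type).encode("utf8")).hexdigest()
    conn = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                           passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', db='jerry_prod')
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "replace into search_diary_ctr"
                "(partition_date,device_os_type,active_type,pid,click_num,exposure,search_ctr)"
                " values (%s,%s,%s,%s,%s,%s,%s)",
                (partition_date, device_os_type, active_type, pid,
                 click_num, exposure, search_ctr),
            )
        conn.commit()
    finally:
        conn.close()

# Hypothetical usage:
# write_search_diary_ctr(yesterday_str, 'ios', '新增', 10, 1000, 0.01)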