Commit a812f582 authored by litaolemo

update

parent 66b9432d
...@@ -72,7 +72,7 @@ for t in range(0, task_days): ...@@ -72,7 +72,7 @@ for t in range(0, task_days):
today_str = now.strftime("%Y%m%d") today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
sql = r"""SELECT t3.query as query sql = r"""SELECT
,t3.device_os_type as device_type ,t3.device_os_type as device_type
,t3.active_type as active_type ,t3.active_type as active_type
,t3.channel as channel_type ,t3.channel as channel_type
...@@ -82,16 +82,15 @@ for t in range(0, task_days): ...@@ -82,16 +82,15 @@ for t in range(0, task_days):
,NVL(t4.neirong_card_click_pv,0) as search_pv ,NVL(t4.neirong_card_click_pv,0) as search_pv
FROM FROM
( (
SELECT query,active_type,device_os_type,channel,search_pv,search_uv SELECT active_type,device_os_type,channel,search_pv,search_uv
FROM FROM
( (
SELECT query,active_type,device_os_type,channel SELECT active_type,device_os_type,channel
,count(t1.cl_id) as search_pv ,count(t1.cl_id) as search_pv
,count(distinct t1.cl_id) as search_uv ,count(distinct t1.cl_id) as search_uv
FROM FROM
( (
SELECT partition_date SELECT partition_date
,params['query'] as query
,cl_id ,cl_id
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str} WHERE partition_date >= {yesterday_str}
...@@ -99,7 +98,7 @@ for t in range(0, task_days): ...@@ -99,7 +98,7 @@ for t in range(0, task_days):
AND action in ('do_search','search_result_click_search') AND action in ('do_search','search_result_click_search')
UNION ALL UNION ALL
SELECT partition_date,params['query'] as query,cl_id SELECT cl_id
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str} WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str} AND partition_date < {today_str}
...@@ -108,7 +107,6 @@ for t in range(0, task_days): ...@@ -108,7 +107,6 @@ for t in range(0, task_days):
UNION ALL UNION ALL
SELECT partition_date SELECT partition_date
,params['card_name'] as query
,cl_id ,cl_id
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str} WHERE partition_date >= {yesterday_str}
...@@ -121,7 +119,6 @@ for t in range(0, task_days): ...@@ -121,7 +119,6 @@ for t in range(0, task_days):
UNION ALL UNION ALL
SELECT partition_date SELECT partition_date
,params['card_name'] as query
,cl_id ,cl_id
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str} WHERE partition_date >= {yesterday_str}
...@@ -133,7 +130,6 @@ for t in range(0, task_days): ...@@ -133,7 +130,6 @@ for t in range(0, task_days):
UNION ALL UNION ALL
SELECT partition_date SELECT partition_date
,params['card_name'] as query
,cl_id ,cl_id
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str} WHERE partition_date >= {yesterday_str}
...@@ -169,7 +165,57 @@ for t in range(0, task_days): ...@@ -169,7 +165,57 @@ for t in range(0, task_days):
LATERAL VIEW explode(mas.active_type) t2 AS active_type LATERAL VIEW explode(mas.active_type) t2 AS active_type
)t2 )t2
on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
GROUP BY query,active_type,device_os_type,channel LEFT JOIN
(
SELECT partition_date,device_id
FROM
(
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date>='${start_date}' AND partition_date<'${end_date}'
)t1
JOIN
(
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '${partition_date}'
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '${partition_date}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '${partition_date}'
)t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '${partition_date}'
AND is_login_doctor = '1'
)t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev t1.partition_date=dev.partition_date and t1.cl_id=dev.device_id
WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '')
and (dev.device_id is NULL or dev.device_id='')
GROUP BY active_type,device_os_type,channel
)t )t
)t3 )t3
...@@ -188,7 +234,6 @@ for t in range(0, task_days): ...@@ -188,7 +234,6 @@ for t in range(0, task_days):
FROM FROM
( (
SELECT partition_date SELECT partition_date
,params['query'] as query
,cl_id ,cl_id
,count(1) as pv ,count(1) as pv
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
...@@ -200,13 +245,11 @@ for t in range(0, task_days): ...@@ -200,13 +245,11 @@ for t in range(0, task_days):
or (action = 'on_click_card' AND params['card_content_type'] in ('service','hospital','doctor') AND page_name in ('search_result_more','search_result_welfare','search_result_hospital','search_result_doctor')) or (action = 'on_click_card' AND params['card_content_type'] in ('service','hospital','doctor') AND page_name in ('search_result_more','search_result_welfare','search_result_hospital','search_result_doctor'))
or (action = 'on_click_button' AND params['button_name'] = 'check_plan' AND page_name = 'search_result_more')) or (action = 'on_click_button' AND params['button_name'] = 'check_plan' AND page_name = 'search_result_more'))
GROUP BY partition_date GROUP BY partition_date
,params['query']
,cl_id ,cl_id
)t2 )t2
FULL JOIN FULL JOIN
( (
SELECT partition_date SELECT partition_date
,params['query'] as query
,cl_id ,cl_id
,count(1) as pv ,count(1) as pv
FROM online.bl_hdfs_maidian_updates FROM online.bl_hdfs_maidian_updates
...@@ -216,11 +259,9 @@ for t in range(0, task_days): ...@@ -216,11 +259,9 @@ for t in range(0, task_days):
AND page_name in ('search_result_more','search_result_diary','search_result_post')) AND page_name in ('search_result_more','search_result_diary','search_result_post'))
or (action = 'on_click_card' AND params['card_content_type'] in ('answer','diary') AND page_name in ('search_result_more','search_result_diary','search_result_question_answer'))) or (action = 'on_click_card' AND params['card_content_type'] in ('answer','diary') AND page_name in ('search_result_more','search_result_diary','search_result_question_answer')))
GROUP BY partition_date GROUP BY partition_date
,params['query']
,cl_id ,cl_id
)t3 )t3
on t3.partition_date=t2.partition_date on t3.partition_date=t2.partition_date
AND t3.query=t2.query
AND t3.cl_id=t2.cl_id AND t3.cl_id=t2.cl_id
)t1 )t1
JOIN JOIN
...@@ -248,47 +289,98 @@ for t in range(0, task_days): ...@@ -248,47 +289,98 @@ for t in range(0, task_days):
LATERAL VIEW explode(mas.channel) t2 AS channel LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type LATERAL VIEW explode(mas.active_type) t2 AS active_type
)dev )dev0
on t1.cl_id=dev.device_id and t1.partition_date = dev.partition_date on t1.cl_id=dev0.device_id and t1.partition_date = dev0.partition_date
GROUP BY t1.query,active_type,device_os_type,channel LEFT JOIN
)t4 (
on t3.query=t4.query and t3.active_type=t4.active_type and t3.device_os_type = t4.device_os_type AND t3.channel = t4.channel SELECT partition_date,device_id
FROM
(
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date>='${start_date}' AND partition_date<'${end_date}'
)t1
JOIN
(
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '${partition_date}'
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '${partition_date}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '${partition_date}'
)t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '${partition_date}'
AND is_login_doctor = '1'
)t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev t1.partition_date=dev.partition_date and t1.cl_id=dev.device_id
WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '')
and (dev.device_id is NULL or dev.device_id='')
GROUP BY active_type,device_os_type,channel
)t4
on t3.query=t4.query and t3.active_type=t4.active_type and t3.device_os_type = t4.device_os_type AND t3.channel = t4.channel
""".format(today_str=today_str,yesterday_str=yesterday_str,) """.format(today_str=today_str,yesterday_str=yesterday_str,)
device_df = spark.sql(sql) device_df = spark.sql(sql)
device_df.createOrReplaceTempView("data_table") device_df.show(1, False)
sql_res = device_df.collect()
collects_sql = """
SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
FROM data_table GROUP BY device_type,active_type,channel_type
"""
finnal_df = spark.sql(collects_sql)
finnal_df.show(1, False)
sql_res = finnal_df.collect()
for res in sql_res: for res in sql_res:
# print(res)
device_type = res.device_type
active_type = res.active_type
channel_type = res.channel_type
core_pv_division_uv = res.core_pv_division_uv
pv_division_uv = res.pv_division_uv
pid = hashlib.md5(
(today_str + device_type + active_type + channel_type).encode("utf8")).hexdigest()
instert_sql = """replace into search_strategy_d(
day_id,device_type,active_type,channel_type,core_pv_division_uv,pv_division_uv,pid
) VALUES('{day_id}','{device_type}','{active_type}','{channel_type}',{core_pv_division_uv},{pv_division_uv},'{pid}');""".format(
day_id=today_str, device_type=device_type,
active_type=active_type, channel_type=channel_type, core_pv_division_uv=core_pv_division_uv,pv_division_uv=pv_division_uv,pid=pid
)
print(instert_sql)
# cursor.execute("set names 'UTF8'")
res = cursor.execute(instert_sql)
db.commit()
print(res) print(res)
db.close() # device_df.createOrReplaceTempView("data_table")
# collects_sql = """
# SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
# ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
# FROM data_table GROUP BY device_type,active_type,channel_type
# """
# finnal_df = spark.sql(collects_sql)
#
# finnal_df.show(1, False)
# sql_res = finnal_df.collect()
# for res in sql_res:
# # print(res)
# device_type = res.device_type
# active_type = res.active_type
# channel_type = res.channel_type
# core_pv_division_uv = res.core_pv_division_uv
# pv_division_uv = res.pv_division_uv
# pid = hashlib.md5(
# (today_str + device_type + active_type + channel_type).encode("utf8")).hexdigest()
# instert_sql = """replace into search_strategy_d(
# day_id,device_type,active_type,channel_type,core_pv_division_uv,pv_division_uv,pid
# ) VALUES('{day_id}','{device_type}','{active_type}','{channel_type}',{core_pv_division_uv},{pv_division_uv},'{pid}');""".format(
# day_id=today_str, device_type=device_type,
# active_type=active_type, channel_type=channel_type, core_pv_division_uv=core_pv_division_uv,pv_division_uv=pv_division_uv,pid=pid
#
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
# db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment