Commit 071efe9f authored by litaolemo's avatar litaolemo

update

parent 89f4a228
......@@ -55,9 +55,9 @@ sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
# sparkConf.set("spark.executor.extraJavaOptions", "-Djava.library.path=$HADOOP_HOME/lib/native")
# sparkConf.set("spark.executor.extraJavaOptions", "-Djava.library.path=HADOOP_HOME/lib/native")
sparkConf.set("spark.driver.extraLibraryPath", "/opt/hadoop/lib/native")
# sparkConf.set("spark.driver.extraJavaOptions", "-Djava.library.path=$HADOOP_HOME/lib/native")
# sparkConf.set("spark.driver.extraJavaOptions", "-Djava.library.path=HADOOP_HOME/lib/native")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName(
......@@ -80,28 +80,75 @@ for t in range(0, task_days):
today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
# sql_distinct_device_id = """
# SELECT * FROM online.tl_hdfs_doctor_view limit 200"""
# print(sql_distinct_device_id)
# distinct_device_id_df = spark.sql(sql_distinct_device_id,)
# distinct_device_id_df.show(1)
# sql_res = distinct_device_id_df.collect()
# for res in sql_res:
# print(res)
sql_distinct_device_id = """
SELECT partition_date,device_id FROM
(
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [0], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}'
)t1
JOIN
(
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{yesterday_str}'
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{yesterday_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{yesterday_str}'
)t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{yesterday_str}'
AND is_login_doctor = '1'
)t2
ON t1.device_id = t2.device_id
)t3
on t1.user_id=t3.user_id group by partition_date,device_id""".format(yesterday_str=yesterday_str,today_str=today_str)
print(sql_distinct_device_id)
distinct_device_id_df = spark.sql(sql_distinct_device_id,)
distinct_device_id_df.show(1)
sql_res = distinct_device_id_df.collect()
for res in sql_res:
print(res)
print("-------------------------------")
sql = r"""
SELECT t3.device_os_type as device_type
SELECT
t3.device_os_type as device_type
,t3.active_type as active_type
,t3.channel as channel_type
,NVL(t3.search_pv,0) as pv
,NVL(t3.search_uv,0) as uv
,NVL(t4.hexin_card_click_pv,0) as search_core_pv
,NVL(t4.neirong_card_click_pv,0) as search_pv
---,t3.search_pv as pv
---,t3.search_uv as uv
---,t4.hexin_card_click_pv as search_core_pv
---,t4.neirong_card_click_pv as search_pv
,sum(t3.search_pv) as pv
,sum(t3.search_uv) as uv
,sum(t4.hexin_card_click_pv) as search_core_pv
,sum(t4.neirong_card_click_pv) as search_pv
FROM
(
SELECT active_type,device_os_type,channel,search_pv,search_uv
SELECT query,active_type,device_os_type,channel,search_pv,search_uv
FROM
(
SELECT query,active_type,device_os_type,channel
......@@ -188,9 +235,90 @@ for t in range(0, task_days):
LATERAL VIEW explode(mas.active_type) t2 AS active_type
)t2
on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
GROUP BY active_type,device_os_type,channel
)t2
GROUP BY query,active_type,device_os_type,channel
)t
)t3
LEFT JOIN
(
SELECT t1.query,active_type,device_os_type,channel
,sum(hexin) as hexin_card_click_pv
,sum(neirong) as neirong_card_click_pv
FROM
(
SELECT NVL(t2.partition_date,t3.partition_date) as partition_date
,NVL(t2.cl_id,t3.cl_id) as cl_id
,NVL(t2.query,t3.query) as query
,NVL(t2.pv,0) as hexin
,NVL(t3.pv,0) as neirong
FROM
(
SELECT partition_date
,params['query'] as query
,cl_id
,count(1) as pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND ((action in ('search_result_click_recommend_item','search_result_welfare_click_item','search_result_hospital_click_item','search_result_doctor_click_item','on_click_doctor_card', 'on_click_hospital_card')
AND page_name in ('search_result_more','search_result_welfare','search_result_hospital','search_result_doctor'))
or (action = 'goto_welfare_detail' AND params [ 'from' ] = 'search_result_welfare_recommend')
or (action = 'on_click_card' AND params['card_content_type'] in ('service','hospital','doctor') AND page_name in ('search_result_more','search_result_welfare','search_result_hospital','search_result_doctor'))
or (action = 'on_click_button' AND params['button_name'] = 'check_plan' AND page_name = 'search_result_more'))
GROUP BY partition_date
,params['query']
,cl_id
)t2
FULL JOIN
(
SELECT partition_date
,params['query'] as query
,cl_id
,count(1) as pv
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND ((action in ('on_click_topic_card','on_click_diary_card','search_result_click_infomation_item')
AND page_name in ('search_result_more','search_result_diary','search_result_post'))
or (action = 'on_click_card' AND params['card_content_type'] in ('answer','diary') AND page_name in ('search_result_more','search_result_diary','search_result_question_answer')))
GROUP BY partition_date
,params['query']
,cl_id
)t3
on t3.partition_date=t2.partition_date
AND t3.query=t2.query
AND t3.cl_id=t2.cl_id
)t1
JOIN
(
SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
FROM
(
SELECT
partition_date,m.device_id
,array(device_os_type ,'合计') as device_os_type
,array(case WHEN active_type = '4' THEN '老活'
WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
FROM online.ml_device_day_active_status m
LEFT JOIN
(SELECT code,is_ai_channel,partition_day
FROM DIM.DIM_AI_CHANNEL_ZP_NEW
WHERE partition_day>= {yesterday_str}
AND partition_day < {today_str}) tmp
ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND active_type in ('1','2','4')
) mas
LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
)dev
on t1.cl_id=dev.device_id and t1.partition_date = dev.partition_date
GROUP BY t1.query,active_type,device_os_type,channel
)t4
on t3.query=t4.query and t3.active_type=t4.active_type and t3.device_os_type = t4.device_os_type AND t3.channel = t4.channel group by t3.active_type , t3.device_os_type ,t3.channel
""".format(today_str=today_str, yesterday_str=yesterday_str, )
device_df = spark.sql(sql)
device_df.show(1, False)
......@@ -198,7 +326,7 @@ for t in range(0, task_days):
for res in sql_res:
print(res)
device_df.createOrReplaceTempView("data_table")
#
# collects_sql = """
# SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
# ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment