Commit 63352f25 authored by litaolemo's avatar litaolemo

update

parent baef791e
...@@ -55,6 +55,9 @@ sparkConf.set("prod.jerry.jdbcuri", ...@@ -55,6 +55,9 @@ sparkConf.set("prod.jerry.jdbcuri",
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod") sparkConf.set("prod.tidb.database", "jerry_prod")
# Put the CDH hadoop-common jar on both the driver and executor classpaths and
# point the driver at the native Hadoop libraries under /opt/hadoop/lib/native.
# NOTE(review): the jar version (2.6.0-cdh5.16.1) is hard-coded in the path —
# confirm it matches the Hadoop distribution installed on the cluster nodes.
sparkConf.set("spark.driver.extraClassPath", "/opt/hadoop/share/hadoop/common/hadoop-common-2.6.0-cdh5.16.1.jar")
sparkConf.set("spark.driver.extraLibraryPath", "/opt/hadoop/lib/native")
sparkConf.set("spark.executor.extraClassPath", "/opt/hadoop/share/hadoop/common/hadoop-common-2.6.0-cdh5.16.1.jar")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName( .config("spark.tispark.pd.addresses", "172.16.40.170:2379").appName(
...@@ -76,147 +79,147 @@ for t in range(0, task_days): ...@@ -76,147 +79,147 @@ for t in range(0, task_days):
today_str = now.strftime("%Y%m%d") today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
sql_distinct_device_id = """ # sql_distinct_device_id = """
SELECT count(distinct user_id) FROM online.tl_hdfs_doctor_view WHERE partition_date = '20200827'""" # SELECT count(distinct user_id) FROM online.tl_hdfs_doctor_view WHERE partition_date = '20200827'"""
print(sql_distinct_device_id) # print(sql_distinct_device_id)
distinct_device_id_df = spark.sql(sql_distinct_device_id,) # distinct_device_id_df = spark.sql(sql_distinct_device_id,)
distinct_device_id_df.show(1) # distinct_device_id_df.show(1)
sql_res = distinct_device_id_df.collect() # sql_res = distinct_device_id_df.collect()
# for res in sql_res:
# print(res)
print("-------------------------------")
# Search PV/UV for the partition window [yesterday_str, today_str).
#
# t1 unions the search-related tracking events (cl_id per event) from
# online.bl_hdfs_maidian_updates; t2 takes the active devices for the same
# window and, via array(..., '合计') + LATERAL VIEW explode, emits every device
# once under its own device_os_type / active_type / channel value and once
# under the '合计' (total) bucket for each of the three dimensions.
# The join is on device id and partition_date, then PV = count(cl_id) and
# UV = count(distinct cl_id) per (active_type, device_os_type, channel).
#
# Fixes relative to the previous text of this query:
#   * the outer select list started with a comma ("SELECT ,t3...."), which
#     Spark SQL rejects at parse time;
#   * the 'search_home' UNION ALL branch selected only cl_id while all other
#     branches select (partition_date, cl_id); UNION requires matching column
#     counts and the join below needs t1.partition_date.
#
# The raw string keeps '\\.' intact for Spark's split() regex.
# NOTE(review): the result exposes columns pv / uv, but the later aggregate
# over "data_table" reads search_pv / search_core_pv — confirm the intended
# column names.
sql = r"""SELECT
t3.device_os_type as device_type
,t3.active_type as active_type
,t3.channel as channel_type
,NVL(t3.search_pv,0) as pv
,NVL(t3.search_uv,0) as uv
FROM
(
SELECT active_type,device_os_type,channel,search_pv,search_uv
FROM
(
SELECT active_type,device_os_type,channel
,count(t1.cl_id) as search_pv
,count(distinct t1.cl_id) as search_uv
FROM
(
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action in ('do_search','search_result_click_search')
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action = 'on_click_card'
AND params['page_name']='search_home'
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action = 'on_click_card'
AND params['in_page_pos']='猜你喜欢'
AND params['tab_name']='精选'
AND params['card_type']='search_word'
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action = 'on_click_card'
AND page_name='welfare_home'
AND params['card_type'] ='search_word'
AND params['in_page_pos']='大家都在搜'
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND int(split(app_version,'\\.')[1]) >= 27
AND action='on_click_card'
AND params['card_type']='highlight_word'
)t1
JOIN
(
SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
FROM
(
SELECT
partition_date,m.device_id
,array(device_os_type ,'合计') as device_os_type
,array(case WHEN active_type = '4' THEN '老活'
WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
FROM online.ml_device_day_active_status m
LEFT JOIN
(SELECT code,is_ai_channel,partition_day
FROM DIM.DIM_AI_CHANNEL_ZP_NEW
WHERE partition_day>= {yesterday_str}
AND partition_day < {today_str}) tmp
ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND active_type in ('1','2','4')
) mas
LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
)t2
on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
GROUP BY active_type,device_os_type,channel
)t
)t3
""".format(today_str=today_str, yesterday_str=yesterday_str, )
# Run the query, print the first row untruncated for a quick sanity check,
# then pull the full result to the driver for the loop that follows.
device_df = spark.sql(sql)
device_df.show(1, False)
sql_res = device_df.collect()
for res in sql_res: for res in sql_res:
print(res) print(res)
device_df.createOrReplaceTempView("data_table")
print("-------------------------------") collects_sql = """
SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
FROM data_table GROUP BY device_type,active_type,channel_type
"""
finnal_df = spark.sql(collects_sql)
# sql = r"""SELECT finnal_df.show(1, False)
# ,t3.device_os_type as device_type sql_res = finnal_df.collect()
# ,t3.active_type as active_type for res in sql_res:
# ,t3.channel as channel_type # print(res)
# ,NVL(t3.search_pv,0) as pv device_type = res.device_type
# ,NVL(t3.search_uv,0) as uv active_type = res.active_type
# FROM channel_type = res.channel_type
# ( core_pv_division_uv = res.core_pv_division_uv
# SELECT active_type,device_os_type,channel,search_pv,search_uv pv_division_uv = res.pv_division_uv
# FROM pid = hashlib.md5(
# ( (today_str + device_type + active_type + channel_type).encode("utf8")).hexdigest()
# SELECT active_type,device_os_type,channel instert_sql = """replace into search_strategy_d(
# ,count(t1.cl_id) as search_pv day_id,device_type,active_type,channel_type,core_pv_division_uv,pv_division_uv,pid
# ,count(distinct t1.cl_id) as search_uv ) VALUES('{day_id}','{device_type}','{active_type}','{channel_type}',{core_pv_division_uv},{pv_division_uv},'{pid}');""".format(
# FROM day_id=today_str, device_type=device_type,
# ( active_type=active_type, channel_type=channel_type, core_pv_division_uv=core_pv_division_uv,pv_division_uv=pv_division_uv,pid=pid
# SELECT partition_date
# ,cl_id )
# FROM online.bl_hdfs_maidian_updates print(instert_sql)
# WHERE partition_date >= {yesterday_str} # cursor.execute("set names 'UTF8'")
# AND partition_date < {today_str} res = cursor.execute(instert_sql)
# AND action in ('do_search','search_result_click_search') db.commit()
# print(res)
# UNION ALL db.close()
# SELECT cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action = 'on_click_card'
# AND params['page_name']='search_home'
#
# UNION ALL
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action = 'on_click_card'
# AND params['in_page_pos']='猜你喜欢'
# AND params['tab_name']='精选'
# AND params['card_type']='search_word'
#
#
# UNION ALL
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action = 'on_click_card'
# AND page_name='welfare_home'
# AND params['card_type'] ='search_word'
# AND params['in_page_pos']='大家都在搜'
#
# UNION ALL
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND int(split(app_version,'\\.')[1]) >= 27
# AND action='on_click_card'
# AND params['card_type']='highlight_word'
# )t1
# JOIN
# (
# SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
# FROM
# (
# SELECT
# partition_date,m.device_id
# ,array(device_os_type ,'合计') as device_os_type
# ,array(case WHEN active_type = '4' THEN '老活'
# WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
# ,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
# FROM online.ml_device_day_active_status m
# LEFT JOIN
# (SELECT code,is_ai_channel,partition_day
# FROM DIM.DIM_AI_CHANNEL_ZP_NEW
# WHERE partition_day>= {yesterday_str}
# AND partition_day < {today_str}) tmp
# ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND active_type in ('1','2','4')
# ) mas
# LATERAL VIEW explode(mas.channel) t2 AS channel
# LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
# LATERAL VIEW explode(mas.active_type) t2 AS active_type
# )t2
# on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
# GROUP BY active_type,device_os_type,channel
# )t
# )t3
# """.format(today_str=today_str, yesterday_str=yesterday_str, )
# device_df = spark.sql(sql)
# device_df.show(1, False)
# sql_res = device_df.collect()
# for res in sql_res:
# print(res)
# device_df.createOrReplaceTempView("data_table")
# collects_sql = """
# SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
# ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
# FROM data_table GROUP BY device_type,active_type,channel_type
# """
# finnal_df = spark.sql(collects_sql)
#
# finnal_df.show(1, False)
# sql_res = finnal_df.collect()
# for res in sql_res:
# # print(res)
# device_type = res.device_type
# active_type = res.active_type
# channel_type = res.channel_type
# core_pv_division_uv = res.core_pv_division_uv
# pv_division_uv = res.pv_division_uv
# pid = hashlib.md5(
# (today_str + device_type + active_type + channel_type).encode("utf8")).hexdigest()
# instert_sql = """replace into search_strategy_d(
# day_id,device_type,active_type,channel_type,core_pv_division_uv,pv_division_uv,pid
# ) VALUES('{day_id}','{device_type}','{active_type}','{channel_type}',{core_pv_division_uv},{pv_division_uv},'{pid}');""".format(
# day_id=today_str, device_type=device_type,
# active_type=active_type, channel_type=channel_type, core_pv_division_uv=core_pv_division_uv,pv_division_uv=pv_division_uv,pid=pid
#
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
# db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment