Commit 6c5886ee authored by litaolemo

update

parent 59fdff43
@@ -30,29 +30,27 @@ cursor = db.cursor()
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
-# sparkConf.set("spark.debug.maxToStringFields", "100")
+sparkConf.set("spark.debug.maxToStringFields", "100")
-# sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
+sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
-# sparkConf.set("spark.tispark.plan.allow_index_read", True)
+sparkConf.set("spark.tispark.plan.allow_index_read", True)
-# sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
+sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.sql.adaptive.enabled", True)
-# sparkConf.set("spark.sql.adaptive.skewedJoin.enabled", True)
+sparkConf.set("spark.sql.adaptive.skewedJoin.enabled", True)
sparkConf.set("spark.shuffle.statistics.verbose", True)
# sparkConf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
# sparkConf.set("spark.sql.adaptive.shuffle.targetPostShuffleRowCount", "20000000")
# sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# sparkConf.set("mapreduce.output.fileoutputformat.compress", False) sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
# sparkConf.set("mapreduce.map.output.compress", False) sparkConf.set("mapreduce.map.output.compress", False)
# sparkConf.set("prod.gold.jdbcuri", sparkConf.set("prod.gold.jdbcuri",
# "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true") "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
# sparkConf.set("prod.mimas.jdbcuri", sparkConf.set("prod.mimas.jdbcuri",
# "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true") "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
# sparkConf.set("prod.gaia.jdbcuri", sparkConf.set("prod.gaia.jdbcuri",
# "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true") "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
# sparkConf.set("prod.tidb.jdbcuri", sparkConf.set("prod.tidb.jdbcuri",
# "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true") "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# sparkConf.set("prod.jerry.jdbcuri", sparkConf.set("prod.jerry.jdbcuri",
# "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true") "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy") sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000") sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
@@ -82,126 +80,126 @@ for t in range(0, task_days):
today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
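# Editor's note (illustration, not part of this commit): the pair (yesterday_str, today_str)
# bounds exactly one day of partitions in the filters below. For example, if now were
# datetime.datetime(2020, 8, 10), then yesterday_str == "20200809" and today_str == "20200810",
# so "partition_date >= 20200809 AND partition_date < 20200810" keeps only the 2020-08-09 partition.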
-sql_distinct_device_id = """
-SELECT * FROM online.tl_hdfs_doctor_view limit 200"""
-print(sql_distinct_device_id)
-distinct_device_id_df = spark.sql(sql_distinct_device_id,)
-distinct_device_id_df.show(1)
-sql_res = distinct_device_id_df.collect()
+# sql_distinct_device_id = """
+# SELECT * FROM online.tl_hdfs_doctor_view limit 200"""
+# print(sql_distinct_device_id)
+# distinct_device_id_df = spark.sql(sql_distinct_device_id,)
+# distinct_device_id_df.show(1)
+# sql_res = distinct_device_id_df.collect()
# for res in sql_res:
# print(res)
print("-------------------------------")
sql = r"""SELECT
,t3.device_os_type as device_type
,t3.active_type as active_type
,t3.channel as channel_type
,NVL(t3.search_pv,0) as pv
,NVL(t3.search_uv,0) as uv
FROM
(
SELECT active_type,device_os_type,channel,search_pv,search_uv
FROM
(
SELECT active_type,device_os_type,channel
,count(t1.cl_id) as search_pv
,count(distinct t1.cl_id) as search_uv
FROM
(
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action in ('do_search','search_result_click_search')
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action = 'on_click_card'
AND params['page_name']='search_home'
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action = 'on_click_card'
AND params['in_page_pos']='猜你喜欢'
AND params['tab_name']='精选'
AND params['card_type']='search_word'
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND action = 'on_click_card'
AND page_name='welfare_home'
AND params['card_type'] ='search_word'
AND params['in_page_pos']='大家都在搜'
UNION ALL
SELECT partition_date
,cl_id
FROM online.bl_hdfs_maidian_updates
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND int(split(app_version,'\\.')[1]) >= 27
AND action='on_click_card'
AND params['card_type']='highlight_word'
)t1
JOIN
(
SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
FROM
(
SELECT
partition_date,m.device_id
,array(device_os_type ,'合计') as device_os_type
,array(case WHEN active_type = '4' THEN '老活'
WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
FROM online.ml_device_day_active_status m
LEFT JOIN
(SELECT code,is_ai_channel,partition_day
FROM DIM.DIM_AI_CHANNEL_ZP_NEW
WHERE partition_day>= {yesterday_str}
AND partition_day < {today_str}) tmp
ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
WHERE partition_date >= {yesterday_str}
AND partition_date < {today_str}
AND active_type in ('1','2','4')
) mas
LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
)t2
on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
GROUP BY active_type,device_os_type,channel
)t
)t3
""".format(today_str=today_str, yesterday_str=yesterday_str, )
device_df = spark.sql(sql)
device_df.show(1, False)
sql_res = device_df.collect()
for res in sql_res:
print(res)
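# Editor's illustration (not part of this commit): each element of sql_res is a pyspark.sql.Row
# keyed by the aliases selected above (device_type, active_type, channel_type, pv, uv), so fields
# can be read by name or converted to a plain dict:
if sql_res:
    first_row = sql_res[0]
    print(first_row["device_type"], first_row["channel_type"], first_row["pv"], first_row["uv"])
    print(first_row.asDict())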
device_df.createOrReplaceTempView("data_table")
#
# collects_sql = """
# SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
# ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
# FROM data_table GROUP BY device_type,active_type,channel_type
# """
# finnal_df = spark.sql(collects_sql)
print("-------------------------------")
#
# sql = r"""SELECT
# ,t3.device_os_type as device_type
# ,t3.active_type as active_type
# ,t3.channel as channel_type
# ,NVL(t3.search_pv,0) as pv
# ,NVL(t3.search_uv,0) as uv
# FROM
# (
# SELECT active_type,device_os_type,channel,search_pv,search_uv
# FROM
# (
# SELECT active_type,device_os_type,channel
# ,count(t1.cl_id) as search_pv
# ,count(distinct t1.cl_id) as search_uv
# FROM
# (
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action in ('do_search','search_result_click_search')
#
# UNION ALL
# SELECT cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action = 'on_click_card'
# AND params['page_name']='search_home'
#
# UNION ALL
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action = 'on_click_card'
# AND params['in_page_pos']='猜你喜欢'
# AND params['tab_name']='精选'
# AND params['card_type']='search_word'
#
#
# UNION ALL
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND action = 'on_click_card'
# AND page_name='welfare_home'
# AND params['card_type'] ='search_word'
# AND params['in_page_pos']='大家都在搜'
#
# UNION ALL
# SELECT partition_date
# ,cl_id
# FROM online.bl_hdfs_maidian_updates
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND int(split(app_version,'\\.')[1]) >= 27
# AND action='on_click_card'
# AND params['card_type']='highlight_word'
# )t1
# JOIN
# (
# SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
# FROM
# (
# SELECT
# partition_date,m.device_id
# ,array(device_os_type ,'合计') as device_os_type
# ,array(case WHEN active_type = '4' THEN '老活'
# WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
# ,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
# FROM online.ml_device_day_active_status m
# LEFT JOIN
# (SELECT code,is_ai_channel,partition_day
# FROM DIM.DIM_AI_CHANNEL_ZP_NEW
# WHERE partition_day>= {yesterday_str}
# AND partition_day < {today_str}) tmp
# ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
# WHERE partition_date >= {yesterday_str}
# AND partition_date < {today_str}
# AND active_type in ('1','2','4')
# ) mas
# LATERAL VIEW explode(mas.channel) t2 AS channel
# LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
# LATERAL VIEW explode(mas.active_type) t2 AS active_type
# )t2
# on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
# GROUP BY active_type,device_os_type,channel
# )t
# )t3
# """.format(today_str=today_str, yesterday_str=yesterday_str, )
# device_df = spark.sql(sql)
# device_df.show(1, False)
# sql_res = device_df.collect()
# for res in sql_res:
# print(res)
# device_df.createOrReplaceTempView("data_table")
#
# collects_sql = """
# SELECT device_type,active_type,channel_type,ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_core_pv),0)/NVL(sum(uv),0) ,0),5) as core_pv_division_uv,
# ROUND(if(NVL(sum(uv),0) <> 0 ,NVL(sum(search_pv),0)/NVL(sum(uv),0) , 0),5) as pv_division_uv
# FROM data_table GROUP BY device_type,active_type,channel_type
# """
# finnal_df = spark.sql(collects_sql)
#
# finnal_df.show(1, False)
# sql_res = finnal_df.collect()
# for res in sql_res: