Commit 2f92d2d0 authored by litaolemo's avatar litaolemo

update

parent 71cda957
...@@ -19,8 +19,8 @@ import sys ...@@ -19,8 +19,8 @@ import sys
import time import time
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit # from pyspark.sql.functions import lit
import pytispark.pytispark as pti # import pytispark.pytispark as pti
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod') db='jerry_prod')
...@@ -83,84 +83,84 @@ for t in range(0, task_days): ...@@ -83,84 +83,84 @@ for t in range(0, task_days):
today_str = now.strftime("%Y%m%d") today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
# sql_dev_device_id = """ sql_dev_device_id = """
# SELECT partition_date,device_id SELECT partition_date,device_id
# FROM FROM
# (--找出user_id当天活跃的第一个设备id (--找出user_id当天活跃的第一个设备id
# SELECT user_id,partition_date, SELECT user_id,partition_date,
# if(size(device_list) > 0, device_list [ 0 ], '') AS device_id if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
# FROM online.ml_user_updates FROM online.ml_user_updates
# WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}' WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}'
# )t1 )t1
# JOIN JOIN
# ( --医生账号 ( --医生账号
# SELECT distinct user_id SELECT distinct user_id
# FROM online.tl_hdfs_doctor_view FROM online.tl_hdfs_doctor_view
# WHERE partition_date = '{yesterday_str}' WHERE partition_date = '{yesterday_str}'
#
# --马甲账号/模特用户 --马甲账号/模特用户
# UNION ALL UNION ALL
# SELECT user_id SELECT user_id
# FROM ml.ml_c_ct_ui_user_dimen_d FROM ml.ml_c_ct_ui_user_dimen_d
# WHERE partition_day = '{yesterday_str}' WHERE partition_day = '{yesterday_str}'
# AND (is_puppet = 'true' or is_classifyuser = 'true') AND (is_puppet = 'true' or is_classifyuser = 'true')
#
# UNION ALL UNION ALL
# --公司内网覆盖用户 --公司内网覆盖用户
# select distinct user_id select distinct user_id
# from dim.dim_device_user_staff from dim.dim_device_user_staff
#
# UNION ALL UNION ALL
# --登陆过医生设备 --登陆过医生设备
# SELECT distinct t1.user_id SELECT distinct t1.user_id
# FROM FROM
# ( (
# SELECT user_id, v.device_id as device_id SELECT user_id, v.device_id as device_id
# FROM online.ml_user_history_detail FROM online.ml_user_history_detail
# LATERAL VIEW EXPLODE(device_history_list) v AS device_id LATERAL VIEW EXPLODE(device_history_list) v AS device_id
# WHERE partition_date = '{yesterday_str}' WHERE partition_date = '{yesterday_str}'
# )t1 )t1
# JOIN JOIN
# ( (
# SELECT device_id SELECT device_id
# FROM online.ml_device_history_detail FROM online.ml_device_history_detail
# WHERE partition_date = '{yesterday_str}' WHERE partition_date = '{yesterday_str}'
# AND is_login_doctor = '1' AND is_login_doctor = '1'
# )t2 )t2
# ON t1.device_id = t2.device_id ON t1.device_id = t2.device_id
# )t2 )t2
# on t1.user_id=t2.user_id on t1.user_id=t2.user_id
# group by partition_date,device_id group by partition_date,device_id
# """.format(yesterday_str=yesterday_str, today_str=today_str) """.format(yesterday_str=yesterday_str, today_str=today_str)
# print(sql_dev_device_id) print(sql_dev_device_id)
# dev_df = spark.sql(sql_dev_device_id) dev_df = spark.sql(sql_dev_device_id)
# dev_df_view = dev_df.createOrReplaceTempView("dev_view") dev_df_view = dev_df.createOrReplaceTempView("dev_view")
# dev_df.cache() dev_df.cache()
# dev_df.show(1) dev_df.show(1)
# sql_res = dev_df.collect() sql_res = dev_df.collect()
# for res in sql_res: for res in sql_res:
# print(res) print(res)
#
# print("-------------------------------") print("-------------------------------")
#
# sql_spam_pv_device_id = """ sql_spam_pv_device_id = """
# SELECT DISTINCT device_id SELECT DISTINCT device_id
# FROM ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除) FROM ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
# WHERE partition_day='{yesterday_str}' WHERE partition_day='{yesterday_str}'
#
# UNION ALL UNION ALL
# SELECT DISTINCT device_id SELECT DISTINCT device_id
# FROM dim.dim_device_user_staff FROM dim.dim_device_user_staff
#
# """.format(yesterday_str=yesterday_str) """.format(yesterday_str=yesterday_str)
# print(sql_spam_pv_device_id) print(sql_spam_pv_device_id)
# spam_pv_df = spark.sql(sql_spam_pv_device_id) spam_pv_df = spark.sql(sql_spam_pv_device_id)
# spam_pv_df.createOrReplaceTempView("spam_pv") spam_pv_df.createOrReplaceTempView("spam_pv")
# spam_pv_df.show(1) spam_pv_df.show(1)
# sql_res = spam_pv_df.collect() sql_res = spam_pv_df.collect()
# spam_pv_df.cache() spam_pv_df.cache()
# for res in sql_res: for res in sql_res:
# print(res) print(res)
print("-------------------------------") print("-------------------------------")
...@@ -189,7 +189,7 @@ for t in range(0, task_days): ...@@ -189,7 +189,7 @@ for t in range(0, task_days):
""".format(partition_day=yesterday_str) """.format(partition_day=yesterday_str)
print(sql_spam_pv_device_id) print(sql_spam_pv_device_id)
spam_pv_df = spark.sql(sql_spam_pv_device_id) spam_pv_df = spark.sql(sql_spam_pv_device_id)
spam_pv_df.createOrReplaceTempView("spam_pv") spam_pv_df.createOrReplaceTempView("dev_view")
spam_pv_df.show(1) spam_pv_df.show(1)
sql_res = spam_pv_df.collect() sql_res = spam_pv_df.collect()
spam_pv_df.cache() spam_pv_df.cache()
...@@ -197,6 +197,52 @@ for t in range(0, task_days): ...@@ -197,6 +197,52 @@ for t in range(0, task_days):
print(res) print(res)
print("-------------------------------") print("-------------------------------")
# Compute search-result-welfare CTR (clicks / precise exposures) grouped by
# device ACTIVE_TYPE and DEVICE_OS_TYPE, excluding spam and staff devices.
# NOTE(review): the query reads the temp views `spam_pv` and `dev_view`; both
# must have been registered earlier in this script — verify both exist on this
# code path before this statement runs.
# Fixes vs. previous revision:
#   * `${partition_day}` was never substituted (no .format()/Template call),
#     so the literal text reached Spark — use `{partition_day}` with
#     str.format, consistent with the other queries in this script.
#   * the anti-join filter referenced the undefined alias `dev`; the joined
#     view is aliased `dev_view`.
sql_search_ctr = r"""
select D.ACTIVE_TYPE,D.DEVICE_OS_TYPE,sum(T.CLICK_NUM) as CLICK_NUM,sum(C.EXPOSURE) as EXPOSURE,if(NVL(sum(C.EXPOSURE),0) <> 0 ,cast((NVL(sum(T.CLICK_NUM),0)/NVL(sum(C.EXPOSURE),0)) as decimal(18,5)) , 0) as search_ctr from
(SELECT T.DEVICE_ID, --设备ID
T.CARD_ID, --卡片ID
SUM(T.CLICK_NUM) AS CLICK_NUM --点击次数
FROM ML.ML_C_ET_CK_CLICK_DIMEN_D T
WHERE T.PARTITION_DAY = '{partition_day}'
AND T.PAGE_CODE = 'search_result_welfare'
AND T.ACTION IN ('goto_welfare_detail','search_result_welfare_click_item')
GROUP BY T.DEVICE_ID,
T.CARD_ID) T
left join
(SELECT T.DEVICE_ID as DEVICE_ID, --设备ID
T.CARD_ID as CARD_ID, --卡片ID
COUNT(T.CARD_ID) AS EXPOSURE --点击次数
FROM ML.MID_ML_C_ET_PE_PRECISEEXPOSURE_DIMEN_D T
WHERE T.PARTITION_DAY = '{partition_day}'
AND T.PAGE_CODE = 'search_result_welfare'
AND T.CARD_TYPE = 'common_card'
GROUP BY T.DEVICE_ID,
T.CARD_ID) C on T.DEVICE_ID=C.DEVICE_ID and T.CARD_ID = C.CARD_ID LEFT JOIN
(
SELECT T.DEVICE_ID,
T.DEVICE_OS_TYPE,
T.ACTIVE_TYPE
FROM ML.ML_C_CT_DV_DEVICE_DIMEN_D T
WHERE T.PARTITION_DAY = '{partition_day}'
AND T.ACTIVE_TYPE IN ('1', '2', '4'))
D on T.DEVICE_ID = D.DEVICE_ID
LEFT JOIN spam_pv on spam_pv.device_id= T.DEVICE_ID
LEFT JOIN dev_view on dev_view.device_id= T.DEVICE_ID
WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '')
and (dev_view.device_id is null or dev_view.device_id='')
GROUP by D.DEVICE_OS_TYPE,
D.ACTIVE_TYPE
""".format(partition_day=yesterday_str)
print(sql_search_ctr)
search_ctr_df = spark.sql(sql_search_ctr)
# spam_pv_df.createOrReplaceTempView("dev_view")
search_ctr_df.show(1)
sql_res = search_ctr_df.collect()
# Print every result row for the daily report log.
for res in sql_res:
    print(res)
# for res in sql_res: # for res in sql_res:
# # print(res) # # print(res)
# day_id = res.day_id # day_id = res.day_id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment