Commit 98c206ca authored by litaolemo's avatar litaolemo

update

parent 047f8c17
......@@ -81,16 +81,73 @@ for t in range(1, task_days):
today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
sql_spam_pv_device_id = """
sql_dev_device_id = """
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{yesterday_str}'
--马甲账号/模特用户
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{yesterday_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--公司内网覆盖用户
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--登陆过医生设备
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{yesterday_str}'
)t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{yesterday_str}'
AND is_login_doctor = '1'
)t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id""".format(yesterday_str=yesterday_str, today_str=today_str)
print(sql_dev_device_id)
dev_df = spark.sql(sql_dev_device_id)
dev_df_view = dev_df.createOrReplaceTempView("dev_view")
dev_df_view.show(1)
sql_res = dev_df_view.collect()
for res in sql_res:
print(res)
print("-------------------------------")
sql_spam_pv_device_id = """
SELECT DISTINCT device_id
FROM ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
WHERE partition_day={yesterday_str}
UNION ALL
SELECT DISTINCT dev.device_id
FROM dim.dim_device_user_staff --去除内网用户
""".format(yesterday_str=yesterday_str, today_str=today_str)
SELECT DISTINCT dev_view.device_id
FROM dev_view.dim_device_user_staff --去除内网用户
""".format(yesterday_str=yesterday_str)
print(sql_spam_pv_device_id)
spam_pv_df = spark.sql(sql_spam_pv_device_id)
spam_pv_view = spam_pv_df.createOrReplaceTempView("spam_pv")
......@@ -100,63 +157,7 @@ for t in range(1, task_days):
print(res)
print("-------------------------------")
sql_dev_device_id = """
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{yesterday_str}'
--马甲账号/模特用户
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{yesterday_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--公司内网覆盖用户
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--登陆过医生设备
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{yesterday_str}'
)t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{yesterday_str}'
AND is_login_doctor = '1'
)t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id""".format(yesterday_str=yesterday_str, today_str=today_str)
print(sql_dev_device_id)
dev_df = spark.sql(sql_dev_device_id)
dev_df_view = spam_pv_df.createOrReplaceTempView("dev_view")
dev_df_view.show(1)
sql_res = dev_df_view.collect()
for res in sql_res:
print(res)
print("-------------------------------")
sql = r"""
SELECT t3.partition_date as partition_date
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment