Commit bbcb6312 authored by litaolemo's avatar litaolemo

update

parent 3fbbc173
...@@ -20,6 +20,7 @@ import datetime ...@@ -20,6 +20,7 @@ import datetime
import time import time
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit # from pyspark.sql.functions import lit
# import pytispark.pytispark as pti # import pytispark.pytispark as pti
...@@ -80,7 +81,7 @@ for t in range(1, task_days): ...@@ -80,7 +81,7 @@ for t in range(1, task_days):
today_str = now.strftime("%Y%m%d") today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
sql_distinct_device_id = """ sql_spam_pv_device_id = """
SELECT DISTINCT device_id SELECT DISTINCT device_id
FROM ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除) FROM ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
...@@ -90,61 +91,70 @@ for t in range(1, task_days): ...@@ -90,61 +91,70 @@ for t in range(1, task_days):
SELECT DISTINCT dev.device_id SELECT DISTINCT dev.device_id
FROM dim.dim_device_user_staff --去除内网用户 FROM dim.dim_device_user_staff --去除内网用户
UNION ALL ) """.format(yesterday_str=yesterday_str, today_str=today_str)
( print(sql_spam_pv_device_id)
SELECT t1.device_id spam_pv_df = spark.sql(sql_spam_pv_device_id)
FROM spam_pv_view = spam_pv_df.createOrReplaceTempView("spam_pv")
(--找出user_id当天活跃的第一个设备id spam_pv_df.show(1)
SELECT user_id,partition_date, sql_res = spam_pv_df.collect()
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id for res in sql_res:
FROM online.ml_user_updates print(res)
WHERE partition_date>='${{yesterday_str}}' AND partition_date<'${{today_str}}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = {yesterday_str}
--马甲账号/模特用户 print("-------------------------------")
UNION ALL sql_dev_device_id = """
SELECT user_id SELECT partition_date,device_id
FROM ml.ml_c_ct_ui_user_dimen_d FROM
WHERE partition_day = {yesterday_str} (--找出user_id当天活跃的第一个设备id
AND (is_puppet = 'true' or is_classifyuser = 'true') SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{yesterday_str}'
UNION ALL --马甲账号/模特用户
--公司内网覆盖用户 UNION ALL
select distinct user_id SELECT user_id
from dim.dim_device_user_staff FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{yesterday_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL UNION ALL
--登陆过医生设备 --公司内网覆盖用户
SELECT distinct t1.user_id select distinct user_id
FROM from dim.dim_device_user_staff
(
SELECT user_id, v.device_id as device_id UNION ALL
FROM online.ml_user_history_detail --登陆过医生设备
LATERAL VIEW EXPLODE(device_history_list) v AS device_id SELECT distinct t1.user_id
WHERE partition_date = {yesterday_str} FROM
)t1 (
JOIN SELECT user_id, v.device_id as device_id
( FROM online.ml_user_history_detail
SELECT device_id LATERAL VIEW EXPLODE(device_history_list) v AS device_id
FROM online.ml_device_history_detail WHERE partition_date = '{yesterday_str}'
WHERE partition_date = {yesterday_str} )t1
AND is_login_doctor = '1' JOIN
)t2 (
ON t1.device_id = t2.device_id SELECT device_id
)t2 FROM online.ml_device_history_detail
on t1.user_id=t2.user_id WHERE partition_date = '{yesterday_str}'
group by device_id AND is_login_doctor = '1'
)dev """.format(yesterday_str=yesterday_str,today_str=today_str) )t2
print(sql_distinct_device_id) ON t1.device_id = t2.device_id
distinct_device_id_df = spark.sql(sql_distinct_device_id,) )t2
distinct_device_id_view = distinct_device_id_df.createOrReplaceTempView("distinct_device_id_view") on t1.user_id=t2.user_id
distinct_device_id_df.show(1) group by partition_date,device_id
sql_res = distinct_device_id_df.collect() ) """.format(yesterday_str=yesterday_str, today_str=today_str)
print(sql_dev_device_id)
dev_df = spark.sql(sql_dev_device_id)
dev_df_view = spam_pv_df.createOrReplaceTempView("dev_view")
dev_df_view.show(1)
sql_res = dev_df_view.collect()
for res in sql_res: for res in sql_res:
print(res) print(res)
...@@ -257,10 +267,12 @@ FROM ...@@ -257,10 +267,12 @@ FROM
)t2 )t2
on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
LEFT JOIN distinct_device_id_view LEFT JOIN spam_pv on spam_pv.device_id=t1.cl_id
on t1.partition_date=distinct_device_id_view.partition_date AND t1.cl_id=distinct_device_id_view.device_id LEFT JOIN dev_view
WHERE (distinct_device_id_view.dev_device_id IS NULL or distinct_device_id_view.dev_device_id ='') on t1.partition_date=dev_view.partition_date and t1.cl_id=dev_view.device_id
GROUP BY t1.partition_date,t2.active_type,device_os_type,channel WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
and (dev_view.device_id is null or dev_view.device_id ='')
GROUP BY t1.partition_date,active_type,device_os_type,channel
)t )t
)t3 )t3
...@@ -348,11 +360,13 @@ LEFT JOIN ...@@ -348,11 +360,13 @@ LEFT JOIN
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type LATERAL VIEW explode(mas.active_type) t2 AS active_type
)dev )dev
on t1.cl_id=dev.device_id and t1.partition_date = dev.partition_date on t1.cl_id=dev.device_id and t1.partition_date = dev.partition_date
LEFT JOIN distinct_device_id_view LEFT JOIN spam_pv on spam_pv.device_id=t1.cl_id
on t1.cl_id=distinct_device_id_view.device_id LEFT JOIN dev_view
WHERE (distinct_device_id_view.dev_device_id IS NULL or distinct_device_id_view.dev_device_id ='') on t1.partition_date=dev_view.partition_date and t1.cl_id=dev_view.device_id
GROUP BY t1.partition_date,active_type,device_os_type,channel WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
and (dev_view.device_id is null or dev_view.device_id ='')
GROUP BY t1.partition_date,active_type,device_os_type,channel
)t4 )t4
on t3.partition_date=t4.partition_date and t3.active_type=t4.active_type and t3.device_os_type = t4.device_os_type AND t3.channel = t4.channel on t3.partition_date=t4.partition_date and t3.active_type=t4.active_type and t3.device_os_type = t4.device_os_type AND t3.channel = t4.channel
""".format(today_str=today_str, yesterday_str=yesterday_str, ) """.format(today_str=today_str, yesterday_str=yesterday_str, )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment