Commit ad65b004 authored by litaolemo

update

parent d0ab1290
......@@ -91,155 +91,89 @@ for t in range(2, task_days):
yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
new_urser_device_id_sql = r"""
select t2.device_id as device_id from
(select device_id from online.ml_device_day_active_status where partition_date = '{today_str}' and active_type in (1,2)) t2
LEFT join (
select first_device from online.ml_user_history_detail where partition_date = '{tomorrow_str}' and last_active_date = '{today_str}'
) on first_device = t2.device_id
LEFT JOIN
(
select distinct device_id
from ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
WHERE partition_day='{today_str}'
union all
select distinct device_id
from dim.dim_device_user_staff --去除内网用户
)spam_pv
on spam_pv.device_id=t2.device_id
LEFT JOIN
(
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date='{today_str}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{today_str}'
--马甲账号/模特用户
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{today_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--公司内网覆盖用户
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--登陆过医生设备
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{today_str}'
) t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{today_str}'
AND is_login_doctor = '1'
) t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev
on t2.device_id=dev.device_id
WHERE spam_pv.device_id IS NULL
and dev.device_id is null and first_device is not null
""".format(today_str=today_str, yesterday_str_format=yesterday_str_format, today_str_format=today_str_format,tomorrow_str=tomorrow_str)
print(new_urser_device_id_sql)
select cl_type,app_version,count(app_version) as count_num from online.bl_hdfs_maidian_updates where partition_date >= "20200222" group by cl_type,app_version
""".format(start_date="20210201", end_date="20200222", today_str_format=today_str_format,tomorrow_str=tomorrow_str)
# print(new_urser_device_id_sql)
new_urser_device_id_df = spark.sql(new_urser_device_id_sql)
new_urser_device_id_df.createOrReplaceTempView("device_id_view")
new_urser_device_id_df.show(1)
sql_res = new_urser_device_id_df.collect()
res_dict = {}
portrait_dict = {
"first_demands": {},
"second_demands": {},
"first_solutions": {},
"second_solutions": {},
"first_positions": {},
"second_positions": {},
"projects": {},
'anecdote_tags':{}
}
no_portrait_device_id_list = []
print("-------------------------------")
count_not_has_portratit = 0
for count_user_count, res in enumerate(sql_res):
# print(count, res)
portratit_res = get_user_portrait_tag3_from_redis(res.device_id)
sql = """select cl_id, projects from kafka_tag3_log
where cl_id = '%s' and event_cn = 'kyc' """ % res.device_id
# print(count_user_count, res, portratit_res)
sql_res_list = con_sql(sql)
kyc_str_list= []
if sql_res_list:
print(sql_res_list,type(sql_res_list))
kyc_str_list = sql_res_list[0][1].split(",")
temp_count = 0
for demand in portratit_res:
if portratit_res[demand]:
try:
for tag in portratit_res[demand][0:3]:
if tag in portrait_dict[demand]:
portrait_dict[demand][tag] += 1
else:
portrait_dict[demand][tag] = 1
if tag in kyc_str_list and demand == "projects":
if portrait_dict["projects"].get(tag):
portrait_dict["projects"][tag] -= 1
except Exception as e:
print("error ", e)
temp_count += 1
if not temp_count:
count_not_has_portratit += 1
no_portrait_device_id_list.append(res.device_id)
print(portrait_dict)
print(count_user_count+1,count_not_has_portratit)
print("-------------------------------")
for protratit_type in portrait_dict["projects"]:
partition_date = today_str
pid = hashlib.md5((partition_date + protratit_type).encode("utf8")).hexdigest()
action_count = portrait_dict["projects"][protratit_type]
instert_sql = """replace into new_user_project_count(
partition_day,pid,protratit_count,protratit_type) VALUES('{partition_day}','{pid}',{protratit_count},'{protratit_type}');""".format(
partition_day=today_str, pid=pid, protratit_count=action_count
, protratit_type=protratit_type
)
print(instert_sql)
# cursor.execute("set names 'UTF8'")
db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
db='doris_olap')
cursor = db.cursor()
res = cursor.execute(instert_sql)
db.commit()
for res in sql_res:
print(res)
# sql_res = new_urser_device_id_df.collect()
# res_dict = {}
# portrait_dict = {
# "first_demands": {},
# "second_demands": {},
# "first_solutions": {},
# "second_solutions": {},
# "first_positions": {},
# "second_positions": {},
# "projects": {},
# 'anecdote_tags':{}
# }
# no_portrait_device_id_list = []
# print("-------------------------------")
# count_not_has_portratit = 0
#
# for count_user_count, res in enumerate(sql_res):
# # print(count, res)
# portratit_res = get_user_portrait_tag3_from_redis(res.device_id)
# sql = """select cl_id, projects from kafka_tag3_log
# where cl_id = '%s' and event_cn = 'kyc' """ % res.device_id
# # print(count_user_count, res, portratit_res)
# sql_res_list = con_sql(sql)
# kyc_str_list= []
# if sql_res_list:
# print(sql_res_list,type(sql_res_list))
# kyc_str_list = sql_res_list[0][1].split(",")
#
# temp_count = 0
# for demand in portratit_res:
# if portratit_res[demand]:
# try:
# for tag in portratit_res[demand][0:3]:
# if tag in portrait_dict[demand]:
# portrait_dict[demand][tag] += 1
# else:
# portrait_dict[demand][tag] = 1
# if tag in kyc_str_list and demand == "projects":
# if portrait_dict["projects"].get(tag):
# portrait_dict["projects"][tag] -= 1
# except Exception as e:
# print("error ", e)
#
# temp_count += 1
# if not temp_count:
# count_not_has_portratit += 1
# no_portrait_device_id_list.append(res.device_id)
#
#
# print(portrait_dict)
# print(count_user_count+1,count_not_has_portratit)
# print("-------------------------------")
#
#
# for protratit_type in portrait_dict["projects"]:
# partition_date = today_str
# pid = hashlib.md5((partition_date + protratit_type).encode("utf8")).hexdigest()
# action_count = portrait_dict["projects"][protratit_type]
#
# instert_sql = """replace into new_user_project_count(
# partition_day,pid,protratit_count,protratit_type) VALUES('{partition_day}','{pid}',{protratit_count},'{protratit_type}');""".format(
# partition_day=today_str, pid=pid, protratit_count=action_count
# , protratit_type=protratit_type
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
# db='doris_olap')
# cursor = db.cursor()
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
#
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment