Commit f607a28e authored by litaolemo's avatar litaolemo

update

parent 206cec24
...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso ...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = [] task_list = []
task_days = 3 task_days = 30
for t in range(1, task_days): for t in range(1, task_days):
day_num = 0 - t day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
...@@ -112,7 +112,7 @@ for t in range(1, task_days): ...@@ -112,7 +112,7 @@ for t in range(1, task_days):
and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more' and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare' ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') ,'search_result_wiki','search_result_question_answer')
AND card_content_type IN ('answer','qa') AND (card_content_type IN ('answer','qa','question') or card_type in ('answer','qa','question'))
)a )a
group by partition_day,card_content_type,device_id group by partition_day,card_content_type,device_id
)t1 )t1
......
...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso ...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = [] task_list = []
task_days = 3 task_days = 30
for t in range(1, task_days): for t in range(1, task_days):
day_num = 0 - t day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
...@@ -112,7 +112,7 @@ for t in range(1, task_days): ...@@ -112,7 +112,7 @@ for t in range(1, task_days):
and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more' and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare' ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') ,'search_result_wiki','search_result_question_answer')
AND card_content_type IN ('diary') AND (card_content_type IN ('diary') or card_type = 'diary')
)a )a
group by partition_day,card_content_type,device_id group by partition_day,card_content_type,device_id
)t1 )t1
......
...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso ...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = [] task_list = []
task_days = 3 task_days = 30
for t in range(1, task_days): for t in range(1, task_days):
day_num = 0 - t day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
...@@ -112,7 +112,7 @@ for t in range(1, task_days): ...@@ -112,7 +112,7 @@ for t in range(1, task_days):
and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more' and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare' ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') ,'search_result_wiki','search_result_question_answer')
AND card_content_type IN ('service') AND (card_content_type IN ('service') or card_type = 'service')
)a )a
group by partition_day,card_content_type,device_id group by partition_day,card_content_type,device_id
)t1 )t1
......
...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso ...@@ -73,7 +73,7 @@ spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJso
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = [] task_list = []
task_days = 3 task_days = 30
for t in range(1, task_days): for t in range(1, task_days):
day_num = 0 - t day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
...@@ -112,7 +112,7 @@ for t in range(1, task_days): ...@@ -112,7 +112,7 @@ for t in range(1, task_days):
and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more' and page_code in ('search_result_diary','search_result_doctor','search_result_hospital','search_result_more'
,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare' ,'search_result_more_infomation','search_result_more_user','search_result_post','search_result_welfare'
,'search_result_wiki','search_result_question_answer') ,'search_result_wiki','search_result_question_answer')
AND card_content_type IN ('user_post') AND (card_content_type IN ('user_post') or card_type = 'user_post')
)a )a
group by partition_day,card_content_type,device_id group by partition_day,card_content_type,device_id
)t1 )t1
......
...@@ -29,7 +29,17 @@ from pyspark.sql import SparkSession, DataFrame ...@@ -29,7 +29,17 @@ from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit # from pyspark.sql.functions import lit
# import pytispark.pytispark as pti # import pytispark.pytispark as pti
from elasticsearch import Elasticsearch
exists_es_dic = {}
es = Elasticsearch([
{
'host': '172.16.31.17',
'port': 9200,
}, {
'host': '172.16.31.11',
'port': 9200,
}])
def con_sql(sql): def con_sql(sql):
# 从数据库的表里获取数据 # 从数据库的表里获取数据
...@@ -78,81 +88,10 @@ spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF ...@@ -78,81 +88,10 @@ spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'") spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
huidu_device_id_sql = r"""
select t2.device_id from
--(select distinct(first_device) as device_id from online.ml_user_history_detail where partition_date = {today_str} and last_active_date >= {last_30_day_str}) t2
(select device_id from online.ml_device_day_active_status where partition_date = '{today_str}' and active_type in (1,2)) t2
LEFT JOIN
(
select distinct device_id
from ml.ml_d_ct_dv_devicespam_d --去除机构刷单设备,即作弊设备(浏览和曝光事件去除)
WHERE partition_day='{today_str}'
union all
select distinct device_id
from dim.dim_device_user_staff --去除内网用户
)spam_pv
on spam_pv.device_id=t2.device_id
LEFT JOIN
(
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
SELECT user_id,partition_date,
if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
FROM online.ml_user_updates
WHERE partition_date='{today_str}'
)t1
JOIN
( --医生账号
SELECT distinct user_id
FROM online.tl_hdfs_doctor_view
WHERE partition_date = '{today_str}'
--马甲账号/模特用户
UNION ALL
SELECT user_id
FROM ml.ml_c_ct_ui_user_dimen_d
WHERE partition_day = '{today_str}'
AND (is_puppet = 'true' or is_classifyuser = 'true')
UNION ALL
--公司内网覆盖用户
select distinct user_id
from dim.dim_device_user_staff
UNION ALL
--登陆过医生设备
SELECT distinct t1.user_id
FROM
(
SELECT user_id, v.device_id as device_id
FROM online.ml_user_history_detail
LATERAL VIEW EXPLODE(device_history_list) v AS device_id
WHERE partition_date = '{today_str}'
) t1
JOIN
(
SELECT device_id
FROM online.ml_device_history_detail
WHERE partition_date = '{today_str}'
AND is_login_doctor = '1'
) t2
ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
)dev
on t2.device_id=dev.device_id
WHERE spam_pv.device_id IS NULL
and dev.device_id is null
""".format(today_str='20200926', last_30_day_str='20200926')
print(huidu_device_id_sql) # print(huidu_device_id_sql)
huidu_device_id_df = spark.sql(huidu_device_id_sql) # huidu_device_id_df = spark.sql(huidu_device_id_sql)
huidu_device_id_df.createOrReplaceTempView("dev_view") # huidu_device_id_df.createOrReplaceTempView("dev_view")
sql_search_ctr = r""" sql_search_ctr = r"""
SELECT query,search_pv,search_uv SELECT query,search_pv,search_uv
FROM FROM
...@@ -267,4 +206,10 @@ sql_res = search_ctr_df.collect() ...@@ -267,4 +206,10 @@ sql_res = search_ctr_df.collect()
print("-------------------------------") print("-------------------------------")
for res in sql_res: for res in sql_res:
print(res.query,res.search_pv) print(res.query,res.search_pv)
results = es.search(
index='gm-dbmw-diary-read',
doc_type='diary',
timeout='10s',
body=body
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment