# -*- coding:UTF-8 -*-
# @Time : 2020/8/21 16:43
# @File : search_strategy_d.py
# @email : litao@igengmei.com
# @author : litao
"""Daily search-strategy metrics job (Spark SQL -> MySQL).

For each of the last ``task_days`` days (relative to one fixed start time):

1. register Spark temp views of devices to exclude:
   ``dev_view``  - first active device of doctor accounts, puppet/model
                   users, staff users and users who logged in on a doctor
                   device;
   ``spam_pv``   - devices flagged ``is_abnormal_device = 'true'``;
2. register ``active_dev_view``: new ('1','2') / returning ('4') active
   devices, minus a fixed list of channel sources, fanned out over device OS,
   active type and channel (each dimension also gets a '合计' total value);
3. per dimension combination compute the search UV and the core-card and
   content-card click PV on search result pages divided by that UV;
4. upsert the two ratios into the MySQL table ``search_strategy_d``.
"""
import datetime
import hashlib
import json  # kept from the original file (currently unused)
import time  # kept from the original file (currently unused)

import pymysql
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame  # DataFrame currently unused

# NOTE(review): credentials are hard-coded here and in the JDBC URIs below;
# they should be moved to a secrets store / environment configuration.
db = pymysql.connect(host='172.16.50.175', port=3306, user='doris',
                     password='o5gbA27hXHHm', database='doris_olap')
cursor = db.cursor()

# Spark settings, applied in this order (SparkConf.set stringifies values).
SPARK_SETTINGS = (
    ("spark.sql.crossJoin.enabled", True),
    ("spark.debug.maxToStringFields", "100"),
    ("spark.hive.mapred.supports.subdirectories", True),
    ("spark.sql.adaptive.enabled", True),
    ("spark.sql.adaptive.skewedJoin.enabled", True),
    ("spark.shuffle.statistics.verbose", True),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("prod.gold.jdbcuri",
     "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true"),
    ("prod.mimas.jdbcuri",
     "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true"),
    ("prod.gaia.jdbcuri",
     "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true"),
    ("prod.tidb.jdbcuri",
     "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true"),
    ("spark.sql.parquet.compression.codec", "snappy"),
    ("spark.driver.extraLibraryPath", "/opt/hadoop/lib/native"),
)

sparkConf = SparkConf()
for conf_key, conf_value in SPARK_SETTINGS:
    sparkConf.set(conf_key, conf_value)

spark = (SparkSession.builder.config(conf=sparkConf).appName(
    "search_strategy_d").enableHiveSupport().getOrCreate())

spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")

# Devices to exclude: the first active device of doctor accounts,
# puppet/model users, staff users and users who logged in on a doctor device.
DEV_DEVICE_SQL = """
SELECT partition_date,device_id
FROM
(--找出user_id当天活跃的第一个设备id
    SELECT user_id,partition_date,
    if(size(device_list) > 0, device_list [ 0 ], '') AS device_id
    FROM online.ml_user_updates
    WHERE partition_date>='{yesterday_str}' AND partition_date<'{today_str}'
)t1
JOIN
( --医生账号
    SELECT distinct user_id
    FROM online.tl_hdfs_doctor_view
    WHERE partition_date = '{yesterday_str}'

    --马甲账号/模特用户
    UNION ALL
    SELECT user_id
    FROM ml.ml_c_ct_ui_user_dimen_d
    WHERE partition_day = '{yesterday_str}'
    AND (is_puppet = 'true' or is_classifyuser = 'true')

    UNION ALL
    --公司内网覆盖用户
    select distinct user_id
    from dim.dim_device_user_staff

    UNION ALL
    --登陆过医生设备
    SELECT distinct t1.user_id
    FROM
    (
        SELECT user_id, v.device_id as device_id
        FROM online.ml_user_history_detail
        LATERAL VIEW EXPLODE(device_history_list) v AS device_id
        WHERE partition_date = '{yesterday_str}'
    )t1
    JOIN
    (
        SELECT device_id
        FROM online.ml_device_history_detail
        WHERE partition_date = '{yesterday_str}'
        AND is_login_doctor = '1'
    )t2
    ON t1.device_id = t2.device_id
)t2
on t1.user_id=t2.user_id
group by partition_date,device_id
"""

# Abnormal (spam) devices for the day.
SPAM_PV_SQL = """
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY = '{yesterday_str}'
AND is_abnormal_device = 'true'
"""

# Active devices fanned out over (device_os_type, active_type, channel); each
# dimension is exploded together with a '合计' (total) value. This subquery
# appeared twice verbatim in the metrics query; it is now one temp view.
# Raw string: the LIKE pattern's backslashes must reach Spark unchanged.
ACTIVE_DEVICE_SQL = r"""
SELECT partition_date,device_id,t2.active_type,t2.channel,t2.device_os_type
FROM
(
    SELECT
    partition_date,m.device_id
    ,array(device_os_type ,'合计') as device_os_type
    ,array(case WHEN active_type = '4' THEN '老活'
    WHEN active_type in ('1','2') then '新增' END ,'合计') as active_type
    ,array(CASE WHEN is_ai_channel = 'true' THEN 'AI' ELSE '其他' END , '合计') as channel
    FROM online.ml_device_day_active_status m
    LEFT JOIN
    (SELECT code,is_ai_channel,partition_day
    FROM DIM.DIM_AI_CHANNEL_ZP_NEW
    WHERE partition_day>= '{yesterday_str}'
    AND partition_day < '{today_str}') tmp
    ON m.partition_date=tmp.partition_day AND first_channel_source_type=code
    WHERE partition_date >= '{yesterday_str}'
    AND partition_date < '{today_str}'
    AND active_type in ('1','2','4')
    AND first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
    ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
    AND first_channel_source_type not like 'promotion\_jf\_%'
) mas
LATERAL VIEW explode(mas.channel) t2 AS channel
LATERAL VIEW explode(mas.device_os_type) t2 AS device_os_type
LATERAL VIEW explode(mas.active_type) t2 AS active_type
"""

# Search PV/UV (t3) joined with core/content card click PV (t4) per
# dimension; the two ratio columns are click PV / search UV (0 when UV is 0).
# Raw string: '\\.' must reach Spark with both backslashes.
# Requires temp views dev_view, spam_pv and active_dev_view.
SEARCH_METRICS_SQL = r"""
SELECT t3.partition_date as partition_date
,t3.device_os_type as device_os_type
,t3.active_type as active_type
,t3.channel as channel
,NVL(t3.search_pv,0) as pv
,NVL(t3.search_uv,0) as uv
,if(NVL(t3.search_uv,0) <> 0 ,cast((NVL(t4.hexin_card_click_pv,0)/NVL(t3.search_uv,0)) as decimal(18,5)) , 0) as search_core_pv
,if(NVL(t3.search_uv,0) <> 0 ,cast((NVL(t4.neirong_card_click_pv,0)/NVL(t3.search_uv,0)) as decimal(18,5)) , 0) as search_pv
FROM
(--昨天总搜索量
    SELECT partition_date,active_type,device_os_type,channel,search_pv,search_uv
    FROM
    (
        SELECT t1.partition_date,active_type,device_os_type,channel
        ,count(t1.cl_id) as search_pv
        ,count(distinct t1.cl_id) as search_uv
        FROM
        (
            SELECT partition_date
            ,params['query'] as query
            ,cl_id
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND action in ('do_search','search_result_click_search')

            UNION ALL
            SELECT partition_date,coalesce(params['query'],params['card_name']) as query,cl_id
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND action = 'on_click_card'
            AND params['page_name']='search_home'

            UNION ALL
            SELECT partition_date
            ,params['card_name'] as query
            ,cl_id
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND action = 'on_click_card'
            AND params['in_page_pos']='猜你喜欢'
            AND params['tab_name']='精选'
            AND params['card_type']='search_word'
            --AND page_name='home' android的page_name为空

            UNION ALL
            SELECT partition_date
            ,params['card_name'] as query
            ,cl_id
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND action = 'on_click_card'
            AND page_name='welfare_home'
            AND params['card_type'] ='search_word'
            AND params['in_page_pos']='大家都在搜'

            UNION ALL
            SELECT partition_date
            ,params['card_name'] as query
            ,cl_id
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND int(split(app_version,'\\.')[1]) >= 27
            AND action='on_click_card'
            AND params['card_type']='highlight_word'
        )t1
        JOIN active_dev_view t2
        on t1.cl_id=t2.device_id AND t1.partition_date = t2.partition_date
        LEFT JOIN spam_pv
        on spam_pv.device_id=t1.cl_id
        LEFT JOIN dev_view
        on t1.partition_date=dev_view.partition_date and t1.cl_id=dev_view.device_id
        WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
        and (dev_view.device_id is null or dev_view.device_id ='')
        GROUP BY t1.partition_date,t2.active_type,device_os_type,channel
    )t
)t3
LEFT JOIN
(--昨天搜索结果页卡片点击pv
    SELECT t1.partition_date,active_type,device_os_type,channel
    ,sum(hexin) as hexin_card_click_pv
    ,sum(neirong) as neirong_card_click_pv
    FROM
    (
        SELECT NVL(t2.partition_date,t3.partition_date) as partition_date
        ,NVL(t2.cl_id,t3.cl_id) as cl_id
        ,NVL(t2.query,t3.query) as query
        ,NVL(t2.pv,0) as hexin
        ,NVL(t3.pv,0) as neirong
        FROM
        (--核心卡片点击
            SELECT partition_date
            ,params['query'] as query
            ,cl_id
            ,count(1) as pv
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND ((action in ('search_result_click_recommend_item','search_result_welfare_click_item','search_result_hospital_click_item','search_result_doctor_click_item','on_click_doctor_card', 'on_click_hospital_card')
            AND page_name in ('search_result_more','search_result_welfare','search_result_hospital','search_result_doctor'))
            or (action = 'goto_welfare_detail' AND params [ 'from' ] = 'search_result_welfare_recommend')
            or (action = 'on_click_card' AND params['card_content_type'] in ('service','hospital','doctor') AND page_name in ('search_result_more','search_result_welfare','search_result_hospital','search_result_doctor'))
            or (action = 'on_click_button' AND params['button_name'] = 'check_plan' AND page_name = 'search_result_more'))
            GROUP BY partition_date
            ,params['query']
            ,cl_id
        )t2
        FULL JOIN
        (--内容卡片点击
            SELECT partition_date
            ,params['query'] as query
            ,cl_id
            ,count(1) as pv
            FROM online.bl_hdfs_maidian_updates
            WHERE partition_date >= '{yesterday_str}'
            AND partition_date < '{today_str}'
            AND ((action in ('on_click_topic_card','on_click_diary_card','search_result_click_infomation_item')
            AND page_name in ('search_result_more','search_result_diary','search_result_post'))
            or (action = 'on_click_card' AND params['card_content_type'] in ('answer','diary') AND page_name in ('search_result_more','search_result_diary','search_result_question_answer')))
            GROUP BY partition_date
            ,params['query']
            ,cl_id
        )t3
        on t3.partition_date=t2.partition_date
        AND t3.query=t2.query
        AND t3.cl_id=t2.cl_id
    )t1
    JOIN active_dev_view dev
    on t1.cl_id=dev.device_id and t1.partition_date = dev.partition_date
    LEFT JOIN spam_pv
    on spam_pv.device_id=t1.cl_id
    LEFT JOIN dev_view
    on t1.partition_date=dev_view.partition_date and t1.cl_id=dev_view.device_id
    WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
    and (dev_view.device_id is null or dev_view.device_id ='')
    GROUP BY t1.partition_date,active_type,device_os_type,channel
)t4
on t3.partition_date=t4.partition_date and t3.active_type=t4.active_type and t3.device_os_type = t4.device_os_type AND t3.channel = t4.channel
"""

# Parameterized upsert: values are bound by PyMySQL instead of being pasted
# into the SQL text, so quotes in dimension values cannot break the statement.
INSERT_SQL = (
    "replace into search_strategy_d("
    "day_id,device_type,active_type,channel_type,"
    "core_pv_division_uv,pv_division_uv,pid"
    ") VALUES(%s,%s,%s,%s,%s,%s,%s)"
)


def make_pid(day_id: str, device_type: str, active_type: str, channel_type: str) -> str:
    """Return the md5 hex digest of the concatenated key fields.

    The fields are joined with no separator, matching the keys already
    written by earlier runs. Presumably ``pid`` is the table's unique key
    that ``REPLACE`` relies on -- TODO confirm against the MySQL schema.
    Raises TypeError if any field is None (e.g. a NULL device_os_type).
    """
    return hashlib.md5(
        (day_id + device_type + active_type + channel_type).encode("utf8")).hexdigest()


def _upsert_rows(rows, day_id: str) -> int:
    """Upsert one day's metric rows via the module-level ``db``/``cursor``.

    Commits once after all rows; returns the summed affected-row count.
    """
    # The connection sat idle during the Spark work; reconnect if the server
    # dropped it.
    db.ping(reconnect=True)
    affected_total = 0
    for row in rows:
        params = (
            day_id,
            row.device_os_type,
            row.active_type,
            row.channel,
            row.search_core_pv,
            row.search_pv,
            make_pid(day_id, row.device_os_type, row.active_type, row.channel),
        )
        print(cursor.mogrify(INSERT_SQL, params))
        affected = cursor.execute(INSERT_SQL, params)
        print(affected)
        affected_total += affected
    db.commit()
    return affected_total


task_days = 5
# Read the clock once so every iteration is relative to the same instant
# (re-reading it per iteration could skip or repeat a day around midnight).
run_time = datetime.datetime.now()

try:
    for day_offset in range(task_days):
        now = run_time - datetime.timedelta(days=day_offset)
        today_str = now.strftime("%Y%m%d")
        yesterday_str = (now - datetime.timedelta(days=1)).strftime("%Y%m%d")
        date_args = {"yesterday_str": yesterday_str, "today_str": today_str}

        sql_dev_device_id = DEV_DEVICE_SQL.format(**date_args)
        print(sql_dev_device_id)
        dev_df = spark.sql(sql_dev_device_id)
        dev_df.createOrReplaceTempView("dev_view")
        dev_df.cache()
        dev_df.show(1)
        print("-------------------------------")

        sql_spam_pv_device_id = SPAM_PV_SQL.format(**date_args)
        print(sql_spam_pv_device_id)
        spam_pv_df = spark.sql(sql_spam_pv_device_id)
        spam_pv_df.createOrReplaceTempView("spam_pv")
        spam_pv_df.cache()
        spam_pv_df.show(1)
        print("-------------------------------")

        spark.sql(ACTIVE_DEVICE_SQL.format(**date_args)).createOrReplaceTempView(
            "active_dev_view")

        device_df = spark.sql(SEARCH_METRICS_SQL.format(**date_args))
        device_df.show(1, False)
        # Only the final aggregated rows are brought to the driver.
        metric_rows = device_df.collect()

        # day_id is today_str even though the data covers [yesterday, today),
        # as in the original job.
        _upsert_rows(metric_rows, today_str)

        # Release this day's caches before the next iteration re-creates them.
        dev_df.unpersist()
        spam_pv_df.unpersist()
finally:
    db.close()