from pyspark import SparkConf
from pyspark.sql import SparkSession
from pytispark import pytispark as pti

from utils.date import get_ndays_before


def connect_doris(spark, table):
    """Read a table from the Doris production instance over the MySQL JDBC protocol."""
    return spark.read.format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://172.16.30.136:3306/doris_prod") \
        .option("dbtable", table) \
        .option("user", "doris") \
        .option("password", "o5gbA27hXHHm") \
        .load()


def get_content_tag3(spark, card_type):
    """Return the tag-v3 info table that matches the given card type."""
    if card_type == "diary":
        content_tag3 = connect_doris(spark, "strategy_content_tagv3_info")
    elif card_type == "user_post":
        content_tag3 = connect_doris(spark, "strategy_tractate_tagv3_info")
    else:
        content_tag3 = connect_doris(spark, "strategy_answer_tagv3_info")
    return content_tag3


def get_spark(app_name=""):
    """Build a Hive- and TiSpark-enabled SparkSession and map the jerry_test TiDB database."""
    sparkConf = SparkConf()
    sparkConf.set("spark.sql.crossJoin.enabled", True)
    sparkConf.set("spark.debug.maxToStringFields", "100")
    sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
    sparkConf.set("spark.tispark.plan.allow_index_read", True)
    sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
    sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
    sparkConf.set("mapreduce.map.output.compress", False)
    spark = SparkSession.builder \
        .config(conf=sparkConf) \
        .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .config("spark.tispark.pd.addresses", "172.16.40.170:2379") \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    # sc.addPyFile("/srv/apps/strategy_embedding/utils/date.py")
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    return spark


def get_device_tags(spark):
    """Yesterday's device-level tag3 portrait as a pandas DataFrame."""
    sql = """
        SELECT cl_id, first_demands, first_solutions, first_positions,
               second_demands, second_solutions, second_positions,
               projects, business_tags
        FROM user_tag3_portrait
        WHERE date = '{}'
    """.format(get_ndays_before(1))
    return spark.sql(sql).toPandas()


def get_click_data(spark, card_type, start, end):
    """Distinct detail-page clicks (page_view, stay >= 5s) per device between start and end,
    excluding spam devices, staff users and doctor/puppet accounts."""
    reg = r"""^\\d+$"""
    sql = """
        SELECT DISTINCT t1.cl_id device_id, cast(t1.business_id as int) card_id, t1.partition_date
        FROM
            (select partition_date, cl_id, business_id, action, page_name, page_stay, time_stamp
             from online.bl_hdfs_maidian_updates
             where action = 'page_view'
               AND partition_date BETWEEN '{}' AND '{}'
               AND page_name = '{}_detail'
               AND page_stay >= 5
               AND cl_id is not null AND cl_id != ''
               AND business_id is not null AND business_id != ''
               AND business_id rlike '{}'
            ) AS t1
        JOIN
            (select partition_date, active_type, first_channel_source_type, device_id
             from online.ml_device_day_active_status
             where partition_date BETWEEN '{}' AND '{}'
               AND active_type IN ('1', '2', '4')
               AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                   ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                   ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                   ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                   ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                   ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                   ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
               AND first_channel_source_type not LIKE 'promotion\\_jf\\_%'
            ) as t2
            ON t1.cl_id = t2.device_id AND t1.partition_date = t2.partition_date
        LEFT JOIN
            (SELECT DISTINCT device_id
             FROM ml.ml_d_ct_dv_devicespam_d  -- exclude fake-traffic (cheating) devices, filtered from view and exposure events
             WHERE partition_day = '{}'
             UNION ALL
             SELECT DISTINCT device_id
             FROM dim.dim_device_user_staff  -- exclude intranet (staff) users
            ) spam_pv on spam_pv.device_id = t1.cl_id
        LEFT JOIN
            (SELECT partition_date, device_id
             FROM
                (-- first device id active for each user_id on the day
                 SELECT user_id, partition_date,
                        if(size(device_list) > 0, device_list[0], '') AS device_id
                 FROM online.ml_user_updates
                 WHERE partition_date >= '{}' AND partition_date < '{}'
                ) t1
             JOIN
                (-- doctor accounts
                 SELECT distinct user_id
                 FROM online.tl_hdfs_doctor_view
                 WHERE partition_date = '{}'
                 -- puppet accounts / model users
                 UNION ALL
                 SELECT user_id
                 FROM ml.ml_c_ct_ui_user_dimen_d
                 WHERE partition_day = '{}'
                   AND (is_puppet = 'true' or is_classifyuser = 'true')
                 UNION ALL
                 -- company intranet users
                 select distinct user_id
                 from dim.dim_device_user_staff
                 UNION ALL
                 -- devices that have logged in to a doctor account
                 SELECT distinct t1.user_id
                 FROM
                    (SELECT user_id, v.device_id as device_id
                     FROM online.ml_user_history_detail
                     LATERAL VIEW EXPLODE(device_history_list) v AS device_id
                     WHERE partition_date = '{}'
                    ) t1
                 JOIN
                    (SELECT device_id
                     FROM online.ml_device_history_detail
                     WHERE partition_date = '{}' AND is_login_doctor = '1'
                    ) t2 ON t1.device_id = t2.device_id
                ) t2 on t1.user_id = t2.user_id
             group by partition_date, device_id
            ) dev on t1.partition_date = dev.partition_date and t1.cl_id = dev.device_id
        WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '')
          and (dev.device_id is null or dev.device_id = '')
    """.format(start, end, card_type, reg, start, end, end, start, end, end, end, end, end)
    return spark.sql(sql).toPandas()


def get_exposure_data(spark, card_type, start, end):
    """Distinct precise card exposures per device between start and end,
    with the same spam/staff/doctor/puppet filtering as get_click_data."""
    reg = r"""^\\d+$"""
    sql = """
        SELECT DISTINCT t1.cl_id device_id, cast(card_id AS int) card_id, t1.partition_date
        FROM
            (select *
             from online.ml_community_precise_exposure_detail
             where cl_id IS NOT NULL
               AND card_id IS NOT NULL
               AND card_id rlike '{}'
               AND action = 'page_precise_exposure'
               AND card_content_type = '{}'
               AND is_exposure = 1
            ) AS t1
        LEFT JOIN online.ml_device_day_active_status AS t2
            ON t1.cl_id = t2.device_id AND t1.partition_date = t2.partition_date
        LEFT JOIN
            (SELECT DISTINCT device_id
             FROM ml.ml_d_ct_dv_devicespam_d  -- exclude fake-traffic (cheating) devices, filtered from view and exposure events
             WHERE partition_day = '{}'
             UNION ALL
             SELECT DISTINCT device_id
             FROM dim.dim_device_user_staff  -- exclude intranet (staff) users
            ) spam_pv on spam_pv.device_id = t1.cl_id
        LEFT JOIN
            (SELECT partition_date, device_id
             FROM
                (-- first device id active for each user_id on the day
                 SELECT user_id, partition_date,
                        if(size(device_list) > 0, device_list[0], '') AS device_id
                 FROM online.ml_user_updates
                 WHERE partition_date >= '{}' AND partition_date < '{}'
                ) t1
             JOIN
                (-- doctor accounts
                 SELECT distinct user_id
                 FROM online.tl_hdfs_doctor_view
                 WHERE partition_date = '{}'
                 -- puppet accounts / model users
                 UNION ALL
                 SELECT user_id
                 FROM ml.ml_c_ct_ui_user_dimen_d
                 WHERE partition_day = '{}'
                   AND (is_puppet = 'true' or is_classifyuser = 'true')
                 UNION ALL
                 -- company intranet users
                 select distinct user_id
                 from dim.dim_device_user_staff
                 UNION ALL
                 -- devices that have logged in to a doctor account
                 SELECT distinct t1.user_id
                 FROM
                    (SELECT user_id, v.device_id as device_id
                     FROM online.ml_user_history_detail
                     LATERAL VIEW EXPLODE(device_history_list) v AS device_id
                     WHERE partition_date = '{}'
                    ) t1
                 JOIN
                    (SELECT device_id
                     FROM online.ml_device_history_detail
                     WHERE partition_date = '{}' AND is_login_doctor = '1'
                    ) t2 ON t1.device_id = t2.device_id
                ) t2 on t1.user_id = t2.user_id
             group by partition_date, device_id
            ) dev on t1.partition_date = dev.partition_date and t1.cl_id = dev.device_id
        WHERE (spam_pv.device_id IS NULL or spam_pv.device_id = '')
          and (dev.device_id is null or dev.device_id = '')
          AND t2.partition_date BETWEEN '{}' AND '{}'
          AND t2.active_type IN ('1', '2', '4')
          AND t2.first_channel_source_type NOT IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
              ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
              ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
              ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
              ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
              ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
              ,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
          AND t2.first_channel_source_type not LIKE 'promotion\\_jf\\_%'
    """.format(reg, card_type, end, start, end, end, end, end, end, start, end)
    return spark.sql(sql).toPandas()


def get_card_feature_df(spark, card_type, yesterday):
    """Per-card statistical features joined with tag-v3 content tags for the given day."""
    content_tag3 = get_content_tag3(spark, card_type)
    content_tag3.createOrReplaceTempView("content_tag3")
    reg = r"""^\\d+$"""
    sql = """
        SELECT CAST(card_id as INT) as card_id, partition_date,
            is_pure_author, is_have_pure_reply, is_have_reply,
            CAST(card_feature.content_level as FLOAT) as content_level,
            topic_seven_click_num, topic_thirty_click_num, topic_num,
            seven_transform_num, thirty_transform_num,
            favor_num, favor_pure_num, vote_num, vote_display_num, reply_num, reply_pure_num,
            one_click_num, three_click_num, seven_click_num, fifteen_click_num,
            thirty_click_num, sixty_click_num, ninety_click_num, history_click_num,
            one_precise_exposure_num, three_precise_exposure_num, seven_precise_exposure_num,
            fifteen_precise_exposure_num, thirty_precise_exposure_num, sixty_precise_exposure_num,
            ninety_precise_exposure_num, history_precise_exposure_num,
            one_vote_user_num, three_vote_user_num, seven_vote_user_num, fifteen_vote_user_num,
            thirty_vote_user_num, sixty_vote_user_num, ninety_vote_user_num, history_vote_user_num,
            one_reply_user_num, three_reply_user_num, seven_reply_user_num, fifteen_reply_user_num,
            thirty_reply_user_num, sixty_reply_user_num, ninety_reply_user_num, history_reply_user_num,
            one_browse_user_num, three_browse_user_num, seven_browse_user_num, fifteen_browse_user_num,
            thirty_browse_user_num, sixty_browse_user_num, ninety_browse_user_num, history_browse_user_num,
            one_reply_num, three_reply_num, seven_reply_num, fifteen_reply_num,
            thirty_reply_num, sixty_reply_num, ninety_reply_num, history_reply_num,
            one_ctr, three_ctr, seven_ctr, fifteen_ctr, thirty_ctr, sixty_ctr, ninety_ctr, history_ctr,
            one_vote_pure_rate, three_vote_pure_rate, seven_vote_pure_rate, fifteen_vote_pure_rate,
            thirty_vote_pure_rate, sixty_vote_pure_rate, ninety_vote_pure_rate, history_vote_pure_rate,
            one_reply_pure_rate, three_reply_pure_rate, seven_reply_pure_rate, fifteen_reply_pure_rate,
            thirty_reply_pure_rate, sixty_reply_pure_rate, ninety_reply_pure_rate, history_reply_pure_rate,
            IFNULL(content_tag3.first_demands, "") AS card_first_demands,
            IFNULL(content_tag3.second_demands, "") AS card_second_demands,
            IFNULL(content_tag3.first_solutions, "") AS card_first_solutions,
            IFNULL(content_tag3.second_solutions, "") AS card_second_solutions,
            IFNULL(content_tag3.first_positions, "") AS card_first_positions,
            IFNULL(content_tag3.second_positions, "") AS card_second_positions,
            IFNULL(content_tag3.project_tags, "") AS card_projects
        FROM online.al_community_forecast_character_day_v3 card_feature
        JOIN content_tag3 ON card_feature.card_id = content_tag3.id
        where partition_date = '{}'
          and card_content_type = '{}'
          and card_id rlike '{}'
    """.format(yesterday, card_type, reg)
    return spark.sql(sql).toPandas()
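

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of how these helpers might be composed into a one-off daily pull.
# Assumptions: get_ndays_before(n) returns a partition string in the format the
# warehouse tables expect, "diary" is a valid card_type (see get_content_tag3), and
# the job is submitted with the TiSpark/JDBC dependencies on the classpath.
if __name__ == "__main__":
    spark = get_spark(app_name="strategy_embedding_sample_pull")
    start, end = get_ndays_before(8), get_ndays_before(1)

    device_tags = get_device_tags(spark)                        # device-side tag3 portrait
    clicks = get_click_data(spark, "diary", start, end)         # filtered page_view clicks
    exposures = get_exposure_data(spark, "diary", start, end)   # filtered precise exposures
    card_features = get_card_feature_df(spark, "diary", end)    # per-card features + tag3 info

    print(device_tags.shape, clicks.shape, exposures.shape, card_features.shape)
    spark.stop()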