package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

/**
 * Pure SQL-text builders shared by [[find_bug]] and [[CTR_precise]].
 *
 * Every method only assembles a SQL string; nothing here touches Spark. The
 * statements reference the tables/views `data_feed_click`, `blacklist`,
 * `merge_queue_table`, `online.ml_device_day_active_status`, the exposure table
 * passed by the caller and the temp view passed as `deviceView`; the caller is
 * responsible for registering them before running the SQL.
 */
private[gmei] object FeedCtrSql {

  /** Values of `first_channel_source_type` that are excluded from both device cohorts. */
  private val ExcludedChannels: Seq[String] = Seq(
    "yqxiu1", "yqxiu2", "yqxiu3", "yqxiu4", "yqxiu5", "mxyc1", "mxyc2", "mxyc3",
    "wanpu", "jinshan", "jx", "maimai", "zhuoyi", "huatian", "suopingjingling", "mocha", "mizhe",
    "meika", "lamabang",
    "js-az1", "js-az2", "js-az3", "js-az4", "js-az5",
    "jfq-az1", "jfq-az2", "jfq-az3", "jfq-az4", "jfq-az5",
    "toufang1", "toufang2", "toufang3", "toufang4", "toufang5", "toufang6",
    "TF-toufang1", "TF-toufang2", "TF-toufang3", "TF-toufang4", "TF-toufang5",
    "tf-toufang1", "tf-toufang2", "tf-toufang3", "tf-toufang4", "tf-toufang5",
    "benzhan", "promotion_aso100",
    "promotion_qianka", "promotion_xiaoyu", "promotion_dianru", "promotion_malioaso",
    "promotion_malioaso-shequ",
    "promotion_shike", "promotion_julang_jl03", "", "unknown"
  )

  // Kept in a plain (non-interpolated) literal so that `$` is the regex end anchor.
  private val TailOneRegexp = "regexp '1$'"

  // Clicks are counted for both diary content types, exposures only for 'diary'.
  private val ClickCidTypes = "(jd.cid_type = 'diary' or jd.cid_type = 'diary_video')"
  private val ExposureCidTypes = "je.cid_type = 'diary'"

  /**
   * Distinct devices from `online.ml_device_day_active_status` for one partition.
   *
   * @param partitionDate value compared against the `partition_date` column
   * @param oldUsers      `true` selects `active_type = '4'` (the cohort the jobs call
   *                      "oldUser"), `false` selects `active_type != '4'` ("newUser")
   */
  def activeDevices(partitionDate: String, oldUsers: Boolean): String = {
    val activeTypeOp = if (oldUsers) "=" else "!="
    val channels = ExcludedChannels.map(channel => s"'$channel'").mkString(",")
    s"""
       |select distinct(device_id) as device_id
       |from online.ml_device_day_active_status
       |where active_type ${activeTypeOp} '4'
       |and first_channel_source_type not in (${channels})
       |and partition_date = '${partitionDate}'
     """.stripMargin
  }

  /** Predicate keeping only rows of `tableAlias` whose device_id ends in the digit 1. */
  def tailOneFilter(tableAlias: String): String =
    s"${tableAlias}.device_id ${TailOneRegexp}"

  /**
   * Sub-select of devices having any row in `data_feed_click` on `statDate`
   * (no content-type restriction), optionally limited to device ids ending in 1.
   */
  def clickerSubquery(statDate: String, tailOneOnly: Boolean): String = {
    val tailPredicate = if (tailOneOnly) s"device_id ${TailOneRegexp} and " else ""
    s"select distinct(device_id) from data_feed_click where ${tailPredicate}stat_date = '${statDate}'"
  }

  /**
   * Single-row aggregate over a feed table joined to a device view.
   * Blacklisted devices are always excluded and rows are limited to `statDate`.
   */
  private def feedCount(
      statDate: String,
      table: String,
      tableAlias: String,
      deviceView: String,
      countExpr: String,
      resultColumn: String,
      cidTypeFilter: String,
      deviceFilter: Option[String]): String = {
    val predicates = Seq(cidTypeFilter) ++ deviceFilter ++ Seq(
      s"${tableAlias}.device_id not in (select device_id from blacklist)",
      s"${tableAlias}.stat_date = '${statDate}'"
    )
    val whereClause = predicates.mkString("\n  and ")
    s"""
       |select '${statDate}' as stat_date, ${countExpr} as ${resultColumn}
       |from ${table} ${tableAlias} inner join ${deviceView}
       |on ${tableAlias}.device_id = ${deviceView}.device_id
       |where ${whereClause}
     """.stripMargin
  }

  /**
   * Aggregate over `data_feed_click` (alias `jd`) for devices in `deviceView`.
   *
   * @param resultColumn name of the aggregate column in the result
   * @param tailOneOnly  restrict to device ids ending in 1
   * @param countExpr    aggregate expression; defaults to counting clicked content ids
   */
  def clickCount(
      statDate: String,
      deviceView: String,
      resultColumn: String,
      tailOneOnly: Boolean,
      countExpr: String = "count(jd.cid_id)"): String =
    feedCount(statDate, "data_feed_click", "jd", deviceView, countExpr, resultColumn,
      ClickCidTypes, if (tailOneOnly) Some(tailOneFilter("jd")) else None)

  /**
   * Count of exposed content ids in `exposureTable` (alias `je`) for devices in `deviceView`.
   *
   * @param deviceFilter optional extra predicate on `je`, e.g. [[tailOneFilter]]
   */
  def exposureCount(
      statDate: String,
      exposureTable: String,
      deviceView: String,
      resultColumn: String,
      deviceFilter: Option[String]): String =
    feedCount(statDate, exposureTable, "je", deviceView, "count(je.cid_id)", resultColumn,
      ExposureCidTypes, deviceFilter)

  /**
   * The four statements behind one click/exposure table, in output-column order:
   * `clk_count_<cohort>_Contrast`, `imp_count_<cohort>_Contrast` (device ids ending in 1),
   * then `clk_count_<cohort>_all`, `imp_count_<cohort>_all` (every device in the view).
   */
  def ctrQueries(
      statDate: String,
      cohort: String,
      exposureTable: String,
      deviceView: String): Seq[String] = Seq(
    clickCount(statDate, deviceView, s"clk_count_${cohort}_Contrast", tailOneOnly = true),
    exposureCount(statDate, exposureTable, deviceView, s"imp_count_${cohort}_Contrast",
      Some(tailOneFilter("je"))),
    clickCount(statDate, deviceView, s"clk_count_${cohort}_all", tailOneOnly = false),
    exposureCount(statDate, exposureTable, deviceView, s"imp_count_${cohort}_all", None)
  )

  /**
   * Distinct device count over `merge_queue_table`.
   *
   * @param clickedOnly when `true`, only devices that also appear in `data_feed_click`
   *                    on `statDate` are counted
   */
  def mergeQueueDeviceCount(statDate: String, resultColumn: String, clickedOnly: Boolean): String = {
    val whereClause =
      if (clickedOnly) s"where device_id in (${clickerSubquery(statDate, tailOneOnly = false)})"
      else ""
    s"""
       |select '${statDate}' as stat_date, count(distinct(device_id)) as ${resultColumn}
       |from merge_queue_table
       |${whereClause}
     """.stripMargin
  }
}

/**
 * Daily job that appends diary click/exposure counts to three JDBC tables:
 * `bug_Recommendation_strategy_temp` (old users), `bug_strategy_other`
 * (strategy coverage figures) and `bug_Recommendation_strategy_newUser` (new users).
 */
object find_bug {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty's loggers live under org.eclipse.jetty; the former "org.apache.eclipse..." name
  // matched nothing, so the OFF level never took effect.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  database environment passed to `GmeiConfig.setup`
   * @param date accepted on the command line but currently NOT used: the statistics
   *             date always comes from `GmeiConfig.getMinusNDate(1)`
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams: Params = Params()

  val parser: OptionParser[Params] = new OptionParser[Params]("Feed_EDA") {
    head("find_bug")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.find_bug ./target/scala-2.11/feededa-assembly-0.1.jar \
        |""".stripMargin + s"   --env ${defaultParams.env}"
    )
  }

  /** TiDB tables (database `jerry_prod`) registered before any query runs. */
  private val MappedTables: Seq[String] =
    Seq("diary_video", "data_feed_click", "blacklist", "data_feed_exposure", "merge_queue_table")

  private val ExposureTable = "data_feed_exposure"

  /**
   * Parses the arguments and runs the job. scopt reports invalid arguments itself;
   * in that case the process exits with status 1 instead of silently succeeding.
   */
  def main(args: Array[String]): Unit =
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None => sys.exit(1)
    }

  /** Statements behind `bug_strategy_other`, in output-column order. */
  private def strategyQueries(statDate: String): Seq[String] = {
    val oldDevices = "device_id_old"
    Seq(
      // Diary clicks by devices whose device_id ends in 1.
      FeedCtrSql.clickCount(statDate, oldDevices, "clk_active_1", tailOneOnly = true),
      // Diary exposures for devices ending in 1 that have at least one click that day.
      FeedCtrSql.exposureCount(statDate, ExposureTable, oldDevices, "imp_active_1",
        Some(s"je.device_id in (${FeedCtrSql.clickerSubquery(statDate, tailOneOnly = true)})")),
      // Diary clicks by all devices.
      FeedCtrSql.clickCount(statDate, oldDevices, "clk_active_all", tailOneOnly = false),
      // Diary exposures for all devices that have at least one click that day.
      FeedCtrSql.exposureCount(statDate, ExposureTable, oldDevices, "imp_active_all",
        Some(s"je.device_id in (${FeedCtrSql.clickerSubquery(statDate, tailOneOnly = false)})")),
      // Number of distinct devices ending in 1 that clicked a diary.
      FeedCtrSql.clickCount(statDate, oldDevices, "clk_diary_device", tailOneOnly = true,
        countExpr = "count(distinct(jd.device_id))"),
      // Strategy-hit devices (merge_queue_table) that have at least one click that day.
      FeedCtrSql.mergeQueueDeviceCount(statDate, "clk_diary_device_cover", clickedOnly = true),
      // Total number of strategy-hit devices.
      FeedCtrSql.mergeQueueDeviceCount(statDate, "device_all_cover", clickedOnly = false)
    )
  }

  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    val spark = GmeiConfig.getSparkSession()._2
    val ti = new TiContext(spark)
    MappedTables.foreach(table => ti.tidbMapTable(dbName = "jerry_prod", tableName = table))

    // NOTE(review): presumably "yesterday" in yyyy-MM-dd form — confirm against GmeiConfig.
    // `param.date` is intentionally not consulted (it was commented out in the original).
    val statDate = GmeiConfig.getMinusNDate(1)
    val partitionDate = statDate.replace("-", "")

    // Runs each statement and joins the single-row results on stat_date, left to right.
    def joinOnStatDate(queries: Seq[String]) =
      queries
        .map(query => spark.sql(query))
        .reduceLeft((joined, next) => joined.join(next, "stat_date"))

    // Old users.
    spark.sql(FeedCtrSql.activeDevices(partitionDate, oldUsers = true))
      .createOrReplaceTempView("device_id_old")
    val oldUserCtr =
      joinOnStatDate(FeedCtrSql.ctrQueries(statDate, "oldUser", ExposureTable, "device_id_old"))
    oldUserCtr.show()
    GmeiConfig.writeToJDBCTable(oldUserCtr, "bug_Recommendation_strategy_temp", SaveMode.Append)

    // Strategy coverage figures (computed on the old-user view).
    val strategyStats = joinOnStatDate(strategyQueries(statDate))
    strategyStats.show()
    GmeiConfig.writeToJDBCTable(strategyStats, "bug_strategy_other", SaveMode.Append)

    // New-user click-through statistics.
    val newUserDevices = spark.sql(FeedCtrSql.activeDevices(partitionDate, oldUsers = false))
    newUserDevices.show()
    newUserDevices.createOrReplaceTempView("device_id_new")
    val newUserCtr =
      joinOnStatDate(FeedCtrSql.ctrQueries(statDate, "newUser", ExposureTable, "device_id_new"))
    newUserCtr.show()
    GmeiConfig.writeToJDBCTable(newUserCtr, "bug_Recommendation_strategy_newUser", SaveMode.Append)
  }
}

/**
 * Same old-user / new-user click and exposure counts as [[find_bug]], but exposures are
 * read from `data_feed_exposure_precise`. Results are appended to
 * `bug_precise_Recommendation_strategy_temp` and `bug_precise_Recommendation_strategy_newUser`.
 */
object CTR_precise {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty's loggers live under org.eclipse.jetty; the former "org.apache.eclipse..." name
  // matched nothing, so the OFF level never took effect.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  database environment passed to `GmeiConfig.setup`
   * @param date accepted on the command line but currently NOT used: the statistics
   *             date always comes from `GmeiConfig.getMinusNDate(1)`
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams: Params = Params()

  val parser: OptionParser[Params] = new OptionParser[Params]("Feed_EDA") {
    head("CTR_precise")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.CTR_precise ./target/scala-2.11/feededa-assembly-0.1.jar \
        |""".stripMargin + s"   --env ${defaultParams.env}"
    )
  }

  /**
   * TiDB tables (database `jerry_prod`) registered before any query runs. The last two are
   * not referenced by this job's SQL but were mapped by the original, so they stay mapped.
   */
  private val MappedTables: Seq[String] = Seq(
    "data_feed_exposure_precise", "data_feed_click", "blacklist",
    "data_feed_exposure", "merge_queue_table")

  private val ExposureTable = "data_feed_exposure_precise"

  /**
   * Parses the arguments and runs the job. scopt reports invalid arguments itself;
   * in that case the process exits with status 1 instead of silently succeeding.
   */
  def main(args: Array[String]): Unit =
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None => sys.exit(1)
    }

  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    val spark = GmeiConfig.getSparkSession()._2
    val ti = new TiContext(spark)
    MappedTables.foreach(table => ti.tidbMapTable(dbName = "jerry_prod", tableName = table))

    // NOTE(review): presumably "yesterday" in yyyy-MM-dd form — confirm against GmeiConfig.
    // `param.date` is intentionally not consulted (it was commented out in the original).
    val statDate = GmeiConfig.getMinusNDate(1)
    val partitionDate = statDate.replace("-", "")

    // Runs each statement and joins the single-row results on stat_date, left to right.
    def joinOnStatDate(queries: Seq[String]) =
      queries
        .map(query => spark.sql(query))
        .reduceLeft((joined, next) => joined.join(next, "stat_date"))

    // Old users.
    spark.sql(FeedCtrSql.activeDevices(partitionDate, oldUsers = true))
      .createOrReplaceTempView("device_id_old")
    val oldUserCtr =
      joinOnStatDate(FeedCtrSql.ctrQueries(statDate, "oldUser", ExposureTable, "device_id_old"))
    oldUserCtr.show()
    GmeiConfig.writeToJDBCTable(
      oldUserCtr, "bug_precise_Recommendation_strategy_temp", SaveMode.Append)

    // New-user click-through statistics.
    val newUserDevices = spark.sql(FeedCtrSql.activeDevices(partitionDate, oldUsers = false))
    newUserDevices.show()
    newUserDevices.createOrReplaceTempView("device_id_new")
    val newUserCtr =
      joinOnStatDate(FeedCtrSql.ctrQueries(statDate, "newUser", ExposureTable, "device_id_new"))
    newUserCtr.show()
    GmeiConfig.writeToJDBCTable(
      newUserCtr, "bug_precise_Recommendation_strategy_newUser", SaveMode.Append)
  }
}