package com.gmei import java.io.Serializable import java.text.SimpleDateFormat import breeze.linalg.split import com.gmei.WeafareStat.{defaultParams, parser} import org.apache.spark.sql.{Row, SaveMode, SparkSession} //import org.apache.spark.sql.{Row, SaveMode, SparkSession, TiContext} import org.apache.log4j.{Level, Logger} import scopt.OptionParser import com.gmei.lib.AbstractParams import com.github.nscala_time.time.Imports._ import java.text.SimpleDateFormat import java.util.Date import scala.util.parsing.json.JSON object temp_count { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 // val ti = new TiContext(sc) sc.sql("use jerry_prod") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = param.date //println(param.date) val partition_date = stat_date.replace("-","") val decive_id_oldUser = sc.sql( s""" |select distinct(device_id) as device_id |from online.ml_device_day_active_status |where active_type = '4' |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' | ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei') |and partition_date ='${partition_date}' """.stripMargin ) decive_id_oldUser.createOrReplaceTempView("device_id_old") val clk_count_oldUser = sc.sql( s""" |select '${stat_date}' as stat_date, count(jd.cid_id) as clk_count_oldUser |from data_feed_click jd inner join device_id_old |on jd.device_id = device_id_old.device_id |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video') |and jd.device_id not in (select device_id from blacklist) |and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and jd.stat_date ='${stat_date}' """.stripMargin ) val imp_count_oldUser = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser |from data_feed_exposure je inner join device_id_old |on je.device_id = device_id_old.device_id |where je.cid_type = 'diary' |and je.device_id not in (select device_id from blacklist) |and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and je.stat_date ='${stat_date}' """.stripMargin ) val clk_count_all = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as clk_count_all |from data_feed_click |where (cid_type = 'diary' or cid_type = 'diary_video') |and device_id not in (select device_id from blacklist) |and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and stat_date ='${stat_date}' """.stripMargin ) val imp_count__all = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as imp_count_all |from data_feed_exposure |where cid_type = 'diary' |and device_id not in (select device_id from blacklist) |and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and stat_date ='${stat_date}' """.stripMargin ) val clk_count_oldUser_Contrast = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as clk_count_oldUser_Contrast |from data_feed_click jd inner join device_id_old |on jd.device_id = device_id_old.device_id |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video') |and jd.device_id regexp'1$$' |and jd.device_id not in (select device_id from blacklist) |and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and jd.stat_date ='${stat_date}' """.stripMargin ) val imp_count_oldUser_Contrast = sc.sql( s""" |select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser_Contrast |from data_feed_exposure je inner join device_id_old |on je.device_id = device_id_old.device_id |where je.cid_type = 'diary' |and je.device_id regexp'1$$' |and je.device_id not in (select device_id from blacklist) |and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan") |and je.stat_date ='${stat_date}' """.stripMargin ) val result1 = clk_count_oldUser.join(imp_count_oldUser,"stat_date") .join(clk_count_all,"stat_date") .join(imp_count__all,"stat_date") .join(clk_count_oldUser_Contrast,"stat_date") .join(imp_count_oldUser_Contrast,"stat_date") result1.show() // GmeiConfig.writeToJDBCTable(result1, "ffm_diary_ctr", SaveMode.Append) // GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result1, table="ffm_diary_ctr",SaveMode.Append) println("开始写入") // GmeiConfig.writeToJDBCTable("jerry.jdbcuri",result1, table="ffm_diary_ctr",SaveMode.Append) // println("写入完成") GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result1, table="ffm_diary_ctr",SaveMode.Append) println("写入完成") } } } object Repeated_content_recommendation { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 // val ti = new TiContext(sc) sc.sql("use jerry_prod") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") // val stat_date = GmeiConfig.getMinusNDate(1) val stat_date = param.date val partition_date = stat_date.replace("-","") val agency_id = sc.sql( s""" |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date >= '20180402' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 |UNION ALL |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date >= '20171101' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 |UNION ALL |select distinct(device_id) |from blacklist """.stripMargin ) agency_id.createOrReplaceTempView("agency_id") val device_id_oldUser = sc.sql( s""" |select distinct(om.device_id) as device_id |from online.ml_device_day_active_status om left join agency_id |on om.device_id = agency_id.device_id |where om.active_type = '4' |and om.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' | ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei') |and om.partition_date ='${partition_date}' |and agency_id.device_id is null """.stripMargin ) device_id_oldUser.createOrReplaceTempView("device_id_old") device_id_oldUser.show() val device_id_newUser = sc.sql( s""" |select distinct(om.device_id) as device_id |from online.ml_device_day_active_status om left join agency_id |on om.device_id = agency_id.device_id |where (om.active_type = '1' or om.active_type='2') |and om.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' | ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei') |and om.partition_date ='${partition_date}' |and agency_id.device_id is null """.stripMargin ) device_id_newUser.createOrReplaceTempView("device_id_new") device_id_newUser.show() val exp_diary_new = sc.sql( s""" |select concat_ws('|',de.device_id,de.cid_id) |from data_feed_exposure de inner join device_id_new |on de.device_id=device_id_new.device_id |where de.cid_type = 'diary' |and de.stat_date ='${stat_date}' """.stripMargin ) val get_result_new =exp_diary_new.rdd.map((_, 1)).reduceByKey(_ + _) .sortBy(_._2,false) val more_than2_new=get_result_new.filter(_._2 >=2).map(_._2).reduce((x,y)=>x+y) println(more_than2_new) val all_new =get_result_new.map(_._2).reduce((x,y)=>x+y) println(all_new) val repeated_rate_new= more_than2_new / all_new.toDouble println(repeated_rate_new) val exp_diary_old = sc.sql( s""" |select concat_ws('|',de.device_id,de.cid_id) |from data_feed_exposure de inner join device_id_old |on de.device_id=device_id_old.device_id |where de.cid_type = 'diary' |and de.stat_date ='${stat_date}' """.stripMargin ) val get_result_old =exp_diary_old.rdd.map((_, 1)).reduceByKey(_ + _) .sortBy(_._2,false) val more_than2_old=get_result_old.filter(_._2 >=2).map(_._2).reduce((x,y)=>x+y) println(more_than2_old) val all_old =get_result_old.map(_._2).reduce((x,y)=>x+y) println(all_old) val repeated_rate_old= more_than2_old / all_old.toDouble println(repeated_rate_old) val result2=List((stat_date,more_than2_old,all_old,more_than2_new,all_new)) val df2 = sc.createDataFrame(result2).toDF("stat_date","old_rep_count","old_imp_all","new_rep_count","new_imp_all") // GmeiConfig.writeToJDBCTable(df2, table = "Repeated_evaluation_indicator", SaveMode.Append) println("开始写入") // GmeiConfig.writeToJDBCTable("jerry.jdbcuri",df2, table="Repeated_evaluation_indicator",SaveMode.Append) // println("写入完成") GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df2, table="Repeated_evaluation_indicator",SaveMode.Append) println("写入完成") // val exp_diary_old = sc.sql( // s""" // |select concat_ws('|',de.device_id,de.cid_id) // |from data_feed_exposure de inner join device_id_old // |where de.cid_type = 'diary' // |and de.stat_date ='${stat_date}' // """.stripMargin // ) // val get_result_old =exp_diary_old.rdd.map((_, 1)).reduceByKey(_ + _) // .sortBy(_._2,false) // // val more_than2_old=get_result_old.filter(_._2 >=2).map(_._2).reduce((x,y)=>x+y) // println(more_than2_old) // val all_old =get_result_old.map(_._2).reduce((x,y)=>x+y) // println(all_old) // val repeated_rate_old= more_than2_old / all_old.toDouble // println(repeated_rate_old) // // // val result2=List((stat_date,more_than2_old,all_old)) // val df2 = sc.createDataFrame(result2).toDF("stat_date","old_rep_count","old_imp_all") // // GmeiConfig.writeToJDBCTable(df2, table = "Repeated_evaluation_indicator_old", SaveMode.Append) // val temp=get_result.collect() // for (i <- 0 until 30 ) { // println(temp(i)) // } } } } object Repeated_content_recommendation_moreday { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 // val ti = new TiContext(sc) sc.sql("use jerry_prod") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = "2019-01-16" // val partition_date = stat_date.replace("-","") val now= new Date() // val stat_date=param.date val dateFormat = new SimpleDateFormat("yyyy-MM-dd") val date = dateFormat.format(now.getTime - 86400000L * 8) val yesterday=dateFormat.format(now.getTime- 86400000L) val exp_diary = sc.sql( s""" |select stat_date,device_id,concat_ws(',',collect_set(distinct cid_id)) as expoure_diary |from data_feed_exposure_precise |where cid_type = 'diary' |and stat_date >='${date}' |and device_id not in (select device_id from blacklist) |group by device_id,stat_date """.stripMargin ).rdd.map(row=>(row(0).toString,row(1).toString,row(2).toString)).map(row=>(row._2,row._3)).groupByKey() .filter(x => x._2.size >1) //打印结果 // val temp=exp_diary.take(10).foreach(println) // val count_imp=exp_diary.map(_._2).map(row=>row.flatMap(x=>x.split(",")).toArray) // .map(x => (x,x)).map(x => (x._1.distinct.size,x._2.size)).map(x => (x._2-x._1,x._2)) //统计每个用户重复日记个数 val count_imp=exp_diary.map(_._2).map(row=>row.flatMap(x=>x.split(",")).toArray) .map(x => (x,x)).map(x => (x._1.distinct.size,x._2.size)).map(x => (x._2-x._1,x._2)).collect() val fenmu = count_imp.map(x => x._1).reduce((x,y) => x+y) val fenzi = count_imp.map(x => x._2).reduce((x,y) => x+y) val repeated_rate= fenmu / fenzi.toDouble val result=List((yesterday,repeated_rate)) println(result) val df_result = sc.createDataFrame(result) // GmeiConfig.writeToJDBCTable(df_result, table = "Repeated_content_recommendation_moreday", SaveMode.Append) // GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="Repeated_content_recommendation_moreday",SaveMode.Append) println("开始写入") // GmeiConfig.writeToJDBCTable("jerry.jdbcuri",df_result, table="Repeated_content_recommendation_moreday",SaveMode.Append) // println("写入完成") GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="Repeated_content_recommendation_moreday",SaveMode.Append) println("写入完成") // exp_diary.show() // exp_diary.createOrReplaceTempView("exp_diary") // GmeiConfig.writeToJDBCTable(df, table = "Repeated_evaluation_indicator_moreday", SaveMode.Append) } } } object GetHiveSearchData { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable case class GetHiveSearchData_temp(stat_date:String,diaryExposureVal:String,diaryClickNum:String,meigouExposureVal:String,meigouClickNum:String,searchResultExposureVal:String,searchResultClickNum:String,searchDoctorExposureVal:String,searchDoctorClickNum:String,searchHospitalExposureVal:String,searchHospitalClickNum:String) val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 // val ti = new TiContext(sc) sc.sql("use jerry_prod") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "GetHiveSearchData_CTR") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table") val stat_date = GmeiConfig.getMinusNDate(1) // val stat_date = param.date val partition_date = stat_date.replace("-","") val strDiaryExposureAction = "/api/search/v2/diary" val strDiaryClickAction = "search_result_more_diary_click_item" //需要确认 var (diaryExposureVal,diaryClickNum,diaryExposureMapCount,diaryExposureFilterCount) = GetSearchResultData(sc,strDiaryExposureAction,strDiaryClickAction,stat_date) val strMeigouExposureAction = "/api/search/v2/service" val strMeigouClickAction = "search_result_welfare_click_item" var (meigouExposureVal,meigouClickNum,meigouExposureMapCount,meigouExposureFilterCount) = GetSearchResultData(sc,strMeigouExposureAction,strMeigouClickAction,stat_date) val strSearchResultExposureAction = "/api/search/v2/content" val strSearchResultClickAction = "search_result_click_diary_item" //需要确认 var (searchResultExposureVal,searchResultClickNum,searchResultExposureMapCount,searchResultExposureFilterCount) = GetSearchResultData(sc,strSearchResultExposureAction,strSearchResultClickAction,stat_date) val strSearchDoctorExposureAction = "/api/search/v2/doctor" val strSearchDoctorClickAction = "search_result_doctor_click_item" var (searchDoctorExposureVal,searchDoctorClickNum,searchDoctorExposureMapCount,searchDoctorExposureFilterCount) = GetSearchResultData(sc,strSearchDoctorExposureAction,strSearchDoctorClickAction,stat_date) val strSearchHospitalExposureAction = "/api/search/v2/hospital" val strSearchHospitalClickAction = "search_result_hospital_click_item" var (searchHospitalExposureVal,searchHospitalClickNum,searchHospitalExposureMapCount,searchHospitalExposureFilterCount) = GetSearchResultData(sc,strSearchHospitalExposureAction,strSearchHospitalClickAction,stat_date) val jigou_id = sc.sql( s""" |SELECT cl_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date>='20180402' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |SELECT cl_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date>='20171101' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |select device_id as cl_id from blacklist """.stripMargin ) jigou_id.createOrReplaceTempView("jigou_id") val diary_clickSql = sc.sql( s""" |select |count(1) click_num |from online.tl_hdfs_maidian_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.partition_date='${partition_date}' |and ov.action='on_click_diary_card' |and ov.params['page_name']='search_result_diary' |and jigou_id.cl_id is null """.stripMargin ) val diary_clickArray = diary_clickSql.collect() val diary_click_num = diary_clickArray(0).getAs[Long]("click_num") val content_diary_clickSql = sc.sql( s""" |select |count(1) click_num |from online.tl_hdfs_maidian_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.partition_date='${partition_date}' |and ov.action='on_click_diary_card' |and ov.params['page_name']='search_result_more' |and jigou_id.cl_id is null """.stripMargin ) val content_diary_clickArray:Array[Row] = content_diary_clickSql.collect() val content_diary_click_num:Long = content_diary_clickArray(0).getAs[Long]("click_num") println("searchDiaryExposureVal:" + diaryExposureVal + "\tsearchDiaryClickNum:" + diary_click_num + "\tclickRate:" + (diary_click_num.floatValue()/diaryExposureVal.floatValue()).formatted("%.2f")) println("searchMeigouExposureVal:" + meigouExposureVal + "\tsearchMeigouClickNum:" + meigouClickNum + "\tclickRate:" + (meigouClickNum.floatValue()/meigouExposureVal.floatValue()).formatted("%.2f")) println("searchResultExposureVal:" + searchResultExposureVal + "\tsearchResultClickNum:" + (searchResultClickNum+content_diary_click_num) + "\tclickRate:" + ((searchResultClickNum+content_diary_click_num).floatValue()/searchResultExposureVal.floatValue()).formatted("%.2f")) println("searchDoctorExposureVal:" + searchDoctorExposureVal + "\tsearchDoctorClickNum:" + searchDoctorClickNum + "\tclickRate:" + (searchDoctorClickNum.floatValue()/searchDoctorExposureVal.floatValue()).formatted("%.2f")) println("searchHospitalExposureVal:" + searchHospitalExposureVal + "\tsearchHospitalClickNum:" + searchHospitalClickNum + "\tclickRate:" + (searchHospitalClickNum.floatValue()/searchHospitalExposureVal.floatValue()).formatted("%.2f")) // val add_data = sc.sql( // s""" // |insert into table GetHiveSearchData_CTR(stat_date,diaryExposureVal,diaryClickNum,meigouExposureVal,meigouClickNum,searchResultExposureVal,searchResultClickNum,searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum) // |values ('${stat_date}','${diaryExposureVal}','${(diary_click_num+diaryClickNum)}','${meigouExposureVal}','${meigouClickNum}','${searchResultExposureVal}','${(searchResultClickNum+content_diary_click_num)}','${searchDoctorExposureVal}','${searchDoctorClickNum}','${searchHospitalExposureVal}','${searchHospitalClickNum}') // """.stripMargin // ) import sc.implicits._ val result=List((stat_date,diaryExposureVal,(diary_click_num+diaryClickNum),meigouExposureVal,meigouClickNum,searchResultExposureVal,(searchResultClickNum+content_diary_click_num),searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum)) // val df_result = sc.createDataFrame(result).map(x=>GetHiveSearchData_temp(x(0).toString,x(1).toString,x(2).toString,x(3).toString,x(4).toString,x(5).toString,x(6).toString,x(7).toString,x(8).toString,x(9).toString,x(10).toString)).toDF() val df_result = sc.sparkContext.parallelize(result).map(x=>(x._1,x._2,x._3,x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11)).toDF("stat_date","diaryExposureVal","diaryClickNum","meigouExposureVal","meigouClickNum","searchResultExposureVal","searchResultClickNum","searchDoctorExposureVal", "searchDoctorClickNum","searchHospitalExposureVal","searchHospitalClickNum") // // GmeiConfig.writeToJDBCTable(df_result, table = "GetHiveSearchData_CTR", SaveMode.Append) // GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="GetHiveSearchData_CTR",SaveMode.Append) println("开始写入") // GmeiConfig.writeToJDBCTable("jerry.jdbcuri",df_result, table="GetHiveSearchData_CTR",SaveMode.Append) // println("写入完成") GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="GetHiveSearchData_CTR",SaveMode.Append) println("写入完成") } def GetSearchResultData(spark: SparkSession, strExposureAction:String, strClickAction:String,stat_date:String) = { val partition_date = stat_date.replace("-","") val exposureAccum = spark.sparkContext.longAccumulator("search exposure data") val jigou_id = spark.sql( s""" |SELECT cl_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date>='20180402' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |SELECT cl_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date>='20171101' AND partition_date<'${partition_date}' |AND pv_ratio>=0.95 |UNION ALL |select device_id as cl_id from blacklist """.stripMargin ) jigou_id.createOrReplaceTempView("jigou_id") val exposureSql = spark.sql( s""" |select action,user_id,city_id,app |from online.tl_hdfs_backend_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.action='$strExposureAction' |and ov.partition_date='${partition_date}' |and jigou_id.cl_id is null """.stripMargin ) val exposureMapResult = exposureSql.rdd.map(row => { //val jsonObj = JSON.parseFull(row.getAs[Map[String,Any]]("app").toString()) val rowAppFieldMap:Map[String,Any] = row.getAs[Map[String,Any]]("app") if (rowAppFieldMap.nonEmpty) { // jsonMap:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String,Any]] if (rowAppFieldMap.contains("exposure_data")){ val exposure_data_lists:List[Any] = JSON.parseFull(rowAppFieldMap("exposure_data").toString).get.asInstanceOf[List[Any]] if (exposure_data_lists.length > 0){ exposure_data_lists.foreach(exposure_item=>{ if (exposure_item!=None && exposure_item.toString.nonEmpty){ val exposureItemMap:Map[String,Any] = exposure_item.asInstanceOf[Map[String,Any]] if (exposureItemMap.contains("list_ids")){ //val exposure_list_ids:List[Any] = exposureItemMap.get("list_ids").get.asInstanceOf[List[Any]] val exposure_list_ids:List[Any] = exposureItemMap("list_ids").asInstanceOf[List[Any]] exposureAccum.add(exposure_list_ids.length) } }else{ None } }) exposure_data_lists }else{ None } } }else{ None } }) //must add cache exposureMapResult.cache() val exposureFilterResult = exposureMapResult.filter(_.!=(None)) //val exposureArray:Array[Any] = exposureFilterResult.collect() //exposureArray.foreach(item => println(item.toString)) val exposureMapCount:Long = exposureMapResult.count() val exposureFilterCount:Long = exposureFilterResult.count() val clickSql = spark.sql( s""" |select |count(1) click_num |from online.tl_hdfs_maidian_view ov left join jigou_id |on ov.cl_id = jigou_id.cl_id |where ov.partition_date='${partition_date}' |and ov.action='$strClickAction' |and jigou_id.cl_id is null """.stripMargin ) val clickArray:Array[Row] = clickSql.collect() val click_num:Long = clickArray(0).getAs[Long]("click_num") (exposureAccum.value,click_num,exposureMapCount,exposureFilterCount) } // GmeiConfig.writeToJDBCTable(df_result, table = "Repeated_content_recommendation_moreday", SaveMode.Append) // exp_diary.show() // exp_diary.createOrReplaceTempView("exp_diary") // GmeiConfig.writeToJDBCTable(df, table = "Repeated_evaluation_indicator_moreday", SaveMode.Append) } } object find_reason { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = "2018-08-01" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("WeafareStat") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String] ("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 // val ti = new TiContext(sc) sc.sql("use jerry_prod") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") // ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise") // val stat_date = GmeiConfig.getMinusNDate(1) val stat_date=param.date val partition_date = stat_date.replace("-","") //机构id val blacklist = sc.sql( s""" |select device_id from blacklist """.stripMargin ) blacklist.createOrReplaceTempView("blacklist") val agency_id = sc.sql( s""" |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_day |WHERE partition_date >= '20180402' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 |UNION ALL |SELECT DISTINCT(cl_id) as device_id |FROM online.ml_hospital_spam_pv_month |WHERE partition_date >= '20171101' |AND partition_date <= '${partition_date}' |AND pv_ratio >= 0.95 """.stripMargin ) // agency_id.show() agency_id.createOrReplaceTempView("agency_id") //每日新用户 val device_id_newUser = sc.sql( s""" |select distinct(os.device_id) as device_id |from online.ml_device_day_active_status os left join blacklist |on os.device_id = blacklist.device_id |where (os.active_type = '1' or os.active_type='2') |and os.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' | ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei') |and os.partition_date ='${partition_date}' |and blacklist.device_id is null """.stripMargin ) // device_id_newUser.show() device_id_newUser.createOrReplaceTempView("device_id_new") //每日老用户 val device_id_oldUser = sc.sql( s""" |select distinct(os.device_id) as device_id |from online.ml_device_day_active_status os left join blacklist |on os.device_id=blacklist.device_id |where os.active_type = '4' |and os.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100' | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ' | ,'promotion_shike','promotion_julang_jl03','','unknown','promotion_zuimei') |and os.partition_date ='${partition_date}' |and blacklist.device_id is null """.stripMargin ) // device_id_oldUser.show() device_id_oldUser.createOrReplaceTempView("device_id_old") val all_clk = sc.sql( s""" |select ov.cl_id as device_id |from online.tl_hdfs_maidian_view ov left join agency_id |on ov.cl_id = agency_id.device_id |where ov.action = 'on_click_diary_card' |and ov.cl_id != "NULL" |and ov.params['tab_name'] = '精选' |and ov.params['page_name'] = 'home' |and ov.partition_date='${partition_date}' |and agency_id.device_id is null """.stripMargin ) // all_clk.show() all_clk.createOrReplaceTempView("all_clk_diary_card") //1.当天老用户中的点击用户数 val old_clk_count = sc.sql( s""" |select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as old_clk_count |from all_clk_diary_card oc inner join device_id_old |on oc.device_id = device_id_old.device_id """.stripMargin ) // old_clk_count.show() //1.1有点击的老用户 val old_clk_device = sc.sql( s""" |select distinct(oc.device_id) as device_id |from all_clk_diary_card oc inner join device_id_old |on oc.device_id = device_id_old.device_id """.stripMargin ) old_clk_device.createOrReplaceTempView("old_clk_device") //1.1无点击的老用户 val old_noclk_device = sc.sql( s""" |select device_id |from device_id_old |except |select device_id |from old_clk_device """.stripMargin ) old_noclk_device.show() //2.当天新用户中的点击用户数 val new_clk_count = sc.sql( s""" |select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as new_clk_count |from all_clk_diary_card oc inner join device_id_new |on oc.device_id = device_id_new.device_id """.stripMargin ) //2.1 有点击的新用户 val new_clk_device = sc.sql( s""" |select distinct(oc.device_id) as device_id |from all_clk_diary_card oc inner join device_id_new |on oc.device_id = device_id_new.device_id """.stripMargin ) new_clk_device.createOrReplaceTempView("new_clk_device") //3.当天老用户数 val old_count = sc.sql( s""" |select '${stat_date}' as stat_date,count(distinct(dio.device_id)) as old_count |from device_id_old dio left join agency_id |on dio.device_id = agency_id.device_id |where agency_id.device_id is null """.stripMargin ) //4.当天新用户数 val new_count = sc.sql( s""" |select '${stat_date}' as stat_date,count(distinct(din.device_id)) as new_count |from device_id_new din left join agency_id |on din.device_id = agency_id.device_id |where agency_id.device_id is null """.stripMargin ) //5.有点击老用户的曝光数 val exp_clkold_count = sc.sql( s""" |select '${stat_date}' as stat_date,count(dp.device_id) as imp_clkold_count |from data_feed_exposure_precise dp inner join old_clk_device |on dp.device_id = old_clk_device.device_id |where stat_date='${stat_date}' |group by stat_date """.stripMargin ) //6.有点击新用户的曝光数 val exp_clknew_count = sc.sql( s""" |select '${stat_date}' as stat_date,count(dp.device_id) as imp_clknew_count |from data_feed_exposure_precise dp inner join new_clk_device |on dp.device_id = new_clk_device.device_id |where stat_date='${stat_date}' |group by stat_date """.stripMargin ) val result = old_clk_count.join(new_clk_count,"stat_date") .join(old_count,"stat_date") .join(new_count,"stat_date") .join(exp_clkold_count,"stat_date") .join(exp_clknew_count,"stat_date") // GmeiConfig.writeToJDBCTable(result, "device_clk_imp_reason", SaveMode.Append) GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="device_clk_imp_reason",SaveMode.Append) println("写入完成") } } }