package com.gmei import java.io.Serializable import java.time.LocalDate import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext} import org.apache.log4j.{Level, Logger} import scopt.OptionParser import com.gmei.lib.AbstractParams import org.apache.spark.sql.functions.lit import scala.util.Try object EsmmData { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = GmeiConfig.getMinusNDate(1) ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String]("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags") ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data") val max_stat_date = sc.sql( s""" |select max(stat_date) from esmm_train_data """.stripMargin ) val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head println("max_stat_date_str",max_stat_date_str) println("param.date",param.date) if (max_stat_date_str != param.date){ val stat_date = param.date println(stat_date) // val imp_data = sc.sql( // s""" // |select distinct stat_date,device_id,city_id as ucity_id, // | cid_id,diary_service_id // |from data_feed_exposure // |where cid_type = 'diary' // |and stat_date ='${stat_date}' // """.stripMargin // ) val imp_data = sc.sql( s""" |select * from |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id |from data_feed_exposure |where cid_type = 'diary' |and stat_date ='${stat_date}' |group by stat_date,device_id,city_id,cid_id,diary_service_id having count(*) > 1) a """.stripMargin ) // imp_data.show() // println("imp_data.count()") // println(imp_data.count()) val clk_data = sc.sql( s""" |select distinct stat_date,device_id,city_id as ucity_id, | cid_id,diary_service_id |from data_feed_click |where cid_type = 'diary' |and stat_date ='${stat_date}' """.stripMargin ) // clk_data.show() // println("clk_data.count()") // println(clk_data.count()) val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0)) // imp_data_filter.createOrReplaceTempView("imp_data_filter") // imp_data_filter.show() // println("imp_data_filter.count()") // println(imp_data_filter.count()) val stat_date_not = stat_date.replace("-","") val cvr_data = sc.sql( s""" |select distinct | from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, | cl_id as device_id,city_id as ucity_id, | params["referrer_id"] as cid_id,params["business_id"] as diary_service_id |from online.tl_hdfs_maidian_view |where action='page_view' |and partition_date ='${stat_date_not}' |and params['page_name'] = 'welfare_detail' |and params['referrer'] = 'diary_detail' """.stripMargin ) val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1)) // cvr_data_filter.createOrReplaceTempView("cvr_data_filter") // cvr_data_filter.show() // println("cvr_data_filter.count()") // println(cvr_data_filter.count()) val other_click = get_other_click(sc,stat_date_not) val all_click = clk_data.union(other_click) val clk_data_filter =all_click.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0)) // clk_data_filter.createOrReplaceTempView("clk_data_filter") // clk_data_filter.show() // println("clk_data_filter.count()") // println(clk_data_filter.count()) val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter) union_data.createOrReplaceTempView("union_data") // union_data.show() // println("union_data.count()") // println(union_data.count()) val union_data_clabel = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z, | c.level1_id as clevel1_id |from union_data a |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id |where b.partition_date='${stat_date_not}' |and c.partition_date='${stat_date_not}' """.stripMargin ) union_data_clabel.createOrReplaceTempView("union_data_clabel") // union_data_clabel.show() val union_data_slabel = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id, | c.level1_id as slevel1_id |from union_data_clabel a |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id |where b.partition_date='${stat_date_not}' |and c.partition_date='${stat_date_not}' """.stripMargin ) union_data_slabel.createOrReplaceTempView("union_data_slabel") // union_data_slabel.show() val union_data_ccity_name = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id, | c.name as ccity_name |from union_data_slabel a |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id |left join src_zhengxing_api_tag c on b.tag_id=c.id | where c.tag_type=4 """.stripMargin ) union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name") // union_data_ccity_name.show() val union_data_scity_id = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name, | d.city_id as scity_id |from union_data_ccity_name a |left join online.tl_meigou_service_view b on a.diary_service_id=b.id |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id |where b.partition_date='${stat_date_not}' |and c.partition_date='${stat_date_not}' |and d.partition_date='${stat_date_not}' """.stripMargin ) union_data_scity_id.createOrReplaceTempView("union_data_scity_id") union_data_scity_id.show() val union_data_scity_id2 = sc.sql( s""" |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(diary_service_id) diary_service_id,first(y) y, |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id |from union_data_scity_id |group by device_id,cid_id """.stripMargin ) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append) } else { println("esmm_train_data already have param.date data") } sc.stop() } } def get_other_click(spark:SparkSession,yesterday:String): DataFrame ={ var result01 = spark.sql( s""" |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, |device["device_id"] as device_id,channel as device_type, |city_id,params['business_id'] as cid |from online.tl_hdfs_maidian_view where partition_date = '$yesterday' |and action = 'on_click_diary_card' and params['tab_name'] != '精选' |and params['page_name'] = 'home' """.stripMargin ) // println(result01.count()) // result01.show(6) val recommend = spark.sql( s""" |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, |device["device_id"] as device_id,channel as device_type, |city_id,params["business_id"] as cid |from online.tl_hdfs_maidian_view where partition_date = '$yesterday' |and action = 'diarybook_detail_click_recommend_block' and params["business_type"] = "diary" """.stripMargin ) // println("详情页推荐日记:") // println(recommend.count()) // recommend.show(6) val search_zonghe = spark.sql( s""" |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, |device["device_id"] as device_id,channel as device_type,city_id,params["business_id"] as cid |from online.tl_hdfs_maidian_view where partition_date = '$yesterday' |and action = 'search_result_click_infomation_item' and params["business_type"] = "diary" """.stripMargin ) // println("搜索综合:") // println(search_zonghe.count()) // search_zonghe.show(6) val non_home = spark.sql( s""" |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, |device["device_id"] as device_id,channel as device_type,city_id,params["diary_id"] as cid |from online.tl_hdfs_maidian_view where partition_date = '$yesterday' |and action = 'on_click_diary_card' and params['page_name'] != 'home' """.stripMargin ) // println("non home:") // println(non_home.count()) // non_home.show(6) result01 = result01.union(recommend).union(search_zonghe).union(non_home) // println(result01.count()) result01.createOrReplaceTempView("temp_result") val result02 = spark.sql( s""" |select * from temp_result |where device_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5') | and device_id not in | (SELECT cl_id | FROM online.ml_hospital_spam_pv_day | WHERE partition_date>='20180402' AND partition_date<'${yesterday}' | AND pv_ratio>=0.95 | UNION ALL | SELECT cl_id | FROM online.ml_hospital_spam_pv_month | WHERE partition_date>='20171101' AND partition_date<'${yesterday}' | AND pv_ratio>=0.95 | ) """.stripMargin ) result02.createOrReplaceTempView("temp_result02") val result_dairy = spark.sql( s""" |select | re.stat_date as stat_date, | re.device_id as device_id, | re.city_id as ucity_id, | re.cid as cid_id, | da.service_id as diary_service_id |from temp_result02 re |left join online.ml_community_diary_updates da |on re.cid = da.diary_id |where da.partition_date='${yesterday}' """.stripMargin ).distinct() result_dairy } } object EsmmPredData { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev" ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags") ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix") ti.tidbMapTable("eagle","ffm_diary_queue") ti.tidbMapTable("eagle","search_queue") ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data") ti.tidbMapTable("eagle","biz_feed_diary_queue") ti.tidbMapTable("jerry_prod","data_feed_exposure_precise") import sc.implicits._ val yesteday_have_seq = GmeiConfig.getMinusNDate(1) //nearby_data val raw_data = sc.sql( s""" |select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from |(select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix |union |select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue |union |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 |where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}') """.stripMargin ) raw_data.show() val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map { case (device_city, cid_data) => val device_id = Try(device_city.split(",")(0)).getOrElse("") val city_id = Try(device_city.split(",")(1)).getOrElse("") val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("") (device_id,city_id ,s"$cids") }.filter(_._3!="").toDF("device_id","city_id","merge_queue") println("nearby_device_count",raw_data1.count()) val start= LocalDate.now().minusDays(14).toString import sc.implicits._ val sql = s""" |select distinct device_id,cid_id from data_feed_exposure_precise |where stat_date >= "$start" and cid_type = "diary" """.stripMargin val history = sc.sql(sql).repartition(200).rdd .map(x =>(x(0).toString,x(1).toString)).groupByKey().map(x => (x._1,x._2.mkString(","))) .toDF("device_id","cid_set") history.persist() history.createOrReplaceTempView("history") if (history.take(1).nonEmpty){ raw_data1.createOrReplaceTempView("r") val sql_nearby_filter = s""" |select r.device_id,r.city_id,r.merge_queue,history.cid_set from r |left join history on r.device_id = history.device_id """.stripMargin val df = sc.sql(sql_nearby_filter).na.fill("").rdd .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString)) .map(x => (x._1,x._2,x._3.split(",").diff(x._4.split(",")).mkString(","))) .toDF("device_id","city_id","merge_queue") df.createOrReplaceTempView("raw_data1") }else{ raw_data1.createOrReplaceTempView("raw_data1") } val raw_data2 = sc.sql( s""" |select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1 """.stripMargin ).withColumn("label",lit(1)) raw_data2.createOrReplaceTempView("raw_data2") println("nearby_explode_count",raw_data2.count()) // native_data val native_data = sc.sql( s""" |select distinct a.device_id,a.city_id,b.native_queue from data_feed_exposure a |left join (select if(city_id='world','worldwide',city_id) city_id,native_queue from biz_feed_diary_queue) b |on a.city_id = b.city_id |where a.stat_date='${yesteday_have_seq}' and b.native_queue != "" """.stripMargin ) println("native_device_count",native_data.count()) if (history.take(1).nonEmpty){ native_data.createOrReplaceTempView("temp") val sql_native_filter = s""" |select t.device_id,t.city_id,t.native_queue,history.cid_set from temp t |left join history on t.device_id = history.device_id """.stripMargin val df = sc.sql(sql_native_filter).na.fill("").rdd .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString)) .map(x => (x._1,x._2,x._3.split(",").diff(x._4.split(",")).mkString(","))) .toDF("device_id","city_id","native_queue") df.createOrReplaceTempView("native_data") }else{ native_data.createOrReplaceTempView("native_data") } val native_data1 = sc.sql( s""" |select device_id,city_id as ucity_id,explode(split(native_queue,',')) as cid_id from native_data """.stripMargin ).withColumn("label",lit(0)) native_data1.createOrReplaceTempView("native_data1") println("native_explode_count",native_data1.count()) //union val union_data = sc.sql( s""" |select device_id,ucity_id,cid_id,label from native_data1 |union |select device_id,ucity_id,cid_id,label from raw_data2 """.stripMargin ) union_data.createOrReplaceTempView("raw_data") println("union_count",union_data.count()) //join feat val yesteday = GmeiConfig.getMinusNDate(1).replace("-","") val sid_data = sc.sql( s""" |select distinct | from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, | a.device_id,a.ucity_id,a.cid_id,a.label, b.service_id as diary_service_id |from raw_data a |left join online.ml_community_diary_updates b on a.cid_id = b.diary_id |where b.partition_date = '${yesteday}' """.stripMargin ) // sid_data.show() println(sid_data.count()) val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0)) sid_data_label.createOrReplaceTempView("union_data") val union_data_clabel = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z, | c.level1_id as clevel1_id |from union_data a |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id |where b.partition_date='${yesteday}' |and c.partition_date='${yesteday}' """.stripMargin ) union_data_clabel.createOrReplaceTempView("union_data_clabel") // union_data_clabel.show() val union_data_slabel = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id, | c.level1_id as slevel1_id |from union_data_clabel a |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id |where b.partition_date='${yesteday}' |and c.partition_date='${yesteday}' """.stripMargin ) union_data_slabel.createOrReplaceTempView("union_data_slabel") // union_data_slabel.show() val union_data_ccity_name = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id, | c.name as ccity_name |from union_data_slabel a |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id |left join src_zhengxing_api_tag c on b.tag_id=c.id | where c.tag_type=4 """.stripMargin ) union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name") // union_data_ccity_name.show() val union_data_scity_id = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name, | d.city_id as scity_id |from union_data_ccity_name a |left join online.tl_meigou_service_view b on a.diary_service_id=b.id |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id |where b.partition_date='${yesteday}' |and c.partition_date='${yesteday}' |and d.partition_date='${yesteday}' """.stripMargin ) union_data_scity_id.createOrReplaceTempView("union_data_scity_id") val union_data_scity_id2 = sc.sql( s""" |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id)diary_service_id,first(y) y, |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id |from union_data_scity_id |group by device_id,cid_id,label """.stripMargin ) // union_data_scity_id.createOrReplaceTempView("union_data_scity_id") println(union_data_scity_id2.count()) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite) sc.stop() } } } object GetDiaryPortrait { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = GmeiConfig.getMinusNDate(1) ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String]("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") val stat_date = param.date.replace("-","") val diary_tag = sc.sql( s""" |select c.diary_id, | concat_ws(',',collect_set(cast(c.level1_id as string))) as level1_ids, | concat_ws(',',collect_set(cast(c.level2_id as string))) as level2_ids, | concat_ws(',',collect_set(cast(c.level3_id as string))) as level3_ids from | (select a.diary_id,b.level1_id,b.level2_id,b.level3_id | from online.tl_hdfs_diary_tags_view a | left join online.bl_tag_hierarchy_detail b | on a.tag_id = b.id | where a.partition_date = '${stat_date}' | and b.partition_date = '${stat_date}') c | group by c.diary_id """.stripMargin ) diary_tag.show() println(diary_tag.count()) val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" GmeiConfig.writeToJDBCTable(jdbc,diary_tag,"diary_feat",SaveMode.Overwrite) sc.stop() } } } object GetDevicePortrait { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = GmeiConfig.getMinusNDate(1) ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String]("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat") import sc.implicits._ val stat_date = param.date.replace("-","") val device_search_tag = sc.sql( s""" |select c.device_id,c.stat_date,c.level1_id,count(c.level1_id) as level1_count |from (select | a.cl_id as device_id,a.partition_date as stat_date, | COALESCE(a.params['diary_id'], a.params['business_id'], 0) as cid_id, | b.level1_ids as level1_id | from online.tl_hdfs_maidian_view a | left join diary_feat b | on COALESCE(a.params['diary_id'], a.params['business_id'], 0) = b.diary_id | where | b.level1_ids is not null and | a.partition_date = '${stat_date}' | and (a.action = 'on_click_diary_card' or (a.action="full_stack_click_video_card_full_screen_play" and a.params["card_type"]="diary"))) c |group by c.device_id,c.level1_id,c.stat_date """.stripMargin ) device_search_tag.show() println(device_search_tag.count()) device_search_tag.createOrReplaceTempView("tag_count") val max_count_tag = sc.sql( s""" |select a.device_id,a.stat_date,a.level1_id as max_level1_id,a.level1_count as max_level1_count |from tag_count a |inner join |(select device_id,max(level1_count) as max_count from tag_count group by device_id) b |on a.level1_count = b.max_count and a.device_id = b.device_id """.stripMargin ) // .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString)) // max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat) // // max_count_tag.take(10).foreach(println) // println(max_count_tag.count()) //drop duplicates val max_count_tag_rdd = max_count_tag.rdd.groupBy(_.getAs[String]("device_id")).map { case (device_id,data) => val stat_date = data.map(_.getAs[String]("stat_date")).head val max_level1_id = data.map(_.getAs[String]("max_level1_id")).head.toString val max_level1_count = data.map(_.getAs[Long]("max_level1_count")).head.toString (device_id,stat_date,max_level1_id,max_level1_count) }.filter(_._1!=null) max_count_tag_rdd.foreachPartition(GmeiConfig.updateDeviceFeat) max_count_tag_rdd.take(10).foreach(println) println(max_count_tag_rdd.count()) sc.stop() } } } object GetLevelCount { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", path: String = null ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String]("path") .text(s"the path you used") .action((x,c) => c.copy(path = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat") import sc.implicits._ val stat_date = GmeiConfig.getMinusNDate(1).replace("-","") // val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",") val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580" val diary_level1 = sc.sql( s""" |select diary_id,explode(split(level1_ids,';')) level1_id from diary_feat |where diary_id in (${diary_queue}) """.stripMargin ) diary_level1.show() println(diary_level1.count()) //胸部日记id val cid_xiong = diary_level1.rdd.filter(_.getAs("level1_id")=="7") cid_xiong.collect().foreach(println) //计算各类别日记的数量 val level1_count = diary_level1.rdd.map(x => (x(1).toString)).map(level1 => (level1,1)).reduceByKey((a,b) => a+b).sortBy(_._2,false).toDF("level1_id","count") level1_count.show() level1_count.createOrReplaceTempView("tmp") val level1_name = sc.sql( s""" |select a.level1_id,a.count,b.level1_name from tmp a |left join (select distinct level1_id,level1_name from online.bl_tag_hierarchy_detail where partition_date = '${stat_date}') b |on a.level1_id = b.level1_id order by a.count desc """.stripMargin ) level1_name.show() sc.stop() } } } object GetDeviceDuration { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = GmeiConfig.getMinusNDate(1) ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String]("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat") import sc.implicits._ val stat_date = param.date val uid_duration = sc.sql( s""" |select a.device_id,coalesce(a.start_time,a.ndiary_in,0) in_time,coalesce(a.end_time,a.ndiary_out,0) out_time, |explode(split(b.level1_ids,';')) level1_id |from data_feed_click a |left join diary_feat b on a.cid_id = b.diary_id |where a.stat_date > '2018-12-12' """.stripMargin ) uid_duration.show() val uid_duration_last = sc.sql( s""" |select d.device_id, |sum(if(d.level1_id=0,sum_duration,0)) as kongbai, |sum(if(d.level1_id=1,sum_duration,0)) as yanbu, |sum(if(d.level1_id=10,sum_duration,0)) as simi, |sum(if(d.level1_id=1024,sum_duration,0)) as zitizhifang, |sum(if(d.level1_id=1080,sum_duration,0)) as banyongjiu, |sum(if(d.level1_id=11,sum_duration,0)) as yachi, |sum(if(d.level1_id=12,sum_duration,0)) as kouchun, |sum(if(d.level1_id=13,sum_duration,0)) as erbu, |sum(if(d.level1_id=2,sum_duration,0)) as bibu, |sum(if(d.level1_id=2053,sum_duration,0)) as remen, |sum(if(d.level1_id=2054,sum_duration,0)) as banyongjiuzhuang, |sum(if(d.level1_id=2212,sum_duration,0)) as jiankangguanli, |sum(if(d.level1_id=2214,sum_duration,0)) as qita, |sum(if(d.level1_id=3,sum_duration,0)) as lunkuo, |sum(if(d.level1_id=4,sum_duration,0)) as shoushen, |sum(if(d.level1_id=5,sum_duration,0)) as pifu, |sum(if(d.level1_id=6933,sum_duration,0)) as shenghuo, |sum(if(d.level1_id=7,sum_duration,0)) as xiongbu, |sum(if(d.level1_id=9,sum_duration,0)) as maofa, |sum(if(d.level1_id=922,sum_duration,0)) as kangshuai, |sum(if(d.level1_id=929,sum_duration,0)) as shili, |sum(if(d.level1_id=971,sum_duration,0)) as chanhouxiufu, |sum(if(d.level1_id=992,sum_duration,0)) as zhushe |from | (select c.device_id,c.level1_id,sum(c.duration) as sum_duration from | (select a.device_id, | coalesce(a.end_time,a.ndiary_out,0)-coalesce(a.start_time,a.ndiary_in,0) as duration, | explode(split(b.level1_ids,';')) level1_id | from data_feed_click a | left join diary_feat b on a.cid_id = b.diary_id where a.stat_date > '2018-12-12') c | group by c.device_id,c.level1_id) d |group by d.device_id """.stripMargin ) uid_duration_last.show() sc.stop() } } } object EsmmDataTest { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) case class Params(env: String = "dev", date: String = GmeiConfig.getMinusNDate(1) ) extends AbstractParams[Params] with Serializable val defaultParams = Params() val parser = new OptionParser[Params]("Feed_EDA") { head("EsmmData") opt[String]("env") .text(s"the databases environment you used") .action((x, c) => c.copy(env = x)) opt[String]("date") .text(s"the date you used") .action((x,c) => c.copy(date = x)) note( """ |For example, the following command runs this app on a tidb dataset: | | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \ """.stripMargin + s"| --env ${defaultParams.env}" ) } def main(args: Array[String]): Unit = { parser.parse(args, defaultParams).map { param => GmeiConfig.setup(param.env) val spark_env = GmeiConfig.getSparkSession() val sc = spark_env._2 val ti = new TiContext(sc) ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags") ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag") ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_click") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure_precise") ti.tidbMapTable(dbName = "jerry_test", tableName = "train_data") click(sc) val max_stat_date = sc.sql( s""" |select max(stat_date) from train_data """.stripMargin ) val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head println("max_stat_date_str",max_stat_date_str) println("param.date",param.date) if (max_stat_date_str != param.date || max_stat_date_str == null){ val stat_date = param.date println(stat_date) // val imp_data = sc.sql( // s""" // |select distinct stat_date,device_id,city_id as ucity_id, // | cid_id,diary_service_id // |from data_feed_exposure // |where cid_type = 'diary' // |and stat_date ='${stat_date}' // """.stripMargin // ) val imp_data = sc.sql( s""" |select * from |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id |from data_feed_exposure_precise |where cid_type = 'diary' |and stat_date ='${stat_date}' |group by stat_date,device_id,city_id,cid_id,diary_service_id) a """.stripMargin ) // imp_data.show() // println("imp_data.count()") // println(imp_data.count()) val clk_data = sc.sql( s""" |select distinct stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id |from esmm_click |where stat_date ='${stat_date}' """.stripMargin ) // clk_data.show() // println("clk_data.count()") // println(clk_data.count()) val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0)) // imp_data_filter.createOrReplaceTempView("imp_data_filter") // imp_data_filter.show() // println("imp_data_filter.count()") // println(imp_data_filter.count()) val stat_date_not = stat_date.replace("-","") val cvr_data = sc.sql( s""" |select distinct | from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, | cl_id as device_id,city_id as ucity_id, | params["referrer_id"] as cid_id,params["business_id"] as diary_service_id |from online.tl_hdfs_maidian_view |where action='page_view' |and partition_date ='${stat_date_not}' |and params['page_name'] = 'welfare_detail' |and params['referrer'] = 'diary_detail' """.stripMargin ) val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1)) // cvr_data_filter.createOrReplaceTempView("cvr_data_filter") // cvr_data_filter.show() // println("cvr_data_filter.count()") // println(cvr_data_filter.count()) val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0)) // clk_data_filter.createOrReplaceTempView("clk_data_filter") // clk_data_filter.show() // println("clk_data_filter.count()") // println(clk_data_filter.count()) val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter) union_data.createOrReplaceTempView("union_data") // union_data.show() // println("union_data.count()") // println(union_data.count()) val union_data_clabel = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z, | c.level1_id as clevel1_id |from union_data a |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id |where b.partition_date='${stat_date_not}' |and c.partition_date='${stat_date_not}' """.stripMargin ) union_data_clabel.createOrReplaceTempView("union_data_clabel") // union_data_clabel.show() val union_data_slabel = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id, | c.level1_id as slevel1_id |from union_data_clabel a |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id |where b.partition_date='${stat_date_not}' |and c.partition_date='${stat_date_not}' """.stripMargin ) union_data_slabel.createOrReplaceTempView("union_data_slabel") // union_data_slabel.show() val union_data_ccity_name = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id, | c.name as ccity_name |from union_data_slabel a |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id |left join src_zhengxing_api_tag c on b.tag_id=c.id | where c.tag_type=4 """.stripMargin ) union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name") // union_data_ccity_name.show() val union_data_scity_id = sc.sql( s""" |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name, | d.city_id as scity_id |from union_data_ccity_name a |left join online.tl_meigou_service_view b on a.diary_service_id=b.id |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id |where b.partition_date='${stat_date_not}' |and c.partition_date='${stat_date_not}' |and d.partition_date='${stat_date_not}' """.stripMargin ) union_data_scity_id.createOrReplaceTempView("union_data_scity_id") union_data_scity_id.show() val union_data_scity_id2 = sc.sql( s""" |select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,first(diary_service_id) diary_service_id,first(y) y, |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,first(scity_id) scity_id |from union_data_scity_id |group by device_id,cid_id """.stripMargin ) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="train_data",SaveMode.Append) } else { println("train_data already have param.date data") } sc.stop() } } def click(spark:SparkSession): Unit ={ val yesterday = LocalDate.now().minusDays(1).toString.replace("-","") println(yesterday) val stat_yesterday = LocalDate.now().minusDays(1).toString val max_stat_date = spark.sql( s""" |select max(stat_date) from esmm_click """.stripMargin ) val max = max_stat_date.collect().map(s => s(0).toString).head println("max_stat_date",max) if (max != stat_yesterday || max == null){ val result01 = spark.sql( s""" |select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date, |device["device_id"] as device_id,channel as device_type, |city_id,params['diary_id'] as cid |from online.tl_hdfs_maidian_view where partition_date = '$yesterday' |and action = 'on_click_diary_card' and params['tab_name'] = '精选' |and params['page_name'] = 'home' """.stripMargin ) result01.createOrReplaceTempView("temp_result") val result02 = spark.sql( s""" |select * from temp_result |where device_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3' | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang' | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1' | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4' | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5') | and device_id not in | (SELECT cl_id | FROM online.ml_hospital_spam_pv_day | WHERE partition_date>='20180402' AND partition_date<'${yesterday}' | AND pv_ratio>=0.95 | UNION ALL | SELECT cl_id | FROM online.ml_hospital_spam_pv_month | WHERE partition_date>='20171101' AND partition_date<'${yesterday}' | AND pv_ratio>=0.95 | ) """.stripMargin ) result02.createOrReplaceTempView("temp_result02") val result_dairy = spark.sql( s""" |select | re.stat_date as stat_date, | re.device_id as device_id, | re.device_type as device_type, | re.cid as cid_id, | re.city_id as city_id, | da.service_id as diary_service_id |from temp_result02 re |left join online.ml_community_diary_updates da |on re.cid = da.diary_id |where da.partition_date='${yesterday}' """.stripMargin ) val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" GmeiConfig.writeToJDBCTable(jdbcuri,result_dairy, table="esmm_click",SaveMode.Append) println("data insert") }else{ println("data already exists") } } }