Commit 5be2e09d authored by 王志伟
parents fe05e120 c684c4c0
...@@ -31,8 +31,8 @@ object EsmmData { ...@@ -31,8 +31,8 @@ object EsmmData {
.text(s"the databases environment you used") .text(s"the databases environment you used")
.action((x, c) => c.copy(env = x)) .action((x, c) => c.copy(env = x))
opt[String]("date") opt[String]("date")
.text(s"the date you used") .text(s"the date you used")
.action((x,c) => c.copy(date = x)) .action((x,c) => c.copy(date = x))
note( note(
""" """
|For example, the following command runs this app on a tidb dataset: |For example, the following command runs this app on a tidb dataset:
...@@ -70,15 +70,15 @@ object EsmmData { ...@@ -70,15 +70,15 @@ object EsmmData {
if (max_stat_date_str != param.date){ if (max_stat_date_str != param.date){
val stat_date = param.date val stat_date = param.date
println(stat_date) println(stat_date)
// val imp_data = sc.sql( // val imp_data = sc.sql(
// s""" // s"""
// |select distinct stat_date,device_id,city_id as ucity_id, // |select distinct stat_date,device_id,city_id as ucity_id,
// | cid_id,diary_service_id // | cid_id,diary_service_id
// |from data_feed_exposure // |from data_feed_exposure
// |where cid_type = 'diary' // |where cid_type = 'diary'
// |and stat_date ='${stat_date}' // |and stat_date ='${stat_date}'
// """.stripMargin // """.stripMargin
// ) // )
val imp_data = sc.sql( val imp_data = sc.sql(
s""" s"""
...@@ -91,8 +91,8 @@ object EsmmData { ...@@ -91,8 +91,8 @@ object EsmmData {
""".stripMargin """.stripMargin
) )
// imp_data.show() // imp_data.show()
println("imp_data.count()") println("imp_data.count()")
println(imp_data.count()) println(imp_data.count())
val clk_data = sc.sql( val clk_data = sc.sql(
...@@ -105,8 +105,8 @@ object EsmmData { ...@@ -105,8 +105,8 @@ object EsmmData {
""".stripMargin """.stripMargin
) )
// clk_data.show() // clk_data.show()
println("clk_data.count()") println("clk_data.count()")
println(clk_data.count()) println(clk_data.count())
...@@ -384,6 +384,14 @@ object EsmmPredData { ...@@ -384,6 +384,14 @@ object EsmmPredData {
val yesteday_have_seq = GmeiConfig.getMinusNDate(1) val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
val target_user = sc.sql(
s"""
|select concat(t.device_id,",",t.city_id) from
|(select distinct device_id,city_id
|from data_feed_exposure where stat_date='${yesteday_have_seq}') t
""".stripMargin).collect().map(x => x(0).toString)
println("target_user",target_user.length)
//nearby_data //nearby_data
val raw_data = sc.sql( val raw_data = sc.sql(
s""" s"""
...@@ -393,20 +401,21 @@ object EsmmPredData { ...@@ -393,20 +401,21 @@ object EsmmPredData {
|select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue |select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
|union |union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}') """.stripMargin)
""".stripMargin // raw_data.show()
)
// raw_data.show()
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map { val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city"))
.filter(x => target_user.indexOf(x._1) != -1)
.map {
case (device_city, cid_data) => case (device_city, cid_data) =>
val device_id = Try(device_city.split(",")(0)).getOrElse("") val device_id = Try(device_city.split(",")(0)).getOrElse("")
val city_id = Try(device_city.split(",")(1)).getOrElse("") val city_id = Try(device_city.split(",")(1)).getOrElse("")
val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("") val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids") (device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue") }.filter(_._3!="").toDF("device_id","city_id","merge_queue")
// println("nearby_device_count",raw_data1.count())
// println("nearby_device_count",raw_data1.count())
val start= LocalDate.now().minusDays(14).toString val start= LocalDate.now().minusDays(14).toString
import sc.implicits._ import sc.implicits._
...@@ -443,7 +452,7 @@ object EsmmPredData { ...@@ -443,7 +452,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
).withColumn("label",lit(1)) ).withColumn("label",lit(1))
raw_data2.createOrReplaceTempView("raw_data2") raw_data2.createOrReplaceTempView("raw_data2")
// println("nearby_explode_count",raw_data2.count()) // println("nearby_explode_count",raw_data2.count())
// native_data // native_data
...@@ -455,7 +464,7 @@ object EsmmPredData { ...@@ -455,7 +464,7 @@ object EsmmPredData {
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != "" |where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin """.stripMargin
) )
// println("native_device_count",native_data.count()) // println("native_device_count",native_data.count())
if (history.take(1).nonEmpty){ if (history.take(1).nonEmpty){
native_data.createOrReplaceTempView("temp") native_data.createOrReplaceTempView("temp")
...@@ -473,13 +482,16 @@ object EsmmPredData { ...@@ -473,13 +482,16 @@ object EsmmPredData {
native_data.createOrReplaceTempView("native_data") native_data.createOrReplaceTempView("native_data")
} }
history.unpersist()
val native_data1 = sc.sql( val native_data1 = sc.sql(
s""" s"""
|select device_id,city_id as ucity_id,explode(split(native_queue,',')) as cid_id from native_data |select device_id,city_id as ucity_id,explode(split(native_queue,',')) as cid_id from native_data
""".stripMargin """.stripMargin
).withColumn("label",lit(0)) ).withColumn("label",lit(0))
native_data1.createOrReplaceTempView("native_data1") native_data1.createOrReplaceTempView("native_data1")
// println("native_explode_count",native_data1.count()) // println("native_explode_count",native_data1.count())
//union //union
val union_data = sc.sql( val union_data = sc.sql(
...@@ -490,7 +502,7 @@ object EsmmPredData { ...@@ -490,7 +502,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
union_data.createOrReplaceTempView("raw_data") union_data.createOrReplaceTempView("raw_data")
// println("union_count",union_data.count()) // println("union_count",union_data.count())
//join feat //join feat
...@@ -505,8 +517,8 @@ object EsmmPredData { ...@@ -505,8 +517,8 @@ object EsmmPredData {
|where b.partition_date = '${yesteday}' |where b.partition_date = '${yesteday}'
""".stripMargin """.stripMargin
) )
// sid_data.show() // sid_data.show()
// println(sid_data.count()) // println(sid_data.count())
val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0)) val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
sid_data_label.createOrReplaceTempView("union_data") sid_data_label.createOrReplaceTempView("union_data")
...@@ -592,22 +604,23 @@ object EsmmPredData { ...@@ -592,22 +604,23 @@ object EsmmPredData {
val union_data_scity_id2 = sc.sql( val union_data_scity_id2 = sc.sql(
s""" s"""
|select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id)diary_service_id,first(y) y, |select device_id,cid_id,first(stat_date) stat_date,ucity_id,first(label) label,first(diary_service_id) diary_service_id,first(y) y,
|first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name, |first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,
|first(scity_id) scity_id,first(hospital_id) hospital_id |first(scity_id) scity_id,first(hospital_id) hospital_id
|from union_data_scity_id |from union_data_scity_id
|group by device_id,cid_id,label |group by device_id,ucity_id,cid_id
""".stripMargin """.stripMargin
) )
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id") // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
// println(union_data_scity_id2.count()) // println(union_data_scity_id2.count())
union_data_scity_id2.persist() union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite) GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
union_data_scity_id2.unpersist() union_data_scity_id2.unpersist()
sc.stop() sc.stop()
} }
...@@ -753,19 +766,19 @@ object GetDevicePortrait { ...@@ -753,19 +766,19 @@ object GetDevicePortrait {
device_search_tag.createOrReplaceTempView("tag_count") device_search_tag.createOrReplaceTempView("tag_count")
val max_count_tag = sc.sql( val max_count_tag = sc.sql(
s""" s"""
|select a.device_id,a.stat_date,a.level1_id as max_level1_id,a.level1_count as max_level1_count |select a.device_id,a.stat_date,a.level1_id as max_level1_id,a.level1_count as max_level1_count
|from tag_count a |from tag_count a
|inner join |inner join
|(select device_id,max(level1_count) as max_count from tag_count group by device_id) b |(select device_id,max(level1_count) as max_count from tag_count group by device_id) b
|on a.level1_count = b.max_count and a.device_id = b.device_id |on a.level1_count = b.max_count and a.device_id = b.device_id
""".stripMargin """.stripMargin
) )
// .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString)) // .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
// max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat) // max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat)
// //
// max_count_tag.take(10).foreach(println) // max_count_tag.take(10).foreach(println)
// println(max_count_tag.count()) // println(max_count_tag.count())
//drop duplicates //drop duplicates
val max_count_tag_rdd = max_count_tag.rdd.groupBy(_.getAs[String]("device_id")).map { val max_count_tag_rdd = max_count_tag.rdd.groupBy(_.getAs[String]("device_id")).map {
...@@ -833,7 +846,7 @@ object GetLevelCount { ...@@ -833,7 +846,7 @@ object GetLevelCount {
import sc.implicits._ import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1).replace("-","") val stat_date = GmeiConfig.getMinusNDate(1).replace("-","")
// val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",") // val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",")
val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580" val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580"
val diary_level1 = sc.sql( val diary_level1 = sc.sql(
s""" s"""
...@@ -1262,4 +1275,4 @@ object EsmmDataTest { ...@@ -1262,4 +1275,4 @@ object EsmmDataTest {
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment