Commit 3494d3f0 authored by 张彦钊's avatar 张彦钊

修复esmm模型native_queue是空的bug

parent e92a8969
......@@ -70,15 +70,15 @@ object EsmmData {
if (max_stat_date_str != param.date){
val stat_date = param.date
println(stat_date)
// val imp_data = sc.sql(
// s"""
// |select distinct stat_date,device_id,city_id as ucity_id,
// | cid_id,diary_service_id
// |from data_feed_exposure
// |where cid_type = 'diary'
// |and stat_date ='${stat_date}'
// """.stripMargin
// )
// val imp_data = sc.sql(
// s"""
// |select distinct stat_date,device_id,city_id as ucity_id,
// | cid_id,diary_service_id
// |from data_feed_exposure
// |where cid_type = 'diary'
// |and stat_date ='${stat_date}'
// """.stripMargin
// )
val imp_data = sc.sql(
s"""
......@@ -384,6 +384,14 @@ object EsmmPredData {
val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
val target_user = sc.sql(
s"""
|select concat(t.device_id,",",t.city_id) from
|(select distinct device_id,city_id
|from data_feed_exposure where stat_date='${yesteday_have_seq}') t
""".stripMargin).collect().map(x => x(0).toString)
println("target_user",target_user.length)
//nearby_data
val raw_data = sc.sql(
s"""
......@@ -393,20 +401,21 @@ object EsmmPredData {
|select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
|union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
""".stripMargin
)
// raw_data.show()
""".stripMargin)
// raw_data.show()
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city"))
.filter(x => target_user.indexOf(x._1) != -1)
.map {
case (device_city, cid_data) =>
val device_id = Try(device_city.split(",")(0)).getOrElse("")
val city_id = Try(device_city.split(",")(1)).getOrElse("")
val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
// println("nearby_device_count",raw_data1.count())
// println("nearby_device_count",raw_data1.count())
val start= LocalDate.now().minusDays(14).toString
import sc.implicits._
......@@ -443,7 +452,7 @@ object EsmmPredData {
""".stripMargin
).withColumn("label",lit(1))
raw_data2.createOrReplaceTempView("raw_data2")
// println("nearby_explode_count",raw_data2.count())
// println("nearby_explode_count",raw_data2.count())
// native_data
......@@ -455,7 +464,7 @@ object EsmmPredData {
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin
)
// println("native_device_count",native_data.count())
// println("native_device_count",native_data.count())
if (history.take(1).nonEmpty){
native_data.createOrReplaceTempView("temp")
......@@ -473,13 +482,16 @@ object EsmmPredData {
native_data.createOrReplaceTempView("native_data")
}
history.unpersist()
val native_data1 = sc.sql(
s"""
|select device_id,city_id as ucity_id,explode(split(native_queue,',')) as cid_id from native_data
""".stripMargin
).withColumn("label",lit(0))
native_data1.createOrReplaceTempView("native_data1")
// println("native_explode_count",native_data1.count())
// println("native_explode_count",native_data1.count())
//union
val union_data = sc.sql(
......@@ -490,7 +502,7 @@ object EsmmPredData {
""".stripMargin
)
union_data.createOrReplaceTempView("raw_data")
// println("union_count",union_data.count())
// println("union_count",union_data.count())
//join feat
......@@ -505,8 +517,8 @@ object EsmmPredData {
|where b.partition_date = '${yesteday}'
""".stripMargin
)
// sid_data.show()
// println(sid_data.count())
// sid_data.show()
// println(sid_data.count())
val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
sid_data_label.createOrReplaceTempView("union_data")
......@@ -592,22 +604,23 @@ object EsmmPredData {
val union_data_scity_id2 = sc.sql(
s"""
|select device_id,cid_id,first(stat_date) stat_date,first(ucity_id) ucity_id,label,first(diary_service_id)diary_service_id,first(y) y,
|select device_id,cid_id,first(stat_date) stat_date,ucity_id,first(label) label,first(diary_service_id) diary_service_id,first(y) y,
|first(z) z,first(clevel1_id) clevel1_id,first(slevel1_id) slevel1_id,first(ccity_name) ccity_name,
|first(scity_id) scity_id,first(hospital_id) hospital_id
|from union_data_scity_id
|group by device_id,cid_id,label
|group by device_id,ucity_id,cid_id
""".stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
// println(union_data_scity_id2.count())
// println(union_data_scity_id2.count())
union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
union_data_scity_id2.unpersist()
sc.stop()
}
......@@ -761,11 +774,11 @@ object GetDevicePortrait {
|on a.level1_count = b.max_count and a.device_id = b.device_id
""".stripMargin
)
// .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
// max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat)
//
// max_count_tag.take(10).foreach(println)
// println(max_count_tag.count())
// .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
// max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat)
//
// max_count_tag.take(10).foreach(println)
// println(max_count_tag.count())
//drop duplicates
val max_count_tag_rdd = max_count_tag.rdd.groupBy(_.getAs[String]("device_id")).map {
......@@ -833,7 +846,7 @@ object GetLevelCount {
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1).replace("-","")
// val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",")
// val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",")
val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580"
val diary_level1 = sc.sql(
s"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment