Commit 22eb4111 authored by 高雅喆's avatar 高雅喆

bug fix

parent 8fe70c8c
......@@ -260,14 +260,17 @@ object EsmmPredData {
val raw_data = sc.sql(
s"""
|select concat(device_id,",",city_id) as device_city,similarity_cid from nd_device_cid_similarity_matrix
|select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
|(select device_id,city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
|union
|select concat(device_id,",",city_id) as device_city,native_queue from ffm_diary_queue
|select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
|union
|select concat(device_id,",",city_id) as device_city,search_queue from search_queue
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from esmm_train_data)
""".stripMargin
)
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
case (device_city, cid_data) =>
val device_id = device_city.split(",")(0)
......@@ -275,16 +278,9 @@ object EsmmPredData {
val cids = Try(cid_data.toSeq.map(_.getAs[String]("similarity_cid").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(300).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
raw_data1.createOrReplaceTempView("raw_data1")
val raw_data2 = sc.sql(
s"""
|select a.device_id,a.city_id as ucity_id,explode(split(a.merge_queue, ',')) as cid_id from raw_data1 a
|left join esmm_train_data b on a.device_id = b.device_id
|where b.device_id is not null
""".stripMargin
)
raw_data2.createOrReplaceTempView("raw_data")
println(raw_data1.count())
raw_data1.createOrReplaceTempView("raw_data")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment