Commit 5513bf82 authored by 高雅喆's avatar 高雅喆

change pre data from merge 3 recall

parent 6bafdeb6
......@@ -9,6 +9,8 @@ import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark.sql.functions.lit
import scala.util.Try
object EsmmData {
......@@ -237,31 +239,61 @@ object EsmmPredData {
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "merge_queue_table")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
ti.tidbMapTable("eagle","ffm_diary_queue")
ti.tidbMapTable("eagle","search_queue")
ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
import sc.implicits._
val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
val activate_data = sc.sql(
// val activate_data = sc.sql(
// s"""
// |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
// |from merge_queue_table a
// |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
// |where b.stat_date ='${yesteday_have_seq}'
// |and b.device_id is not null
// """.stripMargin
// )
val raw_data = sc.sql(
s"""
|select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
|from merge_queue_table a
|left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
|where b.stat_date ='${yesteday_have_seq}'
|and b.device_id is not null
|select concat(device_id,",",city_id) as device_city,similarity_cid from nd_device_cid_similarity_matrix
|union
|select concat(device_id,",",city_id) as device_city,native_queue from ffm_diary_queue
|union
|select concat(device_id,",",city_id) as device_city,search_queue from search_queue
""".stripMargin
)
val raw_data = sc.sql(
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
case (device_city, cid_data) =>
val device_id = device_city.split(",")(0)
val city_id = device_city.split(",")(1)
val cids = Try(cid_data.toSeq.map(_.getAs[String]("similarity_cid").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(300).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
raw_data1.createOrReplaceTempView("raw_data1")
val raw_data2 = sc.sql(
s"""
|select device_id,city_id as ucity_id, explode(split(search_queue, ',')) as cid_id
|from merge_queue_table
|select a.device_id,a.city_id,explode(split(a.merge_queue, ',')) as cid_id from raw_data1 a
|left join esmm_train_data b on a.device_id = b.device_id
|where b.device_id is not null
""".stripMargin
)
activate_data.createOrReplaceTempView("raw_data")
raw_data2.createOrReplaceTempView("raw_data")
// activate_data.createOrReplaceTempView("raw_data")
// raw_data.show()
import sc.implicits._
val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
val sid_data = sc.sql(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment