Commit a0bb0220 authored by 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add test file
parents 7dec134c f0e20388
@@ -9,6 +9,8 @@ import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark.sql.functions.lit
import scala.util.Try
object EsmmData {
@@ -237,31 +239,65 @@ object EsmmPredData {
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "merge_queue_table")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
ti.tidbMapTable("eagle","ffm_diary_queue")
ti.tidbMapTable("eagle","search_queue")
ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
import sc.implicits._
val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
val activate_data = sc.sql(
// val activate_data = sc.sql(
// s"""
// |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
// |from merge_queue_table a
// |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
// |where b.stat_date ='${yesteday_have_seq}'
// |and b.device_id is not null
// """.stripMargin
// )
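// Build the candidate queue per (device_id, city_id): union the similarity, ffm and search queues,
// keeping only devices that appear in esmm_train_data.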
val raw_data = sc.sql(
s"""
|select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
|from merge_queue_table a
|left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
|where b.stat_date ='${yesteday_have_seq}'
|and b.device_id is not null
|select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
|(select device_id,city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
|union
|select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
|union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from esmm_train_data)
""".stripMargin
)
raw_data.show()
val raw_data = sc.sql(
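// Group by device_city and merge that device's queues into one deduplicated cid list of at most 300 items.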
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
case (device_city, cid_data) =>
val device_id = Try(device_city.split(",")(0)).getOrElse("")
val city_id = Try(device_city.split(",")(1)).getOrElse("")
val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(300).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
raw_data1.createOrReplaceTempView("raw_data1")
println(raw_data1.count())
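// Explode the merged queue back into one (device_id, city_id, cid_id) row per candidate.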
val raw_data2 = sc.sql(
s"""
|select device_id,city_id as ucity_id, explode(split(search_queue, ',')) as cid_id
|from merge_queue_table
|select device_id,city_id,explode(split(merge_queue, ',')) as cid_id from raw_data1
""".stripMargin
)
activate_data.createOrReplaceTempView("raw_data")
raw_data2.createOrReplaceTempView("raw_data")
print(raw_data2.count())
// activate_data.createOrReplaceTempView("raw_data")
// raw_data.show()
import sc.implicits._
val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
val sid_data = sc.sql(
......
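For reference, a minimal standalone sketch (not part of this commit) of the queue-merging step used to build raw_data1 above; the object and method names are hypothetical, and it assumes plain Scala collections rather than Spark rows:

object MergeQueueSketch {
  // Mimics the raw_data1 logic: split each comma-separated queue, interleave the
  // queues position by position, drop duplicate cids (first occurrence wins),
  // and keep at most `limit` candidates per (device_id, city_id).
  def mergeQueues(queues: Seq[String], limit: Int = 300): String =
    queues
      .map(_.split(","))
      .flatMap(_.zipWithIndex)   // tag each cid with its position inside its own queue
      .sortBy(_._2)              // stable sort by position interleaves the queues
      .map(_._1)
      .distinct                  // distinct keeps the first occurrence of each cid
      .take(limit)
      .mkString(",")

  def main(args: Array[String]): Unit = {
    // hypothetical cid values, for illustration only
    println(mergeQueues(Seq("101,102,103", "102,104", "105,101")))
    // prints: 101,102,105,104,103
  }
}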