Commit d7ec6450 authored by 王志伟's avatar 王志伟
parents 77e93663 beb78a18
......@@ -2,6 +2,7 @@ package com.gmei
import java.io.Serializable
import java.time.LocalDate
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
......@@ -279,6 +280,7 @@ object EsmmPredData {
// Register the TiDB tables this job reads as Spark-queryable relations.
ti.tidbMapTable("eagle","search_queue")
ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
ti.tidbMapTable("eagle","biz_feed_diary_queue")
// Precise exposure log; queried below to drop diary ids the device already saw.
ti.tidbMapTable("jerry_prod","data_feed_exposure_precise")
import sc.implicits._
......@@ -306,9 +308,37 @@ object EsmmPredData {
val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
// Expose the nearby (merge_queue) candidates to SQL and log device coverage.
raw_data1.createOrReplaceTempView("raw_data1")
println("nearby_device_count",raw_data1.count())
// 8-day lookback window for the exposure history (string compare on stat_date).
val start= LocalDate.now().minusDays(8).toString
import sc.implicits._
// Distinct (device, diary cid) pairs the device was already exposed to in the window.
val sql =
s"""
|select distinct device_id,cid_id from data_feed_exposure_precise
|where stat_date >= "$start" and cid_type = "diary"
""".stripMargin
// Collapse exposures to one comma-joined cid string per device.
// NOTE(review): history.persist() has no matching unpersist() in view — confirm lifecycle.
val history = sc.sql(sql).repartition(200).rdd
.map(x =>(x(0).toString,x(1).toString)).groupByKey().map(x => (x._1,x._2.mkString(",")))
.toDF("device_id","cid_set")
history.persist()
history.createOrReplaceTempView("history")
if (history.take(1).nonEmpty){
// Exposure history exists: left-join it onto the nearby queue and
// subtract already-seen cids from each device's merge_queue.
raw_data1.createOrReplaceTempView("r")
val sql_nearby_filter =
s"""
|select r.device_id,r.city_id,r.merge_queue,history.cid_set from r
|left join history on r.device_id = history.device_id
""".stripMargin
// na.fill("") turns the null cid_set of unmatched left-join rows into "";
// "".split(",") is Array(""), so diff removes nothing for those devices.
val df = sc.sql(sql_nearby_filter).na.fill("").rdd
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
.map(x => (x._1,x._2,x._3.split(",").diff(x._4.split(",")).mkString(",")))
.toDF("device_id","city_id","merge_queue")
// Overwrite the temp view so downstream SQL reads the filtered queue.
// NOTE(review): a device whose whole queue was seen ends up with merge_queue = "";
// confirm downstream explode/filter tolerates that.
df.createOrReplaceTempView("raw_data1")
}else{
// No exposure history at all: keep the unfiltered queue under the same view name.
raw_data1.createOrReplaceTempView("raw_data1")
}
val raw_data2 = sc.sql(
s"""
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1
......@@ -327,9 +357,24 @@ object EsmmPredData {
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin
)
// Expose yesterday's native queues to SQL and log device coverage.
native_data.createOrReplaceTempView("native_data")
println("native_device_count",native_data.count())
if (history.take(1).nonEmpty){
// Same dedup step as for the nearby queue: join the 8-day exposure
// history and drop already-seen cids from each device's native_queue.
native_data.createOrReplaceTempView("temp")
val sql_native_filter =
s"""
|select t.device_id,t.city_id,t.native_queue,history.cid_set from temp t
|left join history on t.device_id = history.device_id
""".stripMargin
// na.fill("") makes unmatched devices diff against Array(""), a no-op.
val df = sc.sql(sql_native_filter).na.fill("").rdd
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
.map(x => (x._1,x._2,x._3.split(",").diff(x._4.split(",")).mkString(",")))
.toDF("device_id","city_id","native_queue")
// Overwrite the temp view so downstream SQL reads the filtered queue.
df.createOrReplaceTempView("native_data")
}else{
// No exposure history: keep the unfiltered native queue under the same view name.
native_data.createOrReplaceTempView("native_data")
}
val native_data1 = sc.sql(
s"""
|select device_id,city_id as ucity_id,explode(split(native_queue,',')) as cid_id from native_data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment