Commit 7594926a authored by 张彦钊

esmm 预测候选集过滤掉被惩罚医生关联的日记

parent 9b43214d
...@@ -396,7 +396,7 @@ object EsmmPredData { ...@@ -396,7 +396,7 @@ object EsmmPredData {
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}') |where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
""".stripMargin """.stripMargin
) )
raw_data.show() // raw_data.show()
val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map { val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
...@@ -406,7 +406,7 @@ object EsmmPredData { ...@@ -406,7 +406,7 @@ object EsmmPredData {
val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("") val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(500).mkString(",")).getOrElse("")
(device_id,city_id ,s"$cids") (device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue") }.filter(_._3!="").toDF("device_id","city_id","merge_queue")
println("nearby_device_count",raw_data1.count()) // println("nearby_device_count",raw_data1.count())
val start= LocalDate.now().minusDays(14).toString val start= LocalDate.now().minusDays(14).toString
import sc.implicits._ import sc.implicits._
...@@ -443,7 +443,7 @@ object EsmmPredData { ...@@ -443,7 +443,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
).withColumn("label",lit(1)) ).withColumn("label",lit(1))
raw_data2.createOrReplaceTempView("raw_data2") raw_data2.createOrReplaceTempView("raw_data2")
println("nearby_explode_count",raw_data2.count()) // println("nearby_explode_count",raw_data2.count())
// native_data // native_data
...@@ -455,7 +455,7 @@ object EsmmPredData { ...@@ -455,7 +455,7 @@ object EsmmPredData {
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != "" |where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin """.stripMargin
) )
println("native_device_count",native_data.count()) // println("native_device_count",native_data.count())
if (history.take(1).nonEmpty){ if (history.take(1).nonEmpty){
native_data.createOrReplaceTempView("temp") native_data.createOrReplaceTempView("temp")
...@@ -479,9 +479,7 @@ object EsmmPredData { ...@@ -479,9 +479,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
).withColumn("label",lit(0)) ).withColumn("label",lit(0))
native_data1.createOrReplaceTempView("native_data1") native_data1.createOrReplaceTempView("native_data1")
println("native_explode_count",native_data1.count()) // println("native_explode_count",native_data1.count())
//union //union
val union_data = sc.sql( val union_data = sc.sql(
...@@ -492,7 +490,7 @@ object EsmmPredData { ...@@ -492,7 +490,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
union_data.createOrReplaceTempView("raw_data") union_data.createOrReplaceTempView("raw_data")
println("union_count",union_data.count()) // println("union_count",union_data.count())
//join feat //join feat
...@@ -508,7 +506,7 @@ object EsmmPredData { ...@@ -508,7 +506,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
// sid_data.show() // sid_data.show()
println(sid_data.count()) // println(sid_data.count())
val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0)) val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
sid_data_label.createOrReplaceTempView("union_data") sid_data_label.createOrReplaceTempView("union_data")
...@@ -556,10 +554,29 @@ object EsmmPredData { ...@@ -556,10 +554,29 @@ object EsmmPredData {
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name") union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show() // union_data_ccity_name.show()
// Load the `api_punishment` table from the MySQL `zhengxing` database over JDBC
// and expose it to Spark SQL as the temp view `api_punishment`.
// NOTE(review): the JDBC host, user and password are hard-coded in source here;
// consider moving them to external configuration / a secret store.
val jdbcDF = sc.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com:3306/zhengxing")
.option("dbtable", "api_punishment")
.option("user", "work")
.option("password", "BJQaT9VzDcuPBqkd")
.load()
jdbcDF.createOrReplaceTempView("api_punishment")
// Today's date (LocalDate.toString), used as the lower bound on `end_time` below.
val now = LocalDate.now().toString
// Collect to the driver the distinct doctor_id values (as strings) of rows whose
// `end_time` is later than `now`.
// NOTE(review): `x(0).toString` would throw if a matching row has a null doctor_id — confirm the column is non-null.
// NOTE(review): the later query interpolates this array into `b.doctor_id not in (...)`;
// if the array is empty the clause becomes `not in ()` — confirm Spark SQL accepts that, or guard for it.
val punish_doctor = sc.sql(
s"""
|select doctor_id from api_punishment
|where end_time > '$now'
""".stripMargin).collect().map(x => x(0).toString).distinct
// Log how many distinct doctor ids were collected.
println("punish_doctor")
println(punish_doctor.length)
val union_data_scity_id = sc.sql( val union_data_scity_id = sc.sql(
s""" s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name, |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id | d.city_id as scity_id,b.doctor_id
|from union_data_ccity_name a |from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
...@@ -567,8 +584,10 @@ object EsmmPredData { ...@@ -567,8 +584,10 @@ object EsmmPredData {
|where b.partition_date='${yesteday}' |where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}' |and c.partition_date='${yesteday}'
|and d.partition_date='${yesteday}' |and d.partition_date='${yesteday}'
|and b.doctor_id not in (${punish_doctor.map(x => s"'$x'").mkString(",")})
""".stripMargin """.stripMargin
) )
union_data_scity_id.createOrReplaceTempView("union_data_scity_id") union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
val union_data_scity_id2 = sc.sql( val union_data_scity_id2 = sc.sql(
...@@ -580,7 +599,6 @@ object EsmmPredData { ...@@ -580,7 +599,6 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id") // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
// println(union_data_scity_id2.count()) // println(union_data_scity_id2.count())
union_data_scity_id2.persist() union_data_scity_id2.persist()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment