Commit 95adea27 authored by 高雅喆's avatar 高雅喆

add if

parent a8c58338
...@@ -55,9 +55,18 @@ object EsmmData { ...@@ -55,9 +55,18 @@ object EsmmData {
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag") ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
import sc.implicits._ val max_stat_date = sc.sql(
s"""
|select max(stat_date) from esmm_train_data
""".stripMargin
)
val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
println("max_stat_date_str",max_stat_date_str)
println("param.date",param.date)
if (max_stat_date_str == param.date){
val stat_date = param.date val stat_date = param.date
println(stat_date) println(stat_date)
val imp_data = sc.sql( val imp_data = sc.sql(
...@@ -69,9 +78,9 @@ object EsmmData { ...@@ -69,9 +78,9 @@ object EsmmData {
|and stat_date ='${stat_date}' |and stat_date ='${stat_date}'
""".stripMargin """.stripMargin
) )
// imp_data.show() // imp_data.show()
// println("imp_data.count()") // println("imp_data.count()")
// println(imp_data.count()) // println(imp_data.count())
val clk_data = sc.sql( val clk_data = sc.sql(
...@@ -83,17 +92,17 @@ object EsmmData { ...@@ -83,17 +92,17 @@ object EsmmData {
|and stat_date ='${stat_date}' |and stat_date ='${stat_date}'
""".stripMargin """.stripMargin
) )
// clk_data.show() // clk_data.show()
// println("clk_data.count()") // println("clk_data.count()")
// println(clk_data.count()) // println(clk_data.count())
val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0)) val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
// imp_data_filter.createOrReplaceTempView("imp_data_filter") // imp_data_filter.createOrReplaceTempView("imp_data_filter")
// imp_data_filter.show() // imp_data_filter.show()
// println("imp_data_filter.count()") // println("imp_data_filter.count()")
// println(imp_data_filter.count()) // println(imp_data_filter.count())
val stat_date_not = stat_date.replace("-","") val stat_date_not = stat_date.replace("-","")
...@@ -112,25 +121,25 @@ object EsmmData { ...@@ -112,25 +121,25 @@ object EsmmData {
) )
val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1)) val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter") // cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
// cvr_data_filter.show() // cvr_data_filter.show()
// println("cvr_data_filter.count()") // println("cvr_data_filter.count()")
// println(cvr_data_filter.count()) // println(cvr_data_filter.count())
val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0)) val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
// clk_data_filter.createOrReplaceTempView("clk_data_filter") // clk_data_filter.createOrReplaceTempView("clk_data_filter")
// clk_data_filter.show() // clk_data_filter.show()
// println("clk_data_filter.count()") // println("clk_data_filter.count()")
// println(clk_data_filter.count()) // println(clk_data_filter.count())
val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter) val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
union_data.createOrReplaceTempView("union_data") union_data.createOrReplaceTempView("union_data")
// union_data.show() // union_data.show()
// println("union_data.count()") // println("union_data.count()")
// println(union_data.count()) // println(union_data.count())
...@@ -146,7 +155,7 @@ object EsmmData { ...@@ -146,7 +155,7 @@ object EsmmData {
""".stripMargin """.stripMargin
) )
union_data_clabel.createOrReplaceTempView("union_data_clabel") union_data_clabel.createOrReplaceTempView("union_data_clabel")
// union_data_clabel.show() // union_data_clabel.show()
val union_data_slabel = sc.sql( val union_data_slabel = sc.sql(
s""" s"""
...@@ -160,7 +169,7 @@ object EsmmData { ...@@ -160,7 +169,7 @@ object EsmmData {
""".stripMargin """.stripMargin
) )
union_data_slabel.createOrReplaceTempView("union_data_slabel") union_data_slabel.createOrReplaceTempView("union_data_slabel")
// union_data_slabel.show() // union_data_slabel.show()
val union_data_ccity_name = sc.sql( val union_data_ccity_name = sc.sql(
...@@ -174,7 +183,7 @@ object EsmmData { ...@@ -174,7 +183,7 @@ object EsmmData {
""".stripMargin """.stripMargin
) )
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name") union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show() // union_data_ccity_name.show()
val union_data_scity_id = sc.sql( val union_data_scity_id = sc.sql(
s""" s"""
...@@ -189,12 +198,13 @@ object EsmmData { ...@@ -189,12 +198,13 @@ object EsmmData {
|and d.partition_date='${stat_date_not}' |and d.partition_date='${stat_date_not}'
""".stripMargin """.stripMargin
) )
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id") // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show() union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
} else {
println("esmm_train_data already have param.date data")
}
sc.stop() sc.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment