Commit 62067f31 authored by 王志伟
parents 734506ff d261a004
...@@ -57,8 +57,8 @@ object Data2FFM { ...@@ -57,8 +57,8 @@ object Data2FFM {
s""" s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
""".stripMargin """.stripMargin
).na.drop() ).repartition(200).na.drop()
val column_list = esmm_data.columns val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")
val max_stat_date = sc.sql( val max_stat_date = sc.sql(
s""" s"""
|select max(stat_date) from esmm_train_data |select max(stat_date) from esmm_train_data
...@@ -70,19 +70,18 @@ object Data2FFM { ...@@ -70,19 +70,18 @@ object Data2FFM {
println(max_stat_date_str) println(max_stat_date_str)
println(column_list.slice(0,2).toList) println(column_list.slice(0,2).toList)
esmm_data.persist()
val column_number = scala.collection.mutable.Map[String,Array[String]]() val column_number = scala.collection.mutable.Map[String,Array[String]]()
for (i <- column_list){ for (i <- column_list){
column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString) column_number(i) = esmm_data.select(i).collect().map(x => x(0).toString).distinct
} }
esmm_data.unpersist()
println("dict") println("dict")
val rdd = esmm_data.rdd.repartition(200) val rdd = esmm_data.rdd
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString, .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(7).toString)) x(4).toString,x(5).toString,x(6).toString, x(7).toString))
rdd.persist() rdd.persist()
import sc.implicits._ import sc.implicits._
val train = rdd.filter(x => x._4 != max_stat_date_str) val train = rdd.filter(x => x._4 != max_stat_date_str)
.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1), .map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
...@@ -109,6 +108,7 @@ object Data2FFM { ...@@ -109,6 +108,7 @@ object Data2FFM {
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid") .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
println("test") println("test")
test.show(6) test.show(6)
rdd.unpersist()
GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite) GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
...@@ -117,7 +117,7 @@ object Data2FFM { ...@@ -117,7 +117,7 @@ object Data2FFM {
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
|from esmm_pre_data |from esmm_pre_data
""".stripMargin """.stripMargin
).na.drop() ).repartition(200).na.drop()
val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map( val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
s => s(0).toString s => s(0).toString
...@@ -125,19 +125,16 @@ object Data2FFM { ...@@ -125,19 +125,16 @@ object Data2FFM {
val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map( val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
s => s(0).toString s => s(0).toString
) )
val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
s => s(0).toString
)
val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id")) val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id")) val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
val rdd_pre = esmm_pre_data.rdd.repartition(200) val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString, .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(4).toString,x(5).toString,x(6).toString,
x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1) x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
.filter(x => esmm_join_city.indexOf(x._5) != -1) .filter(x => esmm_join_city.indexOf(x._5) != -1)
.filter(x => esmm_join_device.indexOf(x._1) != -1)
val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1), val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5), column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7), column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
......
...@@ -65,7 +65,7 @@ object EsmmData { ...@@ -65,7 +65,7 @@ object EsmmData {
|where cid_type = 'diary' |where cid_type = 'diary'
|and stat_date >'${stat_date}' |and stat_date >'${stat_date}'
""".stripMargin """.stripMargin
) ).repartition(200)
// imp_data.show() // imp_data.show()
// println("imp_data.count()") // println("imp_data.count()")
// println(imp_data.count()) // println(imp_data.count())
...@@ -79,7 +79,7 @@ object EsmmData { ...@@ -79,7 +79,7 @@ object EsmmData {
|where cid_type = 'diary' |where cid_type = 'diary'
|and stat_date >'${stat_date}' |and stat_date >'${stat_date}'
""".stripMargin """.stripMargin
) ).repartition(200)
// clk_data.show() // clk_data.show()
// println("clk_data.count()") // println("clk_data.count()")
// println(clk_data.count()) // println(clk_data.count())
...@@ -106,7 +106,7 @@ object EsmmData { ...@@ -106,7 +106,7 @@ object EsmmData {
|and params['page_name'] = 'welfare_detail' |and params['page_name'] = 'welfare_detail'
|and params['referrer'] = 'diary_detail' |and params['referrer'] = 'diary_detail'
""".stripMargin """.stripMargin
) ).repartition(200)
val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1)) val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter") // cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
...@@ -268,7 +268,7 @@ object EsmmPredData { ...@@ -268,7 +268,7 @@ object EsmmPredData {
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from esmm_train_data) |where tmp1.device_id in (select distinct device_id from esmm_train_data)
""".stripMargin """.stripMargin
) ).repartition(200)
raw_data.show() raw_data.show()
...@@ -286,7 +286,7 @@ object EsmmPredData { ...@@ -286,7 +286,7 @@ object EsmmPredData {
s""" s"""
|select device_id,city_id,merge_queue from raw_data1 limit 10000 |select device_id,city_id,merge_queue from raw_data1 limit 10000
""".stripMargin """.stripMargin
) ).repartition(200)
raw_data2.createOrReplaceTempView("raw_data2") raw_data2.createOrReplaceTempView("raw_data2")
println(raw_data2.count()) println(raw_data2.count())
raw_data2.show() raw_data2.show()
...@@ -295,7 +295,7 @@ object EsmmPredData { ...@@ -295,7 +295,7 @@ object EsmmPredData {
s""" s"""
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2 |select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2
""".stripMargin """.stripMargin
) ).repartition(200)
raw_data3.createOrReplaceTempView("raw_data") raw_data3.createOrReplaceTempView("raw_data")
println(raw_data3.count()) println(raw_data3.count())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment