Commit f3d0a6a8 authored by 张彦钊's avatar 张彦钊

修改ffm文件

parent 323fb84a
...@@ -55,7 +55,7 @@ object Data2FFM { ...@@ -55,7 +55,7 @@ object Data2FFM {
// val yesteday_have_seq = GmeiConfig.getMinusNDate(5) // val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val esmm_data = sc.sql( val esmm_data = sc.sql(
s""" s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
|from esmm_train_data |from esmm_train_data
""".stripMargin """.stripMargin
).na.drop() ).na.drop()
...@@ -64,7 +64,7 @@ object Data2FFM { ...@@ -64,7 +64,7 @@ object Data2FFM {
val esmm_pre_data = sc.sql( val esmm_pre_data = sc.sql(
s""" s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
|from esmm_pre_data |from esmm_pre_data
""".stripMargin """.stripMargin
).na.drop() ).na.drop()
...@@ -108,32 +108,30 @@ object Data2FFM { ...@@ -108,32 +108,30 @@ object Data2FFM {
val rdd = esmm_data.rdd.repartition(200) val rdd = esmm_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString, .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(4).toString,x(5).toString,x(6).toString,
x(7).toString,x(8).toString,x(9).toString,x(10).toString)) x(7).toString))
rdd.persist() rdd.persist()
import sc.implicits._ import sc.implicits._
val train = rdd.filter(x => x._4 != max_stat_date_str) val train = rdd.filter(x => x._4 != max_stat_date_str)
.map(x => (x._1,x._2,x._3, .map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5), column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7), column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9), column_number("ccity_name").indexOf(x._8),x._6,x._7))
column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11))) .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0". format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex() .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4)) .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite) GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
val test = rdd.filter(x => x._4 == max_stat_date_str) val test = rdd.filter(x => x._4 == max_stat_date_str)
.map(x => (x._1,x._2,x._3, .map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5), column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7), column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9), column_number("ccity_name").indexOf(x._8),x._6,x._7))
column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11))) .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0". format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex() .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4)) .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite) GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
...@@ -142,18 +140,16 @@ object Data2FFM { ...@@ -142,18 +140,16 @@ object Data2FFM {
val rdd_pre = esmm_pre_data.rdd.repartition(200) val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString, .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(4).toString,x(5).toString,x(6).toString,
x(7).toString,x(8).toString,x(9).toString,x(10).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1) x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
.filter(x => esmm_join_city.indexOf(x._5) != -1) .filter(x => esmm_join_city.indexOf(x._5) != -1)
rdd_pre.persist() val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
val pre = rdd_pre.map(x => (x._1,x._2,x._3,
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5), column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7), column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9), column_number("ccity_name").indexOf(x._8),x._6,x._7))
column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11))) .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0". format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex() .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4)) .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite) GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment