Commit 574de9b7 authored by 张彦钊's avatar 张彦钊

修改rdd

parent e347d1ff
......@@ -55,17 +55,10 @@ object Data2FFM {
// val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val esmm_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data limit 6
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
""".stripMargin
).na.drop()
esmm_data.show(8)
println(esmm_data.count())
esmm_data.createOrReplaceTempView("a")
val sql1 = "select * from a"
val df = sc.sql(sql1)
df.cache()
val column_list = df.columns
val column_list = esmm_data.columns
val max_stat_date = sc.sql(
s"""
|select max(stat_date) from esmm_train_data
......@@ -80,13 +73,13 @@ object Data2FFM {
val column_number = scala.collection.mutable.Map[String,Array[String]]()
for (i <- column_list){
column_number(i) = df.select(i).distinct().collect().map(x => x(0).toString)
column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
}
val a = column_number("device_id").toList
println(a)
println("dict")
val rdd = df.rdd.repartition(200)
val rdd = esmm_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(7).toString))
rdd.persist()
......@@ -107,60 +100,60 @@ object Data2FFM {
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
println("train")
train.show(6)
// val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
// GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
//
// val test = rdd.filter(x => x._4 == max_stat_date_str)
// .map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
// column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
// column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
// column_number("ccity_name").indexOf(x._8),x._5,x._6))
// .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
// format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
// .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
// .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
// println("test")
// test.show(6)
// GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
//
//
// val esmm_pre_data = sc.sql(
// s"""
// |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
// |from esmm_pre_data
// """.stripMargin
// ).na.drop()
//
// val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
// val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
// val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
//
// val rdd_pre = esmm_pre_data.rdd.repartition(200)
// .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
// x(4).toString,x(5).toString,x(6).toString,
// x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
// .filter(x => esmm_join_city.indexOf(x._5) != -1)
// .filter(x => esmm_join_device.indexOf(x._1) != -1)
// val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
// column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
// column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
// column_number("ccity_name").indexOf(x._8),x._5,x._6))
// .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
// format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
// .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
// .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
// println("pre")
// pre.show(6)
// GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
val test = rdd.filter(x => x._4 == max_stat_date_str)
.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("ccity_name").indexOf(x._8),x._5,x._6))
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
println("test")
test.show(6)
GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
val esmm_pre_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
|from esmm_pre_data
""".stripMargin
).na.drop()
val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
s => s(0).toString
)
val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
s => s(0).toString
)
val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
s => s(0).toString
)
val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString,
x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
.filter(x => esmm_join_city.indexOf(x._5) != -1)
.filter(x => esmm_join_device.indexOf(x._1) != -1)
val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("ccity_name").indexOf(x._8),x._5,x._6))
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
println("pre")
pre.show(6)
GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
sc.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment