Commit 8cc061c7 authored by 张彦钊's avatar 张彦钊

修改rdd

parent a6d50b78
...@@ -60,7 +60,11 @@ object Data2FFM { ...@@ -60,7 +60,11 @@ object Data2FFM {
).na.drop() ).na.drop()
esmm_data.show(8) esmm_data.show(8)
println(esmm_data.count()) println(esmm_data.count())
val column_list = esmm_data.columns esmm_data.createOrReplaceTempView("df")
val sql1 = "select * from df"
val df = sc.sql(sql1)
val column_list = df.columns
val max_stat_date = sc.sql( val max_stat_date = sc.sql(
s""" s"""
...@@ -76,13 +80,13 @@ object Data2FFM { ...@@ -76,13 +80,13 @@ object Data2FFM {
val column_number = scala.collection.mutable.Map[String,Array[String]]() val column_number = scala.collection.mutable.Map[String,Array[String]]()
for (i <- column_list){ for (i <- column_list){
column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString) column_number(i) = df.select(i).distinct().collect().map(x => x(0).toString)
} }
val a = column_number("device_id").toList val a = column_number("device_id").toList
println(a) println(a)
println("dict") println("dict")
val rdd = esmm_data.rdd.repartition(200) val rdd = df.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString, .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(7).toString)) x(4).toString,x(5).toString,x(6).toString, x(7).toString))
rdd.persist() rdd.persist()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment