Commit 5def84e2 authored by 高雅喆's avatar 高雅喆

bug fix

parent 6fb79002
......@@ -69,6 +69,11 @@ object Data2FFM {
""".stripMargin
).na.drop()
val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
s => s(0).toString
)
val max_stat_date = sc.sql(
s"""
......@@ -90,6 +95,11 @@ object Data2FFM {
for (i <- column_list){
column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
}
val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
println("dict")
val rdd = esmm_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
......@@ -123,10 +133,12 @@ object Data2FFM {
GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString,
x(7).toString,x(8).toString,x(9).toString,x(10).toString))
x(7).toString,x(8).toString,x(9).toString,x(10).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
rdd_pre.persist()
val pre = rdd_pre.map(x => (x._1,x._2,x._3,
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment