Commit 9fdaa836 authored by 张彦钊

修改rdd

parent ab86987d
......@@ -59,28 +59,9 @@ object Data2FFM {
|from esmm_train_data limit 6
""".stripMargin
).na.drop()
esmm_data.show(6)
val column_list = esmm_data.columns
// val esmm_pre_data = sc.sql(
// s"""
// |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
// |from esmm_pre_data
// """.stripMargin
// ).na.drop()
//
// val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
// s => s(0).toString
// )
//
//
//
val max_stat_date = sc.sql(
s"""
|select max(stat_date) from esmm_train_data
......@@ -93,8 +74,6 @@ object Data2FFM {
println(max_stat_date_str)
println(column_list.slice(0,2).toList)
val column_number = scala.collection.mutable.Map[String,Array[String]]()
......@@ -102,10 +81,6 @@ object Data2FFM {
column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
}
// val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
// val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
// val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
val a = column_number("device_id").toList
println(a)
......@@ -148,7 +123,25 @@ object Data2FFM {
// GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
//
//
// val esmm_pre_data = sc.sql(
// s"""
// |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
// |from esmm_pre_data
// """.stripMargin
// ).na.drop()
//
// val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
// s => s(0).toString
// )
// val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
// val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
// val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
//
// val rdd_pre = esmm_pre_data.rdd.repartition(200)
// .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment