Commit 6bfa857c authored by 张彦钊

device filter

parent 7b4ecbe6
...@@ -119,22 +119,24 @@ object Data2FFM { ...@@ -119,22 +119,24 @@ object Data2FFM {
|from esmm_pre_data |from esmm_pre_data
""".stripMargin """.stripMargin
).repartition(200).na.drop() ).repartition(200).na.drop()
esmm_pre_data.persist()
val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map( val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
s => s(0).toString s => s(0).toString
) )
val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map( val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
s => s(0).toString s => s(0).toString)
) val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
s => s(0).toString)
val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id")) val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id")) val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
val rdd_pre = esmm_pre_data.rdd.repartition(200) val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString, .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString, x(4).toString,x(5).toString,x(6).toString,
x(7).toString,x(8).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1) x(7).toString,x(8).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
.filter(x => esmm_join_city.indexOf(x._5) != -1) .filter(x => esmm_join_city.indexOf(x._5) != -1).filter(x => esmm_join_device.indexOf(x._1) != -1)
val native_pre = rdd_pre.filter(x => x._9 == "0").map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1), val native_pre = rdd_pre.filter(x => x._9 == "0").map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5), column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment