package com.gmei

import java.io.{File, PrintWriter, Serializable}

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser

/**
 * Spark job that reads the `esmm_train_data` and `esmm_pre_data` tables, replaces the value of
 * every feature column with an integer index and writes the rows, rendered as
 * `field:index:1.0` feature strings, to four JDBC tables:
 * `esmm_data2ffm_train`, `esmm_data2ffm_cv`, `esmm_data2ffm_infer_native` and
 * `esmm_data2ffm_infer_nearby`.
 */
object Data2FFM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // NOTE(review): Jetty's loggers normally live under "org.eclipse.jetty"; this name looks like
  // a typo, in which case the line has no effect. Left unchanged - confirm before editing.
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line parameters.
   *
   * @param env name of the database environment handed to `GmeiConfig.setup`
   */
  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("Data2FFM")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    // The usage example previously named com.gmei.EsmmData, which is a different entry point.
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  /** Raw sample: (device_id, y, z, stat_date, ucity_id, cid_id, clevel1_id, ccity_name). */
  private type SampleRow = (String, String, String, String, String, String, String, String)

  /** Encoded sample: (number, data, device_id, city_id, cid). */
  private type FfmRow = (Int, String, String, String, String)

  /**
   * Writes `str` to the file at `path`, replacing any previous content, and prints a
   * confirmation line on success.
   *
   * @param path destination file path
   * @param str  text to write
   */
  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    // Close the writer even when the write fails, so the file handle is never leaked.
    try writer.write(str)
    finally writer.close()
    println("写入成功")
  }

  /**
   * Turns raw samples into rows ready to be stored.
   *
   * For every sample the six feature columns (device_id, stat_date, ucity_id, cid_id,
   * clevel1_id, ccity_name - in that order) are looked up in `dict`; a value that is missing
   * from the dictionary is encoded as -1. The result columns are:
   *  - a random non-negative Int,
   *  - the string `"<row index>,<y>,<z>,<features>"`, where the row index comes from
   *    `zipWithIndex` and `<features>` is `"1:<i1>:1.0 ... 6:<i6>:1.0"`,
   *  - the original device_id, ucity_id and cid_id.
   *
   * @param rows samples to encode
   * @param dict per-column dictionary mapping a column value to its integer index
   * @return the encoded rows
   */
  private def encodeFfm(
      rows: RDD[SampleRow],
      dict: Broadcast[Map[String, Map[String, Int]]]): RDD[FfmRow] = {
    rows
      .mapPartitions { part =>
        val lookup = dict.value
        // Resolve each column's dictionary once per partition; lookups are then O(1) per value.
        def encoder(column: String): String => Int = {
          val indexOfValue = lookup(column)
          value => indexOfValue.getOrElse(value, -1)
        }
        val deviceIdx = encoder("device_id")
        val statDateIdx = encoder("stat_date")
        val ucityIdx = encoder("ucity_id")
        val cidIdx = encoder("cid_id")
        val clevel1Idx = encoder("clevel1_id")
        val ccityNameIdx = encoder("ccity_name")
        // One generator per partition instead of one allocation per row.
        val random = new util.Random
        part.map { case (deviceId, y, z, statDate, ucityId, cidId, clevel1Id, ccityName) =>
          val features = "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".format(
            deviceIdx(deviceId), statDateIdx(statDate), ucityIdx(ucityId),
            cidIdx(cidId), clevel1Idx(clevel1Id), ccityNameIdx(ccityName))
          (random.nextInt(2147483647), y, z, features, deviceId, ucityId, cidId)
        }
      }
      .zipWithIndex()
      .map { case ((number, y, z, features, deviceId, ucityId, cidId), rowIndex) =>
        (number, s"$rowIndex,$y,$z,$features", deviceId, ucityId, cidId)
      }
  }

  /**
   * Entry point. Parses `--env`, builds the per-column dictionaries from the training window
   * and writes the train / cv / native-inference / nearby-inference tables.
   *
   * @param args command-line arguments, see [[parser]]
   */
  def main(args: Array[String]): Unit = {
    // The body is executed for its side effects only, hence foreach rather than map.
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")

      val train_sep_date = GmeiConfig.getMinusNDate(10)
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
           |where stat_date > '${train_sep_date}'
         """.stripMargin
      ).repartition(200).na.drop()

      // Every selected column except the two labels is a feature column.
      val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")

      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )

      println("------------------------")
      // max() yields null on an empty table; fail with a clear message instead of an NPE.
      val max_stat_date_str = max_stat_date.collect().headOption
        .flatMap(row => Option(row(0)))
        .map(_.toString)
        .getOrElse(throw new IllegalStateException(
          "select max(stat_date) from esmm_train_data returned no value"))
      println(max_stat_date_str)
      println(column_list.slice(0, 2).toList)

      // Cached once and kept until both the train and cv tables are written, so neither the
      // dictionary jobs nor the writes have to re-read the source table.
      esmm_data.persist()

      // Dictionary per feature column: value -> index. De-duplication happens in Spark so only
      // the distinct values reach the driver; sorting makes the index assignment reproducible
      // for a given set of values.
      val columnIndex: Map[String, Map[String, Int]] = column_list.map { column =>
        val values = esmm_data.select(column).distinct().collect()
          .map(row => row(0).toString).distinct.sorted
        column -> values.zipWithIndex.toMap
      }.toMap
      println("dict")

      val dict = sc.sparkContext.broadcast(columnIndex)

      val rdd: RDD[SampleRow] = esmm_data.rdd
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString, x(7).toString))

      import sc.implicits._

      // Rows of the latest stat_date form the validation set, everything else the training set.
      val train: DataFrame = encodeFfm(rdd.filter(x => x._4 != max_stat_date_str), dict)
        .toDF("number", "data", "device_id", "city_id", "cid")
      println("train")
      train.show(6)

      // NOTE(security): database credentials are hard-coded in source control. They should be
      // rotated and loaded from configuration; the literal is left untouched so the job's
      // behavior does not change.
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)

      val test: DataFrame = encodeFfm(rdd.filter(x => x._4 == max_stat_date_str), dict)
        .toDF("number", "data", "device_id", "city_id", "cid")
      println("test")
      test.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
      // Release the cache only after the last job that reads it has finished.
      esmm_data.unpersist()

      val esmm_pre_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label
           |from esmm_pre_data
         """.stripMargin
      ).repartition(200).na.drop()
      esmm_pre_data.persist()

      // Keep only prediction rows whose cid_id, ucity_id and device_id occur in the training
      // dictionaries. A row's own value is by definition part of the prediction set, so this
      // membership test selects the same rows as intersecting the distinct values of both sets.
      val rdd_pre = esmm_pre_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString, x(7).toString, x(8).toString))
        .filter { x =>
          val known = dict.value
          known("cid_id").contains(x._6) &&
            known("ucity_id").contains(x._5) &&
            known("device_id").contains(x._1)
        }

      // Rows with label "0" go to the native table.
      val native_pre: DataFrame = encodeFfm(
        rdd_pre.filter(x => x._9 == "0")
          .map(x => (x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8)),
        dict
      ).toDF("number", "data", "device_id", "city_id", "cid")
      println("pre")
      native_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, native_pre, "esmm_data2ffm_infer_native", SaveMode.Overwrite)

      // Rows with label "1" go to the nearby table.
      val nearby_pre: DataFrame = encodeFfm(
        rdd_pre.filter(x => x._9 == "1")
          .map(x => (x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8)),
        dict
      ).toDF("number", "data", "device_id", "city_id", "cid")
      println("pre")
      nearby_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, nearby_pre, "esmm_data2ffm_infer_nearby", SaveMode.Overwrite)

      sc.stop()
    }
  }
}