package com.gmei

import java.io.{File, PrintWriter, Serializable}

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser

object Data2FFM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the database environment to use")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"|  --env ${defaultParams.env}"
    )
  }

  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    writer.write(str)
    writer.close()
    println("write success")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")
      // val yesteday_have_seq = GmeiConfig.getMinusNDate(5)

      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_train_data
         """.stripMargin
      ).na.drop()
      val column_list = esmm_data.columns

      val esmm_pre_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_pre_data
         """.stripMargin
      ).na.drop()

      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )
      println("------------------------")
      val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
      println(max_stat_date_str)
      println(column_list.slice(0, 2).toList)

      // Build a per-column dictionary of distinct values on the driver; a value's position
      // in the array is later used as its integer feature id (indexOf returns -1 for values
      // that never appear in esmm_train_data).
      val column_number = scala.collection.mutable.Map[String, Array[String]]()
      for (i <- column_list) {
        column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
      }
      println("dict")

      val rdd = esmm_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString,
          x(7).toString, x(8).toString, x(9).toString, x(10).toString))
      rdd.persist()

      import sc.implicits._

      // Training set: all rows except those from the latest stat_date.
      val train = rdd.filter(x => x._4 != max_stat_date_str)
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4),
          column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6),
          column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8),
          column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10),
          column_number("scity_id").indexOf(x._11)))
        .map(x => ((new util.Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")

      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)

      // Validation set: rows from the latest stat_date only.
      val test = rdd.filter(x => x._4 == max_stat_date_str)
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4),
          column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6),
          column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8),
          column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10),
          column_number("scity_id").indexOf(x._11)))
        .map(x => ((new util.Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)

      // Inference set: all rows of esmm_pre_data, encoded with the same dictionaries.
      val rdd_pre = esmm_pre_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString,
          x(7).toString, x(8).toString, x(9).toString, x(10).toString))
      rdd_pre.persist()

      val pre = rdd_pre.map(x => (x._1, x._2, x._3,
        column_number("stat_date").indexOf(x._4),
        column_number("ucity_id").indexOf(x._5),
        column_number("cid_id").indexOf(x._6),
        column_number("diary_service_id").indexOf(x._7),
        column_number("clevel1_id").indexOf(x._8),
        column_number("slevel1_id").indexOf(x._9),
        column_number("ccity_name").indexOf(x._10),
        column_number("scity_id").indexOf(x._11)))
        .map(x => ((new util.Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")
      GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)

      sc.stop()
    }
  }
}
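// Output format note: each row written to the esmm_data2ffm_train / _cv / _infer tables has a
// random "number" column and a "data" column of the form "<row index>,<y>,<z>,<ffm features>",
// where the eight features use the libffm-style "field:feature:value" encoding produced above,
// e.g. "1:37:1.0 2:5:1.0 ... 8:102:1.0".
//
// Usage sketch (the jar path and --env value mirror the parser note; any extra cluster flags
// such as --master or --queue are deployment-dependent and not specified by this file):
//   spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar --env dev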