Data2FFM.scala 7.73 KB
Newer Older
高雅喆's avatar
高雅喆 committed
1
package com.gmei
高雅喆's avatar
高雅喆 committed
2 3 4 5 6 7 8 9
import java.io.{File, PrintWriter, Serializable}

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser


高雅喆's avatar
高雅喆 committed
10
object Data2FFM {
高雅喆's avatar
高雅喆 committed
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

  // Keep Spark's own logging at WARN so the job's println output stays readable.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty's packages live under org.eclipse.jetty; the previous name
  // "org.apache.eclipse.jetty.server" matched no logger, so the OFF level never applied.
  // NOTE(review): Spark builds that shade Jetty may use a different logger prefix - confirm.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
    * Command-line options accepted by this job.
    *
    * @param env value of the `--env` option; `main` hands it to `GmeiConfig.setup`.
    *            Defaults to "dev".
    */
  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  /** Options used as the parser's starting point; every field holds its declared default. */
  val defaultParams: Params = Params()

  /**
    * Command-line parser for this job.
    *
    * Recognises a single `--env` option, which overrides [[Params.env]]. The usage
    * note shows how to submit this object (`com.gmei.Data2FFM`) with spark-submit.
    */
  val parser: OptionParser[Params] = new OptionParser[Params]("Feed_EDA") {
    head("Data2FFM")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      // stripMargin is applied to the whole concatenation so that the final "--env" line
      // does not print a literal '|'. The first part stays a raw (non-interpolated) string
      // because it contains a backslash, which the s-interpolator would treat as an escape.
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar \
        |""" + s"   --env ${defaultParams.env}").stripMargin
    )
  }


  /**
    * Writes `str` to the file at `path`, creating the file or replacing its content.
    *
    * The text is encoded with the platform default charset (the behaviour of
    * `new PrintWriter(File)`). A confirmation line is printed to stdout only after
    * the data has been flushed without error.
    *
    * @param path destination file path
    * @param str  full text to write
    * @throws java.io.FileNotFoundException if the file cannot be opened for writing
    * @throws java.io.IOException           if the underlying stream reports a write error
    */
  def writecsv(path:String,str:String): Unit ={
    val writer = new PrintWriter(new File(path))
    try {
      writer.write(str)
      // PrintWriter never throws on write; checkError() flushes and reports any failure,
      // so a failed write is no longer followed by the success message.
      if (writer.checkError()) {
        throw new java.io.IOException(s"Failed to write to $path")
      }
    } finally {
      // Always release the file handle, even when the write fails.
      writer.close()
    }
    println("写入成功")
  }


  /** One sample: (device_id, y, z, stat_date, ucity_id, cid_id, clevel1_id, ccity_name). */
  private type EsmmRow = (String, String, String, String, String, String, String, String)

  /**
    * Encodes samples into the five-column layout written to the `esmm_data2ffm_*` tables.
    *
    * For every sample the six categorical columns (device_id, stat_date, ucity_id, cid_id,
    * clevel1_id, ccity_name) are replaced by their dictionary index and rendered as
    * space-separated `field:index:1.0` tokens with fields numbered 1 to 6. A value missing
    * from the dictionary is encoded as index -1.
    *
    * @param rows         samples to encode
    * @param featureIndex broadcast dictionary: column name -> (value -> index)
    * @return tuples of (random number, "rowIndex,y,z,tokens", device_id, ucity_id, cid_id),
    *         where rowIndex is the position assigned by `zipWithIndex`
    */
  private def toFfmRows(
      rows: org.apache.spark.rdd.RDD[EsmmRow],
      featureIndex: org.apache.spark.broadcast.Broadcast[Map[String, Map[String, Int]]]
  ): org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = {
    rows
      .map { case (deviceId, y, z, statDate, ucityId, cidId, clevel1Id, ccityName) =>
        val index = featureIndex.value
        val fields = Seq(
          "device_id" -> deviceId,
          "stat_date" -> statDate,
          "ucity_id" -> ucityId,
          "cid_id" -> cidId,
          "clevel1_id" -> clevel1Id,
          "ccity_name" -> ccityName
        )
        val tokens = fields.zipWithIndex.map { case ((column, value), position) =>
          // Hash lookup instead of a linear scan over the distinct values for every row.
          s"${position + 1}:${index(column).getOrElse(value, -1)}:1.0"
        }.mkString(" ")
        // Random number in [0, Int.MaxValue); ThreadLocalRandom avoids allocating a
        // generator per row.
        val number = java.util.concurrent.ThreadLocalRandom.current().nextInt(Int.MaxValue)
        (number, y, z, tokens, deviceId, ucityId, cidId)
      }
      .zipWithIndex()
      .map { case ((number, y, z, tokens, deviceId, ucityId, cidId), rowIndex) =>
        (number, rowIndex + "," + y + "," + z + "," + tokens, deviceId, ucityId, cidId)
      }
  }

  /**
    * Runs the whole conversion for already-parsed options.
    *
    * Steps:
    *  1. Read recent rows of `esmm_train_data` and build a value -> index dictionary for
    *     every column except `y` and `z`.
    *  2. Rows whose stat_date differs from the table's max(stat_date) are written to
    *     `esmm_data2ffm_train`; rows on that date are written to `esmm_data2ffm_cv`.
    *  3. Rows of `esmm_pre_data` whose device_id, ucity_id and cid_id all occur in the
    *     training dictionary are written to `esmm_data2ffm_infer_native` (label "0") and
    *     `esmm_data2ffm_infer_nearby` (label "1").
    *
    * The Spark session is stopped when the job ends, whether it succeeds or fails.
    *
    * @param param parsed command-line options
    * @throws IllegalStateException if `max(stat_date)` over `esmm_train_data` is null
    */
  private def runJob(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    val spark_env = GmeiConfig.getSparkSession()
    val sc = spark_env._2

    try {
      import sc.implicits._

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")

      val train_sep_date = GmeiConfig.getMinusNDate(10)
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
           |where stat_date > '${train_sep_date}'
         """.stripMargin
      ).repartition(200).na.drop()
      val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")

      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )
      println("------------------------")
      // max() over an empty table yields a single row holding null; fail with a clear
      // message instead of a NullPointerException.
      val max_stat_date_str = Option(max_stat_date.head().get(0)).map(_.toString).getOrElse(
        throw new IllegalStateException("max(stat_date) is null: esmm_train_data has no usable rows")
      )

      println(max_stat_date_str)
      println(column_list.slice(0,2).toList)

      esmm_data.persist()
      // De-duplicate on the cluster so that only the distinct values of each column reach
      // the driver, then index them with a hash map for constant-time lookups.
      val featureIndex: Map[String, Map[String, Int]] = column_list.map { column =>
        val values = esmm_data.select(column).distinct().collect().map(_.get(0).toString).distinct
        column -> values.zipWithIndex.toMap
      }.toMap
      // Broadcast once instead of shipping the dictionary inside every task closure.
      val featureIndexBc = sc.sparkContext.broadcast(featureIndex)
      println("dict")

      val rdd = esmm_data.rdd
        .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
          x(4).toString,x(5).toString,x(6).toString, x(7).toString))
      rdd.persist()

      // NOTE(review): database credentials are hard-coded here and therefore live in
      // version control; they should come from configuration or a secret store.
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

      val train = toFfmRows(rdd.filter(x => x._4 != max_stat_date_str), featureIndexBc)
        .toDF("number","data","device_id","city_id","cid")
      println("train")
      train.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
      // The train write scanned every partition of `rdd`, so `rdd` has been computed from
      // the cached frame; releasing the frame now does not force a re-read from TiDB.
      esmm_data.unpersist()

      val test = toFfmRows(rdd.filter(x => x._4 == max_stat_date_str), featureIndexBc)
        .toDF("number","data","device_id","city_id","cid")
      println("test")
      test.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
      // Release the cache only after the last consumer has run; doing it before the write
      // would recompute the whole lineage.
      rdd.unpersist()

      val esmm_pre_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label
           |from esmm_pre_data
        """.stripMargin
      ).repartition(200).na.drop()
      esmm_pre_data.persist()

      // Keep only rows whose cid, city and device were seen in training. Every row here
      // comes from esmm_pre_data itself, so membership in the training dictionary is the
      // same test as membership in the (prediction ∩ training) value sets.
      // The frame is already split into 200 partitions, so no second repartition is needed.
      val rdd_pre = esmm_pre_data.rdd
        .map(x => ((x(0).toString,x(1).toString,x(2).toString,x(3).toString,
          x(4).toString,x(5).toString,x(6).toString,x(7).toString), x(8).toString))
        .filter { case ((deviceId, _, _, _, ucityId, cidId, _, _), _) =>
          val index = featureIndexBc.value
          index("cid_id").contains(cidId) &&
            index("ucity_id").contains(ucityId) &&
            index("device_id").contains(deviceId)
        }

      val native_pre = toFfmRows(rdd_pre.filter(x => x._2 == "0").map(x => x._1), featureIndexBc)
        .toDF("number","data","device_id","city_id","cid")
      println("pre")
      native_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, native_pre, "esmm_data2ffm_infer_native", SaveMode.Overwrite)

      val nearby_pre = toFfmRows(rdd_pre.filter(x => x._2 == "1").map(x => x._1), featureIndexBc)
        .toDF("number","data","device_id","city_id","cid")
      println("pre")
      nearby_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, nearby_pre, "esmm_data2ffm_infer_nearby", SaveMode.Overwrite)

      esmm_pre_data.unpersist()
    } finally {
      // Stop the session on failure as well as on success.
      sc.stop()
    }
  }

  /**
    * Entry point: parses the command line and runs the conversion job.
    *
    * When the arguments cannot be parsed the process exits with status 1 (the parser has
    * already reported the problem) instead of returning as if the job had succeeded.
    *
    * @param args command-line arguments; only `--env <name>` is recognised
    */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) => runJob(param)
      case None => sys.exit(1)
    }
  }
高雅喆's avatar
高雅喆 committed
171
}