Data2FFM.scala 7.75 KB
Newer Older
高雅喆's avatar
高雅喆 committed
1
package com.gmei
高雅喆's avatar
高雅喆 committed
2 3 4 5
import java.io.{File, PrintWriter, Serializable}

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
王志伟's avatar
王志伟 committed
6
import org.apache.spark.sql.{DataFrame, SaveMode}
高雅喆's avatar
高雅喆 committed
7 8 9
import scopt.OptionParser


高雅喆's avatar
高雅喆 committed
10
object Data2FFM {
高雅喆's avatar
高雅喆 committed
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49

  // Keep Spark's own logging at WARN so the job's println output stays readable.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty's classes (and therefore its loggers) live under "org.eclipse.jetty";
  // "org.apache.eclipse.jetty.server" matches no logger and silenced nothing.
  // NOTE(review): Spark builds that shade Jetty use a different prefix — confirm
  // against the Spark version this job is deployed on.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options for this job.
   *
   * @param env name of the database environment; `main` passes it to `GmeiConfig.setup`
   */
  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  /** Option values used as the starting point when parsing the command line. */
  val defaultParams: Params = Params()

  /**
   * Command-line parser for this job. The only option is `--env`, which
   * overrides [[Params.env]].
   */
  val parser = new OptionParser[Params]("Feed_EDA") {
    head("Data2FFM")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      // Concatenate first and strip the margin afterwards, so the "|" in front
      // of the --env line is removed like every other margin marker instead of
      // being printed literally.
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ + s"|   --env ${defaultParams.env}").stripMargin
    )
  }


  /**
   * Writes `str` to the file at `path`, replacing any existing content, and
   * prints a confirmation line once the data has been written.
   *
   * The text is encoded with the platform default charset, as done by
   * `PrintWriter(File)`.
   *
   * @param path destination file path
   * @param str  text to write
   * @throws java.io.FileNotFoundException if the file cannot be opened for writing
   * @throws java.io.IOException           if the underlying stream reports a write error
   */
  def writecsv(path:String,str:String): Unit ={
    val writer = new PrintWriter(new File(path))
    try {
      writer.write(str)
      // PrintWriter swallows I/O errors; checkError() flushes and reports them,
      // so the success message below is only printed when the write really worked.
      if (writer.checkError()) {
        throw new java.io.IOException(s"failed to write $path")
      }
    } finally {
      // Always release the file handle, even when the write fails.
      writer.close()
    }
    println("写入成功")
  }


  /**
   * One input row with every column rendered as a string, in select order:
   * (device_id, y, z, stat_date, ucity_id, cid_id, clevel1_id, ccity_name).
   */
  private type Record = (String, String, String, String, String, String, String, String)

  /** Converts the first eight columns of `row` to a [[Record]]. */
  private def toRecord(row: org.apache.spark.sql.Row): Record =
    (row(0).toString, row(1).toString, row(2).toString, row(3).toString,
      row(4).toString, row(5).toString, row(6).toString, row(7).toString)

  /**
   * Renders the six categorical columns of `r` as `field:index:1.0` triples
   * separated by spaces.
   *
   * @param featureIndex per-column map from raw value to its integer index
   * @param r            the row to encode
   * @return the encoded feature string; a value missing from its column's map
   *         is encoded with index -1
   */
  private def ffmFeatures(featureIndex: Map[String, Map[String, Int]], r: Record): String = {
    def indexOf(column: String, value: String): Int = featureIndex(column).getOrElse(value, -1)

    "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".format(
      indexOf("device_id", r._1), indexOf("stat_date", r._4), indexOf("ucity_id", r._5),
      indexOf("cid_id", r._6), indexOf("clevel1_id", r._7), indexOf("ccity_name", r._8))
  }

  /**
   * Entry point: parses the command line and runs the job.
   *
   * Exits with status 1 when the arguments cannot be parsed, instead of
   * silently doing nothing.
   *
   * @param args command-line arguments, see [[parser]]
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None => sys.exit(1)
    }
  }

  /**
   * Reads `esmm_train_data` and `esmm_pre_data`, encodes their categorical
   * columns as FFM feature strings and overwrites four JDBC tables:
   * `esmm_data2ffm_train`, `esmm_data2ffm_cv`, `esmm_data2ffm_infer_native`
   * and `esmm_data2ffm_infer_nearby`.
   *
   * Rows whose `stat_date` equals the latest date in `esmm_train_data` go to
   * the cv table, all other training rows to the train table. Prediction rows
   * are kept only when their cid, city and device all occur in the training
   * data, and are split by `label` ("0" -> native, "1" -> nearby).
   *
   * @param param parsed command-line options
   */
  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    val sc = GmeiConfig.getSparkSession()._2
    import sc.implicits._

    try {
      sc.sql("use jerry_test")

//      val ti = new TiContext(sc)
//      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
//      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")

      val train_sep_date = GmeiConfig.getMinusNDate(10)
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
           |where stat_date > '${train_sep_date}'
         """.stripMargin
      ).repartition(200).na.drop()
      val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")

      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )
      println("------------------------")
      // max() over an empty table yields a single null cell; fail with a clear
      // message instead of a NullPointerException.
      val max_stat_date_str = max_stat_date.collect().headOption
        .flatMap(row => Option(row(0)))
        .map(_.toString)
        .getOrElse(sys.error("esmm_train_data is empty: cannot determine the latest stat_date"))

      println(max_stat_date_str)
      println(column_list.slice(0,2).toList)

      // Cached until both the train and cv tables are written, so the SQL above
      // is not re-executed for every action below.
      esmm_data.persist()

      // Per-column vocabulary: raw value -> dense integer index. De-duplication
      // happens in Spark before collecting, so the driver only receives the
      // distinct values. Lookups are hash-based rather than linear scans.
      // The numbering is arbitrary but used consistently for every table
      // written by this run.
      val featureIndex: Map[String, Map[String, Int]] = column_list.map { column =>
        val values = esmm_data.select(column).distinct().collect()
          .map(row => row(0).toString).distinct
        column -> values.zipWithIndex.toMap
      }.toMap
      println("dict")

      // Encodes rows into the output schema:
      //   number    - random non-negative Int
      //   data      - "<row index>,<y>,<z>,<ffm features>"
      //   device_id, city_id, cid - raw values carried through
      def toFfmFrame(rows: org.apache.spark.rdd.RDD[Record]): DataFrame =
        rows
          .map(r => (scala.util.Random.nextInt(Int.MaxValue), r._2, r._3,
            ffmFeatures(featureIndex, r), r._1, r._5, r._6))
          .zipWithIndex()
          .map { case ((number, y, z, features, deviceId, cityId, cid), rowIndex) =>
            (number, s"$rowIndex,$y,$z,$features", deviceId, cityId, cid)
          }
          .toDF("number","data","device_id","city_id","cid")

      val records = esmm_data.rdd.map(toRecord)

      val train = toFfmFrame(records.filter(r => r._4 != max_stat_date_str))
      println("train")
      train.show(6)
      // NOTE(review): database credentials are embedded in source control here;
      // they should come from configuration or a secret store. The value is
      // left unchanged so that behavior is preserved.
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)

      val test = toFfmFrame(records.filter(r => r._4 == max_stat_date_str))
      println("test")
      test.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
      esmm_data.unpersist()

      val esmm_pre_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label
           |from esmm_pre_data
        """.stripMargin
      ).repartition(200).na.drop()
      esmm_pre_data.persist()

      // Keep only prediction rows whose cid, city and device were all seen in
      // the training data. Membership in the training vocabulary is equivalent
      // to membership in (prediction values intersect training values) for rows
      // that come from the prediction data itself.
      val knownCids = featureIndex("cid_id")
      val knownCities = featureIndex("ucity_id")
      val knownDevices = featureIndex("device_id")

      val rdd_pre = esmm_pre_data.rdd.repartition(200)
        .map(row => (toRecord(row), row(8).toString))
        .filter { case (r, _) =>
          knownCids.contains(r._6) && knownCities.contains(r._5) && knownDevices.contains(r._1)
        }

      val native_pre = toFfmFrame(rdd_pre.filter(_._2 == "0").map(_._1))
      println("pre")
      native_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, native_pre, "esmm_data2ffm_infer_native", SaveMode.Overwrite)

      val nearby_pre = toFfmFrame(rdd_pre.filter(_._2 == "1").map(_._1))
      println("pre")
      nearby_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, nearby_pre, "esmm_data2ffm_infer_nearby", SaveMode.Overwrite)
      esmm_pre_data.unpersist()
    } finally {
      // Release the session even when a step above fails.
      sc.stop()
    }
  }
高雅喆's avatar
高雅喆 committed
173
}