package com.gmei
import java.io.{File, PrintWriter, Serializable}

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser
import scala.util.Random


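// Data2FFM: reads ESMM train/pre-ranking data from TiDB, encodes each categorical
// column as the index of its value in the training vocabulary, and writes
// FFM-formatted samples (train / cv / native infer / nearby infer) back over JDBC.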
object Data2FFM {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("Data2FFM")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a TiDB dataset:
        |
        | spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }


  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    try writer.write(str)
    finally writer.close()
    println("write succeeded")
  }


  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

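      // Map the TiDB source tables into Spark SQL so they can be queried below.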
      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")


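      // Select recent training samples; getMinusNDate(10) is taken to be the date 10 days ago.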
      val train_sep_date = GmeiConfig.getMinusNDate(10)
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
           |where stat_date > '${train_sep_date}'
         """.stripMargin
      ).repartition(200).na.drop()
      val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")
      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )
      println("------------------------")
      val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head

      println(max_stat_date_str)

      println(column_list.slice(0,2).toList)
      esmm_data.persist()
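      // Build a per-column vocabulary: the distinct values of each categorical
      // column are collected to the driver, and a value's array position is used
      // as its integer feature id in the FFM encoding below.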
      val column_number = scala.collection.mutable.Map[String,Array[String]]()
      for (i <- column_list){
        column_number(i) = esmm_data.select(i).collect().map(x => x(0).toString).distinct
      }
      esmm_data.unpersist()
      println("dict")
      val rdd = esmm_data.rdd
        .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
          x(4).toString,x(5).toString,x(6).toString, x(7).toString))
      rdd.persist()

      import sc.implicits._
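      // Train split: every row except those from the latest stat_date. Each row is
      // encoded as (random shuffle key, "rowIndex,y,z,<6 FFM fields>", device_id, city_id, cid).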
      val train = rdd.filter(x => x._4 != max_stat_date_str)
        .map(x => (x._1, x._2, x._3, column_number("device_id").indexOf(x._1),
          column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
          column_number("ccity_name").indexOf(x._8), x._5, x._6))
        .map(x => (Random.nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9), x._1, x._10, x._11))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4, x._1._5, x._1._6, x._1._7))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5, x._6, x._7, x._8))
        .toDF("number", "data", "device_id", "city_id", "cid")
      println("train")
      train.show(6)
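      // JDBC sink used for all four output tables.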
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)

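      // Test (cv) split: only rows from the latest stat_date, encoded the same way as train.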
      val test = rdd.filter(x => x._4 == max_stat_date_str)
        .map(x => (x._1, x._2, x._3, column_number("device_id").indexOf(x._1),
          column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
          column_number("ccity_name").indexOf(x._8), x._5, x._6))
        .map(x => (Random.nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9), x._1, x._10, x._11))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4, x._1._5, x._1._6, x._1._7))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5, x._6, x._7, x._8))
        .toDF("number", "data", "device_id", "city_id", "cid")
      println("test")
      test.show(6)
      rdd.unpersist()
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)


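      // Inference candidates: label "0" rows feed the native queue, label "1" rows the nearby queue.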
      val esmm_pre_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label
           |from esmm_pre_data
        """.stripMargin
      ).repartition(200).na.drop()
      esmm_pre_data.persist()
      val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
        s => s(0).toString
      )
      val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
        s => s(0).toString)
      val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
        s => s(0).toString)

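      // Restrict inference rows to ids that were seen in training, so every
      // vocabulary lookup below returns a valid index instead of -1.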
      val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
      val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
      val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))

      val rdd_pre = esmm_pre_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString,
          x(7).toString, x(8).toString))
        .filter(x => esmm_join_cids.contains(x._6))
        .filter(x => esmm_join_city.contains(x._5))
        .filter(x => esmm_join_device.contains(x._1))

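      // Native-queue candidates (label == "0").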
      val native_pre = rdd_pre.filter(x => x._9 == "0")
        .map(x => (x._1, x._2, x._3, column_number("device_id").indexOf(x._1),
          column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
          column_number("ccity_name").indexOf(x._8), x._5, x._6))
        .map(x => (Random.nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9), x._1, x._10, x._11))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4, x._1._5, x._1._6, x._1._7))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5, x._6, x._7, x._8))
        .toDF("number", "data", "device_id", "city_id", "cid")
      println("native_pre")
      native_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, native_pre, "esmm_data2ffm_infer_native", SaveMode.Overwrite)

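      // Nearby-queue candidates (label == "1").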
      val nearby_pre = rdd_pre.filter(x => x._9 == "1")
        .map(x => (x._1, x._2, x._3, column_number("device_id").indexOf(x._1),
          column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
          column_number("ccity_name").indexOf(x._8), x._5, x._6))
        .map(x => (Random.nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9), x._1, x._10, x._11))
        .zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4, x._1._5, x._1._6, x._1._7))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5, x._6, x._7, x._8))
        .toDF("number", "data", "device_id", "city_id", "cid")
      println("nearby_pre")
      nearby_pre.show(6)
      GmeiConfig.writeToJDBCTable(jdbcuri, nearby_pre, "esmm_data2ffm_infer_nearby", SaveMode.Overwrite)


      sc.stop()

    }

  }
}