package com.gmei
import java.io.{File, PrintWriter, Serializable}

import scala.util.Random

import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser


object Data2FFM {
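  // Reads esmm_train_data / esmm_pre_data from TiDB, index-encodes the
  // categorical columns, formats each row as a libffm-style
  // "field:index:value" string, and writes train/cv/infer tables back over JDBC.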

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("Data2FFM")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a TiDB dataset:
        |
        | spark-submit --class com.gmei.Data2FFM ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }


  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    try {
      writer.write(str)
    } finally {
      writer.close()
    }
    println("write succeeded")
  }
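
  // Sketch only (hypothetical helper, not wired into main below): the train/cv/
  // infer pipelines all repeat the same index-encoding and FFM formatting, which
  // could be factored out over the distinct-value dictionary built in main,
  // e.g. toFFMString(Seq(x._4, ..., x._11), fields, column_number).
  def toFFMString(values: Seq[String], fields: Seq[String],
                  dict: scala.collection.Map[String, Array[String]]): String = {
    // libffm fields here are 1-based: "field:featureIndex:value"
    fields.zipWithIndex.map { case (field, i) =>
      s"${i + 1}:${dict(field).indexOf(values(i))}:1.0"
    }.mkString(" ")
  }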


  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      // Map the TiDB tables into this Spark session so they can be queried with sql().
      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")


      // Load the full training set from TiDB and drop rows containing nulls.
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_train_data
         """.stripMargin
      ).na.drop()
      val column_list = esmm_data.columns


      // Load the inference set; it is index-encoded below with the dictionary
      // built from the training data.
      val esmm_pre_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_pre_data
        """.stripMargin
      ).na.drop()


      // The most recent stat_date in the training data; rows from that day are
      // held out below as the CV set.
      val max_stat_date = sc.sql(
        s"""
           |select max(stat_date) from esmm_train_data
         """.stripMargin
      )
      val max_stat_date_str = max_stat_date.collect().map(s =>
        s(0).toString
      ).head
      println(max_stat_date_str)



      // Build a dictionary from each column to its distinct values; categorical
      // features are encoded below as the index of their value in this array.
      val column_number = scala.collection.mutable.Map[String, Array[String]]()
      for (i <- column_list) {
        column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
      }
      // Materialise the training rows as string tuples so the same RDD can back
      // both the train and cv splits.
      val rdd = esmm_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString,
          x(7).toString, x(8).toString, x(9).toString, x(10).toString))
      rdd.persist()
      import sc.implicits._
      // Train split: every day except the most recent. Categorical values are
      // replaced by their index in column_number and formatted as libffm
      // "field:index:value"; the random key shuffles rows and zipWithIndex
      // assigns a row number.
      val train = rdd.filter(x => x._4 != max_stat_date_str)
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
        .map(x => (new Random().nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
      GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)

      // CV split: only the most recent day, encoded exactly like the train split.
      val test = rdd.filter(x => x._4 == max_stat_date_str)
        .map(x => (x._1, x._2, x._3,
          column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
          column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
          column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
          column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
        .map(x => (new Random().nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")
      GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)


      // Inference set: encoded with the dictionary built from the training data,
      // so values unseen during training map to index -1 (Array.indexOf).
      val rdd_pre = esmm_pre_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString,
          x(7).toString, x(8).toString, x(9).toString, x(10).toString))
      rdd_pre.persist()
      val pre = rdd_pre.map(x => (x._1, x._2, x._3,
        column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
        column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
        column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
        column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
        .map(x => (new Random().nextInt(Int.MaxValue), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0"
            .format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")
      GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
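
      // Release the cached RDDs; optional, since sc.stop() below ends the session.
      rdd.unpersist()
      rdd_pre.unpersist()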


      sc.stop()

    }

  }
}