package com.gmei


import java.util.Properties
import java.io.Serializable
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Calendar

import com.typesafe.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}



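/**
 * Shared configuration and I/O helpers for the com.gmei Spark jobs:
 * environment-aware Typesafe Config loading, SparkSession construction
 * (Hive + TiSpark), JDBC writes to MySQL/TiDB, and date utilities.
 */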
object GmeiConfig extends Serializable {

  var env: String = _
  var config: Config = _

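  /**
   * Selects the active environment (whichever block names your
   * application.conf defines, e.g. "dev" or "prod" -- names assumed here)
   * and loads its config. Call this once before any method that reads
   * `this.config`.
   */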
  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }


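  /**
   * Loads application.conf and scopes it to the given environment block,
   * falling back to the root config for keys the block does not override.
   */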
  def initConfig(env: String): Config = {
    val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }

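  /**
   * Builds a Hive-enabled SparkSession with TiSpark wired in, registers the
   * shared Hive UDF jars, and returns the underlying SparkContext alongside
   * the session. A sketch of a typical call site, assuming an environment
   * block named "dev" exists in application.conf (hypothetical):
   *
   *   GmeiConfig.setup("dev")
   *   val (sc, spark) = GmeiConfig.getSparkSession()
   */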
  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf
    sparkConf.set("spark.sql.crossJoin.enabled", "true")
    sparkConf.set("spark.debug.maxToStringFields", "130")
    sparkConf.set("spark.sql.broadcastTimeout", "6000")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }

    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("feededa")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use online")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
    // reuse the SparkContext already created by the session builder
    // instead of constructing a second one from the same conf
    val context = spark.sparkContext
    (context, spark)
  }

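  /**
   * Writes `df` to `table` over JDBC in batches of 300 rows across 128
   * partitions. A minimal sketch of a call, assuming a reachable TiDB/MySQL
   * instance and an existing `tmp_result` table (both hypothetical):
   *
   *   GmeiConfig.writeToJDBCTable(
   *     "jdbc:mysql://host:4000/jerry_test?user=u&password=p",
   *     df, "tmp_result", SaveMode.Overwrite)
   */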
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    println(jdbcuri, table)
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    // save to mysql/tidb
    try {
      df.repartition(128).write.mode(saveMode)
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
        .jdbc(jdbcuri, table, prop)
      println(s"write to $table succeeded")
    } catch {
      case e: Exception => println(s"write to $table failed: ${e.getMessage}")
    }
  }


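  /** Convenience overload that reads the JDBC URI from the `tidb.jdbcuri` config key. */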
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }



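  /**
   * Upserts (device_id, stat_date, max_level1_id, max_level1_count) rows
   * into `device_feat` via `replace into`, one statement per tuple. Meant
   * to run once per partition; a sketch of the intended use, assuming
   * `featRdd` is an RDD of matching 4-tuples (hypothetical):
   *
   *   featRdd.foreachPartition(GmeiConfig.updateDeviceFeat)
   */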
  def updateDeviceFeat(iterator: Iterator[(String, String, String, String)]): Unit = {
    var conn: Connection = null
    var ps: java.sql.PreparedStatement = null
    val sql = "replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
    try {
      // open the connection inside the try so connect failures are handled too
      conn = DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod", "root", "3SYz54LS9#^9sBvC")
      ps = conn.prepareStatement(sql)
      iterator.foreach(x => {
        ps.setString(1, x._1)
        ps.setString(2, x._2)
        ps.setString(3, x._3)
        ps.setString(4, x._4)
        ps.executeUpdate()
      })
      println("update device feat done")
    } catch {
      case e: Exception => println(s"update device feat failed: ${e.getMessage}")
    } finally {
      // release JDBC resources even when an exception is thrown
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }


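  /**
   * Returns the date n days before today, formatted as yyyy-MM-dd;
   * e.g. getMinusNDate(1) yields yesterday's date.
   */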
  def getMinusNDate(n: Int): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime)
  }

}