// GmeiConfig.scala 2.5 KB
package com.gmei


import java.util.Properties
import java.io.Serializable
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.typesafe.config._
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}

import com.gmei.ENV.ENV


object GmeiConfig extends Serializable {

  var param1: Main.Params = null
  var env: String = null
  var config: Config = null

  def setup(param: Main.Params): this.type = {
    this.param1 = param
    this.env = this.param1.env match {
      case "prod" => ENV.PROD
      case "dev" => ENV.DEV
      case "pre" => ENV.PRE
      case _ => ENV.DEV
    }
    this.config = initConfig(this.env)
    this
  }


  def initConfig(env: ENV) = {
    lazy val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }

  def getSparkSession():(SparkContext, SparkSession) = {
    val sparkConf = new SparkConf
    sparkConf.set("spark.sql.crossJoin.enabled", "true")
    sparkConf.set("spark.debug.maxToStringFields", "100")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }

    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("node2vec")
      .getOrCreate()

    val context = SparkContext.getOrCreate(sparkConf)
    (context, spark)
  }

  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveModel: SaveMode): Unit = {
    println(jdbcuri, table)
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    // save to mysql/tidb
    df.repartition(128).write.mode(saveModel)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
      .jdbc(jdbcuri, table, prop)
  }


  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }


  def getMinusNDate(n: Int):String={
    var  dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal:Calendar=Calendar.getInstance()
    cal.add(Calendar.DATE,-n)
    var yesterday=dateFormat.format(cal.getTime())
    yesterday
  }

}