// GmeiConfig.scala
package com.gmei


import java.util.Properties
import java.io.Serializable
import java.text.SimpleDateFormat
import java.util.{Calendar}

import com.typesafe.config._
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}



/**
 * Process-wide configuration holder plus Spark session and JDBC write helpers.
 *
 * Call [[setup]] with an environment name before using any member that reads
 * [[config]]; until then both `env` and `config` are `null`.
 */
object GmeiConfig extends Serializable {

  /** Number of partitions a DataFrame is reshuffled into before a JDBC write. */
  private val JdbcWritePartitions: Int = 128

  /** Value passed as the JDBC batch-insert size option on writes. */
  private val JdbcBatchInsertSize: Long = 300L

  // Both fields stay public, mutable and nullable so existing callers that read
  // or assign them keep working; internal reads go through `loadedConfig`.

  /** Environment name most recently passed to [[setup]], or `null` before setup. */
  var env: String = null

  /** Configuration loaded by [[setup]], or `null` before setup. */
  var config: Config = null

  /**
   * Loads the configuration for the given environment and stores it on this object.
   *
   * The configuration is loaded before any field is assigned, so if loading
   * throws, the previously stored `env`/`config` pair is left untouched.
   *
   * @param param name of the environment section to load
   * @return this object, to allow call chaining
   */
  def setup(param: String): this.type = {
    val loaded = initConfig(param)
    this.env = param
    this.config = loaded
    this
  }

  /**
   * Builds the configuration for one environment.
   *
   * @param env path of the environment section inside the default configuration
   * @return the section at `env`, falling back to the root configuration for
   *         keys the section does not define
   */
  def initConfig(env: String): Config = {
    val root = ConfigFactory.load()
    root.getConfig(env).withFallback(root)
  }

  /**
   * Returns [[config]], failing with a descriptive error when [[setup]] has not
   * been called yet instead of a bare NullPointerException.
   */
  private def loadedConfig: Config =
    Option(config).getOrElse(
      throw new IllegalStateException(
        "GmeiConfig.setup(env) must be called before the configuration is used"))

  /**
   * Creates (or reuses) a SparkSession named "feededa" and returns it together
   * with its SparkContext.
   *
   * `spark.master` defaults to `local[3]` and `spark.tispark.pd.addresses`
   * defaults to the `tispark.pd.addresses` configuration value; a value already
   * present in the SparkConf wins in both cases.
   *
   * @return the SparkContext backing the session, and the session itself
   * @throws IllegalStateException if the PD address must be read from the
   *                               configuration but [[setup]] was never called
   */
  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf()
      .set("spark.sql.crossJoin.enabled", "true")
      .set("spark.debug.maxToStringFields", "100")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }

    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", loadedConfig.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("feededa")
      .getOrCreate()

    // The session already owns the active context; asking SparkContext for
    // another one with the same conf would only hand back this instance.
    (spark.sparkContext, spark)
  }

  /**
   * Writes a DataFrame to a JDBC table using the MySQL driver.
   *
   * The DataFrame is repartitioned before the write, and the connection is
   * configured with SSL disabled, isolation level NONE and `truncate=true`.
   *
   * @param jdbcuri   JDBC connection URI of the target database
   * @param df        data to write
   * @param table     name of the target table
   * @param saveModel Spark save mode applied to the write
   */
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveModel: SaveMode): Unit = {
    // NOTE(review): the URI is printed verbatim; if it embeds credentials they
    // end up in stdout/driver logs - confirm this is acceptable.
    println((jdbcuri, table))

    val prop = new Properties()
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("useSSL", "false")
    prop.setProperty("isolationLevel", "NONE")
    prop.setProperty("truncate", "true")

    // save to mysql/tidb
    df.repartition(JdbcWritePartitions)
      .write
      .mode(saveModel)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, JdbcBatchInsertSize)
      .jdbc(jdbcuri, table, prop)
  }

  /**
   * Writes a DataFrame to a JDBC table, taking the connection URI from the
   * `tidb.jdbcuri` configuration value.
   *
   * @param df       data to write
   * @param table    name of the target table
   * @param saveMode Spark save mode applied to the write
   * @throws IllegalStateException if [[setup]] was never called
   */
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = loadedConfig.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }

  /**
   * Formats the date `n` days before now as `yyyy-MM-dd`.
   *
   * "Now" comes from `Calendar.getInstance()`, i.e. the JVM's default time
   * zone and locale.
   *
   * @param n number of days to go back; 0 yields today, negative values yield
   *          future dates
   * @return the resulting date formatted as `yyyy-MM-dd`
   */
  def getMinusNDate(n: Int): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime)
  }

}