GmeiConfig.scala 2.5 KB
Newer Older
高雅喆's avatar
高雅喆 committed
1 2 3 4 5
package com.gmei


import java.util.Properties
import java.io.Serializable
高雅喆's avatar
高雅喆 committed
6 7
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
高雅喆's avatar
高雅喆 committed
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43

import com.typesafe.config._
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}

import com.gmei.ENV.ENV


object GmeiConfig extends Serializable {

  /** Parameters handed to `setup`; stays `null` until `setup` has been called. */
  var param1: Main.Params = null
  /** Environment name that `setup` resolves from `param1.env`; `null` until then. */
  var env: String = null
  /** Configuration that `setup` builds via `initConfig(env)`; `null` until then. */
  var config: Config = null

  /**
    * Stores the job parameters, resolves the target environment and loads the
    * matching configuration into `config`.
    *
    * `"prod"` and `"pre"` select their own environments; every other value
    * (including `"dev"`) selects `ENV.DEV`.
    *
    * @param param job parameters; its `env` field picks the environment
    * @return this object, so the call can be chained
    */
  def setup(param: Main.Params): this.type = {
    param1 = param
    val requested = param1.env
    env = requested match {
      case "prod" => ENV.PROD
      case "pre"  => ENV.PRE
      case _      => ENV.DEV
    }
    config = initConfig(env)
    this
  }


  /**
    * Loads the application configuration and layers the section named by `env`
    * on top of it.
    *
    * @param env name of the top-level configuration section to prefer
    * @return the `env` section, falling back to the root configuration for keys
    *         the section does not define
    */
  def initConfig(env: ENV): Config = {
    // Plain val: the value is used immediately, so laziness would only add locking.
    val root = ConfigFactory.load()
    root.getConfig(env).withFallback(root)
  }

  /**
    * Builds (or reuses) the Spark session for this job and returns it together
    * with its `SparkContext`.
    *
    * On top of whatever a default `SparkConf` already holds, this
    *  - enables cross joins and sets `spark.debug.maxToStringFields` to 100;
    *  - uses `local[3]` as master when no master is configured;
    *  - takes `spark.tispark.pd.addresses` from the `tispark.pd.addresses` entry
    *    of `config` when it is not already set.
    *
    * The effective PD address is printed to stdout.
    *
    * @return the context backing the session, and the session itself
    * @throws IllegalStateException if the PD address has to come from `config`
    *                               but `setup` has not been called yet
    */
  def getSparkSession():(SparkContext, SparkSession) = {
    val pdAddressKey = "spark.tispark.pd.addresses"

    val sparkConf = new SparkConf()
      .set("spark.sql.crossJoin.enabled", "true")
      .set("spark.debug.maxToStringFields", "100")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }

    if (!sparkConf.contains(pdAddressKey)) {
      // Fail with an actionable message instead of a bare NullPointerException.
      if (config == null) {
        throw new IllegalStateException(
          "GmeiConfig.setup must be called before getSparkSession when " + pdAddressKey + " is not set")
      }
      sparkConf.set(pdAddressKey, config.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get(pdAddressKey))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("node2vec")
      .getOrCreate()

    // Reuse the session's own context: a second SparkContext.getOrCreate(conf)
    // would return the same context and only log a "configuration may not take
    // effect" warning.
    (spark.sparkContext, spark)
  }

  /**
    * Writes a DataFrame to a MySQL/TiDB table over JDBC.
    *
    * The frame is repartitioned into 128 partitions and written with a batch
    * insert size of 300. The connection properties set the MySQL driver class,
    * `useSSL=false`, `isolationLevel=NONE` and `truncate=true`.
    *
    * @param jdbcuri   JDBC URL of the target database
    * @param df        data to write
    * @param table     name of the destination table
    * @param saveModel Spark save mode applied to the write
    */
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveModel: SaveMode): Unit = {
    // NOTE(review): this prints the full JDBC URL; if the URL embeds credentials
    // they end up in the job log — confirm this is acceptable.
    println((jdbcuri, table))

    val connectionProps = new Properties()
    connectionProps.setProperty("driver", "com.mysql.jdbc.Driver")
    connectionProps.setProperty("useSSL", "false")
    connectionProps.setProperty("isolationLevel", "NONE")
    connectionProps.setProperty("truncate", "true")

    // save to mysql/tidb
    val writer = df
      .repartition(128)
      .write
      .mode(saveModel)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
    writer.jdbc(jdbcuri, table, connectionProps)
  }


  /**
    * Writes a DataFrame to the database whose JDBC URL is stored under
    * `tidb.jdbcuri` in `config`, delegating to the four-argument overload.
    *
    * @param df       data to write
    * @param table    name of the destination table
    * @param saveMode Spark save mode applied to the write
    */
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit =
    writeToJDBCTable(config.getString("tidb.jdbcuri"), df, table, saveMode)
高雅喆's avatar
高雅喆 committed
83 84 85 86 87 88 89 90 91 92


  /**
    * Returns the calendar date `n` days before today, formatted as `yyyy-MM-dd`.
    *
    * "Today" comes from `Calendar.getInstance()`, i.e. the JVM's default time
    * zone and locale.
    *
    * @param n number of days to go back; 0 yields today and a negative value
    *          yields a date in the future
    * @return the shifted date as a `yyyy-MM-dd` string
    */
  def getMinusNDate(n: Int): String = {
    // A formatter is created per call because SimpleDateFormat is not thread-safe.
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime)
  }

高雅喆's avatar
高雅喆 committed
93
}