package com.gmei import java.util.Properties import java.io.Serializable import java.text.SimpleDateFormat import java.util.{Calendar, Date} import com.typesafe.config._ import org.apache.spark.{SparkConf,SparkContext} import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext} import com.gmei.ENV.ENV object GmeiConfig extends Serializable { var param1: Main.Params = null var env: String = null var config: Config = null def setup(param: Main.Params): this.type = { this.param1 = param this.env = this.param1.env match { case "prod" => ENV.PROD case "dev" => ENV.DEV case "pre" => ENV.PRE case _ => ENV.DEV } this.config = initConfig(this.env) this } def initConfig(env: ENV) = { lazy val c = ConfigFactory.load() c.getConfig(env).withFallback(c) } def getSparkSession():(SparkContext, SparkSession) = { val sparkConf = new SparkConf sparkConf.set("spark.sql.crossJoin.enabled", "true") sparkConf.set("spark.debug.maxToStringFields", "100") if (!sparkConf.contains("spark.master")) { sparkConf.setMaster("local[3]") } if (!sparkConf.contains("spark.tispark.pd.addresses")) { sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses")) } println(sparkConf.get("spark.tispark.pd.addresses")) val spark = SparkSession .builder() .config(sparkConf) .appName("node2vec") .getOrCreate() val context = SparkContext.getOrCreate(sparkConf) (context, spark) } def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveModel: SaveMode): Unit = { println(jdbcuri, table) val prop = new Properties() prop.put("driver", "com.mysql.jdbc.Driver") prop.put("useSSL", "false") prop.put("isolationLevel", "NONE") prop.put("truncate", "true") // save to mysql/tidb df.repartition(128).write.mode(saveModel) .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300) .jdbc(jdbcuri, table, prop) } def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = { val jdbcuri = this.config.getString("tidb.jdbcuri") writeToJDBCTable(jdbcuri, df, table, saveMode) } def getMinusNDate(n: Int):String={ var dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") var cal:Calendar=Calendar.getInstance() cal.add(Calendar.DATE,-n) var yesterday=dateFormat.format(cal.getTime()) yesterday } }