GmeiConfig.scala 3.12 KB
Newer Older
高雅喆's avatar
高雅喆 committed
1 2 3 4 5
package com.gmei


import java.util.Properties
import java.io.Serializable
高雅喆's avatar
高雅喆 committed
6
import java.sql.{Connection, DriverManager}
高雅喆's avatar
高雅喆 committed
7
import java.text.SimpleDateFormat
高雅喆's avatar
高雅喆 committed
8
import java.util.Calendar
高雅喆's avatar
高雅喆 committed
9 10

import com.typesafe.config._
高雅喆's avatar
高雅喆 committed
11
import org.apache.spark.{SparkConf, SparkContext}
高雅喆's avatar
高雅喆 committed
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}



object GmeiConfig extends Serializable {

  var env: String = null
  var config: Config = null

  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }


  def initConfig(env: String) = {
    lazy val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }

  def getSparkSession():(SparkContext, SparkSession) = {
    val sparkConf = new SparkConf
    sparkConf.set("spark.sql.crossJoin.enabled", "true")
高雅喆's avatar
高雅喆 committed
37
    sparkConf.set("spark.debug.maxToStringFields", "130")
王志伟's avatar
王志伟 committed
38
    sparkConf.set("spark.sql.broadcastTimeout",  "6000")
高雅喆's avatar
高雅喆 committed
39

王志伟's avatar
王志伟 committed
40
    if (!sparkConf.contains("""spark.master""")) {
高雅喆's avatar
高雅喆 committed
41 42 43 44 45 46 47 48 49 50 51
      sparkConf.setMaster("local[3]")
    }

    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
高雅喆's avatar
高雅喆 committed
52
      .appName("feededa")
高雅喆's avatar
高雅喆 committed
53
      .enableHiveSupport()
高雅喆's avatar
高雅喆 committed
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
      .getOrCreate()

    val context = SparkContext.getOrCreate(sparkConf)
    (context, spark)
  }

  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveModel: SaveMode): Unit = {
    println(jdbcuri, table)
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    // save to mysql/tidb
    df.repartition(128).write.mode(saveModel)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
      .jdbc(jdbcuri, table, prop)
王志伟's avatar
王志伟 committed
71
    print("写入成功")
高雅喆's avatar
高雅喆 committed
72 73 74 75 76 77 78 79 80
  }


  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }


高雅喆's avatar
高雅喆 committed
81 82 83 84

  def updateDeviceFeat(iterator: Iterator[(String,String,String,String)]): Unit ={
    var conn: Connection= null
    var ps:java.sql.PreparedStatement=null
高雅喆's avatar
高雅喆 committed
85
    val sql=s"replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
高雅喆's avatar
高雅喆 committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
    conn=DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod","root","3SYz54LS9#^9sBvC")
    ps = conn.prepareStatement(sql)
    try{
      iterator.foreach(x => {
        ps.setString(1,x._1)
        ps.setString(2,x._2)
        ps.setString(3,x._3)
        ps.setString(4,x._4)
        ps.executeUpdate()
      })
      println("update device feat done")
    }catch {
      case _ => println("update failed")
    }
  }


高雅喆's avatar
高雅喆 committed
103 104 105 106 107 108 109 110 111
  def getMinusNDate(n: Int):String={
    var  dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal:Calendar=Calendar.getInstance()
    cal.add(Calendar.DATE,-n)
    var yesterday=dateFormat.format(cal.getTime())
    yesterday
  }

}