GmeiConfig.scala 3.2 KB
Newer Older
高雅喆's avatar
高雅喆 committed
1 2 3 4 5
package com.gmei


import java.util.Properties
import java.io.Serializable
高雅喆's avatar
高雅喆 committed
6
import java.sql.{Connection, DriverManager}
高雅喆's avatar
高雅喆 committed
7
import java.text.SimpleDateFormat
高雅喆's avatar
高雅喆 committed
8
import java.util.Calendar
高雅喆's avatar
高雅喆 committed
9 10

import com.typesafe.config._
高雅喆's avatar
高雅喆 committed
11
import org.apache.spark.{SparkConf, SparkContext}
高雅喆's avatar
高雅喆 committed
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}



/**
 * Shared configuration holder plus Spark / JDBC convenience helpers.
 *
 * Call [[setup]] once with the environment name before using any helper that
 * reads from [[config]] (`getSparkSession` may need it, the short
 * `writeToJDBCTable` overload always does).
 */
object GmeiConfig extends Serializable {

  // Imported locally so this object does not rely on a file-level import.
  import scala.util.control.NonFatal

  /** Name of the active environment section; `null` until [[setup]] is called. */
  var env: String = null

  /** Configuration resolved for [[env]]; `null` until [[setup]] is called. */
  var config: Config = null

  /**
   * Selects the environment and loads its configuration.
   *
   * @param param name of the configuration section to activate
   * @return this object, so calls can be chained
   */
  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }

  /**
   * Loads the default Typesafe configuration and returns the section named
   * `env`, falling back to the root configuration for keys the section lacks.
   *
   * @param env name of the configuration section
   * @return the section layered over the root configuration
   */
  def initConfig(env: String): Config = {
    val root = ConfigFactory.load()
    root.getConfig(env).withFallback(root)
  }

  /**
   * Returns [[config]], failing with a descriptive error instead of a bare
   * NullPointerException when [[setup]] has not been called yet.
   */
  private def requireConfig(): Config = {
    if (config == null) {
      throw new IllegalStateException(
        "GmeiConfig.setup(env) must be called before the configuration is used")
    }
    config
  }

  /** Closes a JDBC resource, reporting (not propagating) a failure to close. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    try {
      resource.close()
    } catch {
      case NonFatal(e) => println(s"failed to close JDBC resource: ${e.getMessage}")
    }
  }

  /**
   * Builds (or reuses) a Hive-enabled SparkSession named "feededa".
   *
   * `spark.master` defaults to `local[3]` and `spark.tispark.pd.addresses`
   * defaults to the `tispark.pd.addresses` config key; both are only applied
   * when the SparkConf does not already contain them.
   *
   * @return the session's SparkContext together with the session itself
   * @throws IllegalStateException if the PD address must be read from the
   *                               configuration but [[setup]] was never called
   */
  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf
    sparkConf.set("spark.sql.crossJoin.enabled", "true")
    sparkConf.set("spark.debug.maxToStringFields", "130")
    sparkConf.set("spark.sql.broadcastTimeout", "6000")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }

    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", requireConfig().getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("feededa")
      .enableHiveSupport()
      .getOrCreate()

    // The session already owns the active context; no second getOrCreate needed.
    (spark.sparkContext, spark)
  }

  /**
   * Writes `df` to `table` over JDBC (MySQL / TiDB) in 128 partitions with a
   * batch insert size of 300.
   *
   * The write is best-effort: a non-fatal failure is reported on stdout/stderr
   * and NOT rethrown, so callers cannot detect it from the return value.
   *
   * @param jdbcuri   JDBC connection URI
   * @param df        data to write
   * @param table     destination table name
   * @param saveModel Spark save mode used for the write
   */
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveModel: SaveMode): Unit = {
    // Explicit tuple: same "(uri,table)" output as before, without auto-tupling.
    println((jdbcuri, table))
    val prop = new Properties()
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("useSSL", "false")
    prop.setProperty("isolationLevel", "NONE")
    prop.setProperty("truncate", "true")
    // save to mysql/tidb
    try {
      df.repartition(128).write.mode(saveModel)
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300L)
        .jdbc(jdbcuri, table, prop)
      println("写入成功")
    } catch {
      // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate.
      case NonFatal(e) =>
        println("没有写入成功")
        e.printStackTrace()
    }
  }

  /**
   * Writes `df` to `table` using the `tidb.jdbcuri` configuration key.
   *
   * @param df       data to write
   * @param table    destination table name
   * @param saveMode Spark save mode used for the write
   * @throws IllegalStateException if [[setup]] was never called
   */
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = requireConfig().getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }

  /**
   * Upserts (`replace into`) every tuple of `iterator` into `device_feat`,
   * one statement execution per tuple.
   *
   * Failing to connect or to prepare the statement propagates to the caller.
   * A non-fatal failure while executing rows is reported and swallowed, which
   * abandons the remaining rows. The connection and statement are always closed.
   *
   * @param iterator rows as (device_id, stat_date, max_level1_id, max_level1_count)
   */
  def updateDeviceFeat(iterator: Iterator[(String, String, String, String)]): Unit = {
    val sql = "replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
    // NOTE(review): host and credentials are hard-coded here and bypass the
    // `tidb.jdbcuri` setting used elsewhere in this object; they should be moved
    // to configuration / a secret store and rotated.
    val conn: Connection =
      DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod", "root", "3SYz54LS9#^9sBvC")
    try {
      val ps: java.sql.PreparedStatement = conn.prepareStatement(sql)
      try {
        iterator.foreach { case (deviceId, statDate, maxLevel1Id, maxLevel1Count) =>
          ps.setString(1, deviceId)
          ps.setString(2, statDate)
          ps.setString(3, maxLevel1Id)
          ps.setString(4, maxLevel1Count)
          ps.executeUpdate()
        }
        println("update device feat done")
      } catch {
        case NonFatal(e) =>
          println("update failed")
          e.printStackTrace()
      } finally {
        closeQuietly(ps)
      }
    } finally {
      closeQuietly(conn)
    }
  }

  /**
   * Returns the date `n` days before today (default time zone), formatted as
   * `yyyy-MM-dd`. A negative `n` yields a date in the future.
   *
   * @param n number of days to subtract from the current date
   * @return the formatted date
   */
  def getMinusNDate(n: Int): String = {
    // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime)
  }

}
张彦钊's avatar
张彦钊 committed
117