package com.gmei

import java.util.Properties
import java.io.Serializable
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Calendar

import com.typesafe.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object GmeiConfig extends Serializable {

  var env: String = null
  var config: Config = null

  // Selects the environment block of the Typesafe config and caches it on this object.
  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }

  // Loads application.conf and scopes it to the given environment,
  // falling back to top-level keys when an env-specific one is missing.
  def initConfig(env: String): Config = {
    val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }

  // Builds a Hive-enabled SparkSession wired to TiSpark and registers the
  // brickhouse / hive-udf helper functions used by downstream SQL.
  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf()
      .set("spark.tispark.plan.allow_index_read", "false")
      .set("spark.hive.mapred.supports.subdirectories", "true")
      .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
      .set("spark.sql.crossJoin.enabled", "true")
      .set("spark.debug.maxToStringFields", "130")
      .set("spark.sql.broadcastTimeout", "6000")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .config("spark.tispark.pd.addresses", "172.16.40.158:2379")
      .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
      .appName("feededa")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use online")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")

    val context = SparkContext.getOrCreate(sparkConf)
    (context, spark)
  }

  // Writes a DataFrame to MySQL/TiDB over JDBC with batched inserts.
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    println(s"writing table $table to $jdbcuri")
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    try {
      df.repartition(128).write.mode(saveMode)
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
        .jdbc(jdbcuri, table, prop)
      println("write to JDBC succeeded")
    } catch {
      case e: Exception => println(s"write to JDBC failed: ${e.getMessage}")
    }
  }

  // Convenience overload that reads the JDBC URI from tidb.jdbcuri in the config.
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }

  // Upserts (device_id, stat_date, max_level1_id, max_level1_count) rows into
  // device_feat; intended to be called per partition via foreachPartition.
  def updateDeviceFeat(iterator: Iterator[(String, String, String, String)]): Unit = {
    var conn: Connection = null
    var ps: java.sql.PreparedStatement = null
    val sql = "replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod", "root", "3SYz54LS9#^9sBvC")
      ps = conn.prepareStatement(sql)
      iterator.foreach(x => {
        ps.setString(1, x._1)
        ps.setString(2, x._2)
        ps.setString(3, x._3)
        ps.setString(4, x._4)
        ps.executeUpdate()
      })
      println("update device feat done")
    } catch {
      case e: Exception => println(s"update device feat failed: ${e.getMessage}")
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }

  // Returns the date n days before today, formatted as yyyy-MM-dd.
  def getMinusNDate(n: Int): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime)
  }
}
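
// ---------------------------------------------------------------------------
// Usage sketch (illustrative, not part of the original job): the driver object
// below is hypothetical, as are the environment name "dev", the Hive table
// "example_table" and the output table "example_output". It only shows the
// intended call order, i.e. setup() before any config lookup, then
// getSparkSession(), then the write helpers, and it reuses the imports at the
// top of this file.
// ---------------------------------------------------------------------------
object GmeiConfigUsageExample {

  def main(args: Array[String]): Unit = {
    // Scope the Typesafe config to a (hypothetical) "dev" environment block.
    GmeiConfig.setup("dev")

    val (_, spark) = GmeiConfig.getSparkSession()

    // Partition date for the query, e.g. yesterday.
    val statDate = GmeiConfig.getMinusNDate(1)

    // Hypothetical query against the "online" Hive database selected in
    // getSparkSession(); replace the table and columns with real ones.
    val df = spark.sql(
      s"SELECT device_id, channel FROM example_table WHERE partition_date = '$statDate'")

    // Append the result to a TiDB table via the URI configured at tidb.jdbcuri.
    GmeiConfig.writeToJDBCTable(df, "example_output", SaveMode.Append)

    // updateDeviceFeat is written to be applied per partition of a 4-tuple RDD,
    // e.g. (device_id, stat_date, max_level1_id, max_level1_count):
    //   featureRdd.foreachPartition(GmeiConfig.updateDeviceFeat)

    spark.stop()
  }
}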