package com.gmei
import java.util.Properties
import java.io.Serializable
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Calendar
import com.typesafe.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object GmeiConfig extends Serializable {

  var env: String = null
  var config: Config = null

  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }

  def initConfig(env: String): Config = {
    lazy val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }
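
  // Usage sketch for setup/initConfig (illustrative, not part of the original code):
  // keys are looked up in the environment-specific block first and fall back to the
  // top level, so an application.conf on the classpath might look like:
  //
  //   tispark.pd.addresses = "127.0.0.1:2379"
  //   tidb.jdbcuri = "jdbc:mysql://127.0.0.1:4000/jerry_prod"
  //   prod {
  //     tispark.pd.addresses = "pd-prod:2379"   // hypothetical override for env "prod"
  //   }
  //
  //   GmeiConfig.setup("prod").config.getString("tidb.jdbcuri")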

  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.crossJoin.enabled", "true")
    sparkConf.set("spark.debug.maxToStringFields", "130")
    sparkConf.set("spark.sql.broadcastTimeout", "6000")

    // Fall back to a local master when none is supplied (e.g. when not launched via spark-submit).
    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[3]")
    }
    // Point TiSpark at the PD addresses configured for the current environment.
    if (!sparkConf.contains("spark.tispark.pd.addresses")) {
      sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
    }
    println(sparkConf.get("spark.tispark.pd.addresses"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("feededa")
      .enableHiveSupport()
      .getOrCreate()

    // Switch to the shared Hive database and register the UDF jars used by downstream SQL.
    spark.sql("use online")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")

    // Reuse the context backing the session instead of creating another one.
    val context = spark.sparkContext
    (context, spark)
  }
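
  // Usage sketch (illustrative; "prod" and the query are placeholders, not part of
  // the original code):
  //
  //   val (sc, spark) = GmeiConfig.setup("prod").getSparkSession()
  //   val df = spark.sql("SELECT ... FROM some_hive_table")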

  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    println(s"writing to $jdbcuri, table $table")
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    // Save to MySQL/TiDB in batches of 300 rows.
    try {
      df.repartition(128).write.mode(saveMode)
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
        .jdbc(jdbcuri, table, prop)
      println("write succeeded")
    } catch {
      case e: Exception => println(s"write to $table failed: ${e.getMessage}")
    }
  }

  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }
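
  // Writing a DataFrame out via the environment's configured "tidb.jdbcuri"
  // (illustrative; "my_stats_table" is a hypothetical table name):
  //
  //   GmeiConfig.writeToJDBCTable(df, "my_stats_table", SaveMode.Overwrite)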

  def updateDeviceFeat(iterator: Iterator[(String, String, String, String)]): Unit = {
    var conn: Connection = null
    var ps: java.sql.PreparedStatement = null
    val sql = "replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod", "root", "3SYz54LS9#^9sBvC")
      ps = conn.prepareStatement(sql)
      iterator.foreach(x => {
        ps.setString(1, x._1)
        ps.setString(2, x._2)
        ps.setString(3, x._3)
        ps.setString(4, x._4)
        ps.executeUpdate()
      })
      println("update device feat done")
    } catch {
      case e: Exception => println(s"update device feat failed: ${e.getMessage}")
    } finally {
      // Release JDBC resources even when the upsert fails.
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
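
  // updateDeviceFeat consumes one partition's iterator of
  // (device_id, stat_date, max_level1_id, max_level1_count) tuples, so it is typically
  // passed to foreachPartition (illustrative; deviceFeatRdd is a hypothetical
  // RDD[(String, String, String, String)], not part of the original code):
  //
  //   deviceFeatRdd.foreachPartition(GmeiConfig.updateDeviceFeat)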

  def getMinusNDate(n: Int): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime())
  }
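
  // Example: getMinusNDate(1) returns yesterday's date formatted as "yyyy-MM-dd",
  // e.g. "2019-01-01" when run on 2019-01-02 (dates illustrative).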
}