package com.gmei
import java.util.Properties
import java.io.Serializable
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Calendar
import com.typesafe.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
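
/**
 * Shared configuration and helper utilities for the gmei Spark jobs:
 * environment-scoped Typesafe config loading, SparkSession construction,
 * JDBC writes to MySQL/TiDB, and a small date helper.
 */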
object GmeiConfig extends Serializable {

  var env: String = _
  var config: Config = _

  // Load the config for the given environment and remember it on this object.
  def setup(param: String): this.type = {
    this.env = param
    this.config = initConfig(this.env)
    this
  }

  // Read application.conf and scope it to the given environment,
  // falling back to the root config for keys the environment does not override.
  def initConfig(env: String): Config = {
    val c = ConfigFactory.load()
    c.getConfig(env).withFallback(c)
  }
  // Build (or reuse) a SparkSession with Hive and TiSpark support and register
  // the UDF jars this project depends on. Returns the SparkContext and the session.
  def getSparkSession(): (SparkContext, SparkSession) = {
    val sparkConf = new SparkConf()
      .set("spark.tispark.plan.allow_index_read", "false")
      .set("spark.hive.mapred.supports.subdirectories", "true")
      .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
      .set("spark.sql.crossJoin.enabled", "true")
      .set("spark.debug.maxToStringFields", "130")
      .set("spark.sql.broadcastTimeout", "6000")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .config("spark.tispark.pd.addresses", "172.16.40.158:2379")
      .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
      .appName("feededa")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use online")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")

    (spark.sparkContext, spark)
  }
  // Write a DataFrame to MySQL/TiDB over JDBC with the given save mode.
  def writeToJDBCTable(jdbcuri: String, df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    println(s"writing to $jdbcuri, table $table")
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("useSSL", "false")
    prop.put("isolationLevel", "NONE")
    prop.put("truncate", "true")
    // save to mysql/tidb
    try {
      df.repartition(128).write.mode(saveMode)
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 300)
        .jdbc(jdbcuri, table, prop)
      println("write succeeded")
    } catch {
      case e: Exception => println(s"write failed: ${e.getMessage}")
    }
  }
  // Convenience overload that reads the JDBC URI from the loaded config (tidb.jdbcuri).
  def writeToJDBCTable(df: DataFrame, table: String, saveMode: SaveMode): Unit = {
    val jdbcuri = this.config.getString("tidb.jdbcuri")
    writeToJDBCTable(jdbcuri, df, table, saveMode)
  }
  // Upsert device features into device_feat, one row per
  // (device_id, stat_date, max_level1_id, max_level1_count) tuple in the iterator,
  // reusing a single connection for the whole batch.
  def updateDeviceFeat(iterator: Iterator[(String, String, String, String)]): Unit = {
    var conn: Connection = null
    var ps: java.sql.PreparedStatement = null
    val sql = "replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod", "root", "3SYz54LS9#^9sBvC")
      ps = conn.prepareStatement(sql)
      iterator.foreach(x => {
        ps.setString(1, x._1)
        ps.setString(2, x._2)
        ps.setString(3, x._3)
        ps.setString(4, x._4)
        ps.executeUpdate()
      })
      println("update device feat done")
    } catch {
      case e: Exception => println(s"update failed: ${e.getMessage}")
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
  // Return the date n days before today, formatted as yyyy-MM-dd.
  def getMinusNDate(n: Int): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    dateFormat.format(cal.getTime)
  }
}
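
// Example usage (a sketch only; the environment name "dev", the query, and the
// target table "device_feat_example" are placeholders, not values from this repo):
//
//   val (sc, spark) = GmeiConfig.setup("dev").getSparkSession()
//   val df = spark.sql("select device_id, stat_date from online.some_table")  // hypothetical query
//   GmeiConfig.writeToJDBCTable(df, "device_feat_example", SaveMode.Append)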