Commit de06e9e5 authored by 高雅喆's avatar 高雅喆

bug fix

parent a5c140f6
...@@ -266,7 +266,7 @@ object EsmmPredData { ...@@ -266,7 +266,7 @@ object EsmmPredData {
|select device_id,city_id,native_queue as merge_queue from ffm_diary_queue |select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
|union |union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from esmm_train_data limit 20000) |where tmp1.device_id in (select distinct device_id from esmm_train_data)
""".stripMargin """.stripMargin
) )
raw_data.show() raw_data.show()
...@@ -284,11 +284,19 @@ object EsmmPredData { ...@@ -284,11 +284,19 @@ object EsmmPredData {
val raw_data2 = sc.sql( val raw_data2 = sc.sql(
s""" s"""
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1 |select device_id,city_id,merge_queue from raw_data1 limit 10000
""".stripMargin """.stripMargin
) )
raw_data2.createOrReplaceTempView("raw_data") raw_data1.createOrReplaceTempView("raw_data2")
print(raw_data2.count()) println(raw_data2.count())
val raw_data3 = sc.sql(
s"""
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2
""".stripMargin
)
raw_data3.createOrReplaceTempView("raw_data")
print(raw_data3.count())
...@@ -310,7 +318,8 @@ object EsmmPredData { ...@@ -310,7 +318,8 @@ object EsmmPredData {
|where b.partition_date = '${yesteday}' |where b.partition_date = '${yesteday}'
""".stripMargin """.stripMargin
) )
sid_data.show() // sid_data.show()
println(sid_data.count())
val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0)) val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
sid_data_label.createOrReplaceTempView("union_data") sid_data_label.createOrReplaceTempView("union_data")
...@@ -372,7 +381,7 @@ object EsmmPredData { ...@@ -372,7 +381,7 @@ object EsmmPredData {
""".stripMargin """.stripMargin
) )
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id") // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show() println(union_data_scity_id.count())
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_pre_data",SaveMode.Overwrite) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_pre_data",SaveMode.Overwrite)
......
...@@ -33,7 +33,7 @@ object GmeiConfig extends Serializable { ...@@ -33,7 +33,7 @@ object GmeiConfig extends Serializable {
def getSparkSession():(SparkContext, SparkSession) = { def getSparkSession():(SparkContext, SparkSession) = {
val sparkConf = new SparkConf val sparkConf = new SparkConf
sparkConf.set("spark.sql.crossJoin.enabled", "true") sparkConf.set("spark.sql.crossJoin.enabled", "true")
sparkConf.set("spark.debug.maxToStringFields", "100") sparkConf.set("spark.debug.maxToStringFields", "130")
sparkConf.set("spark.sql.broadcastTimeout", "6000") sparkConf.set("spark.sql.broadcastTimeout", "6000")
if (!sparkConf.contains("spark.master")) { if (!sparkConf.contains("spark.master")) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment