Commit 68c03be4 authored by 张彦钊's avatar 张彦钊

修改tidb读取方式

parent e0bccdd3
dev.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true dev.tidb.jdbcuri=jdbc:mysql://192.168.15.12:4000/eagle?user=root&password=&rewriteBatchedStatements=true
dev.tispark.pd.addresses=10.66.157.22:2379 dev.tispark.pd.addresses=192.168.15.11:2379
dev.mimas.jdbcuri= jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/mimas_test?user=work&password=workwork&rewriteBatchedStatements=true dev.mimas.jdbcuri= jdbc:mysql://rm-2zenowgrn4i5p0j7txo.mysql.rds.aliyuncs.com/mimas_test?user=work&password=Gengmei1&rewriteBatchedStatements=true
dev.gaia.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/zhengxing_test?user=work&password=workwork&rewriteBatchedStatements=true dev.gaia.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/zhengxing_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.gold.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/doris_test?user=work&password=workwork&rewriteBatchedStatements=true dev.gold.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/doris_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.redis.host=10.30.50.58 dev.jerry.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/jerry_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.redis.port=6379 dev.test.jdbcuri= jdbc:mysql://rm-2ze0v6uua2hl9he8edo.mysql.rds.aliyuncs.com/mimas_test?user=work&password=Gengmei1&rewriteBatchedStatements=true
pre.tidb.jdbcuri=jdbc:mysql://192.168.16.11:4000/eagle?user=root&password=&rewriteBatchedStatements=true pre.tidb.jdbcuri=jdbc:mysql://192.168.16.11:4000/eagle?user=root&password=&rewriteBatchedStatements=true
pre.tispark.pd.addresses=192.168.16.11:2379 pre.tispark.pd.addresses=192.168.16.11:2379
pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas_prod?user=mimas&password=workwork&rewriteBatchedStatements=true pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas_prod?user=mimas&password=workwork&rewriteBatchedStatements=true
prod.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
prod.gold.jdbcuri=jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
prod.mimas.jdbcuri=jdbc:mysql://rm-m5emg41za2w7l6au3.mysql.rds.aliyuncs.com/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true #阿里云线上配置
prod.gaia.jdbcuri=jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true #prod.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/eagle?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
prod.tispark.pd.addresses=10.66.157.22:2379 #prod.gold.jdbcuri=jdbc:mysql://rm-m5ey2s823bq0lc616.mysql.rds.aliyuncs.com/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
prod.redis.host=10.30.50.58 #prod.mimas.jdbcuri=jdbc:mysql://rm-m5emg41za2w7l6au3.mysql.rds.aliyuncs.com/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true
prod.redis.port=6379 #prod.gaia.jdbcuri=jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true
#prod.jerry.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#prod.tispark.pd.addresses=10.66.157.22:2379
#
#prod.tidb.jdbcuri_new=jdbc:mysql://152.136.44.138:4000/eagle?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#prod.jerry.jdbcuri_new=jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#腾讯云线上配置
prod.gold.jdbcuri=jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
prod.mimas.jdbcuri=jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true
prod.gaia.jdbcuri=jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true
prod.tidb.jdbcuri=jdbc:mysql://172.16.40.158:4000/eagle?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
prod.jerry.jdbcuri=jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
prod.tispark.pd.addresses=172.16.40.158:2379
...@@ -4,7 +4,7 @@ package com.gmei ...@@ -4,7 +4,7 @@ package com.gmei
import java.io.Serializable import java.io.Serializable
import java.time.LocalDate import java.time.LocalDate
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import scopt.OptionParser import scopt.OptionParser
import com.gmei.lib.AbstractParams import com.gmei.lib.AbstractParams
...@@ -51,17 +51,9 @@ object EsmmData { ...@@ -51,17 +51,9 @@ object EsmmData {
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
val max_stat_date = sc.sql( val max_stat_date = sc.sql(
s""" s"""
|select max(stat_date) from esmm_train_data |select max(stat_date) from jerry_test.esmm_train_data
""".stripMargin """.stripMargin
) )
val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
...@@ -74,7 +66,7 @@ object EsmmData { ...@@ -74,7 +66,7 @@ object EsmmData {
// s""" // s"""
// |select distinct stat_date,device_id,city_id as ucity_id, // |select distinct stat_date,device_id,city_id as ucity_id,
// | cid_id,diary_service_id // | cid_id,diary_service_id
// |from data_feed_exposure // |from jerry_prod.data_feed_exposure
// |where cid_type = 'diary' // |where cid_type = 'diary'
// |and stat_date ='${stat_date}' // |and stat_date ='${stat_date}'
// """.stripMargin // """.stripMargin
...@@ -84,7 +76,7 @@ object EsmmData { ...@@ -84,7 +76,7 @@ object EsmmData {
s""" s"""
|select * from |select * from
|(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
|from data_feed_exposure |from jerry_prod.data_feed_exposure
|where cid_type = 'diary' |where cid_type = 'diary'
|and stat_date ='${stat_date}' |and stat_date ='${stat_date}'
|group by stat_date,device_id,city_id,cid_id,diary_service_id having count(*) > 1) a |group by stat_date,device_id,city_id,cid_id,diary_service_id having count(*) > 1) a
...@@ -99,7 +91,7 @@ object EsmmData { ...@@ -99,7 +91,7 @@ object EsmmData {
s""" s"""
|select distinct stat_date,device_id,city_id as ucity_id, |select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id | cid_id,diary_service_id
|from data_feed_click |from jerry_prod.data_feed_click
|where cid_type = 'diary' |where cid_type = 'diary'
|and stat_date ='${stat_date}' |and stat_date ='${stat_date}'
""".stripMargin """.stripMargin
...@@ -190,8 +182,8 @@ object EsmmData { ...@@ -190,8 +182,8 @@ object EsmmData {
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id, |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name | c.name as ccity_name
|from union_data_slabel a |from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id |left join eagle.src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id |left join eagle.src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4 | where c.tag_type=4
""".stripMargin """.stripMargin
) )
...@@ -223,11 +215,11 @@ object EsmmData { ...@@ -223,11 +215,11 @@ object EsmmData {
""".stripMargin """.stripMargin
) )
union_data_scity_id2.persist() union_data_scity_id2.persist()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append) GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="jerry_test.esmm_train_data",SaveMode.Append)
GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append) GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="jerry_test.esmm_train_data",SaveMode.Append)
union_data_scity_id2.unpersist() union_data_scity_id2.unpersist()
} else { } else {
println("esmm_train_data already have param.date data") println("jerry_test.esmm_train_data already have param.date data")
} }
sc.stop() sc.stop()
...@@ -368,18 +360,6 @@ object EsmmPredData { ...@@ -368,18 +360,6 @@ object EsmmPredData {
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
ti.tidbMapTable("eagle","ffm_diary_queue")
ti.tidbMapTable("eagle","search_queue")
ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
ti.tidbMapTable("eagle","biz_feed_diary_queue")
ti.tidbMapTable("jerry_prod","data_feed_exposure_precise")
import sc.implicits._ import sc.implicits._
val yesteday_have_seq = GmeiConfig.getMinusNDate(1) val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
...@@ -388,7 +368,7 @@ object EsmmPredData { ...@@ -388,7 +368,7 @@ object EsmmPredData {
s""" s"""
|select concat(t.device_id,",",t.city_id) from |select concat(t.device_id,",",t.city_id) from
|(select distinct device_id,city_id |(select distinct device_id,city_id
|from data_feed_exposure where stat_date='${yesteday_have_seq}') t |from jerry_prod.data_feed_exposure where stat_date='${yesteday_have_seq}') t
""".stripMargin).collect().map(x => x(0).toString) """.stripMargin).collect().map(x => x(0).toString)
println("target_user",target_user.length) println("target_user",target_user.length)
...@@ -396,11 +376,12 @@ object EsmmPredData { ...@@ -396,11 +376,12 @@ object EsmmPredData {
val raw_data = sc.sql( val raw_data = sc.sql(
s""" s"""
|select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from |select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
|(select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix |(select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue
|from jerry_prod.nd_device_cid_similarity_matrix
|union |union
|select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue |select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from eagle.ffm_diary_queue
|union |union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 |select device_id,city_id,search_queue as merge_queue from eagle.search_queue) as tmp1
""".stripMargin) """.stripMargin)
// raw_data.show() // raw_data.show()
...@@ -421,7 +402,7 @@ object EsmmPredData { ...@@ -421,7 +402,7 @@ object EsmmPredData {
import sc.implicits._ import sc.implicits._
val sql = val sql =
s""" s"""
|select distinct device_id,cid_id from data_feed_exposure_precise |select distinct device_id,cid_id from jerry_prod.data_feed_exposure_precise
|where stat_date >= "$start" and cid_type = "diary" |where stat_date >= "$start" and cid_type = "diary"
""".stripMargin """.stripMargin
val history = sc.sql(sql).repartition(200).rdd val history = sc.sql(sql).repartition(200).rdd
...@@ -458,8 +439,8 @@ object EsmmPredData { ...@@ -458,8 +439,8 @@ object EsmmPredData {
// native_data // native_data
val native_data = sc.sql( val native_data = sc.sql(
s""" s"""
|select distinct a.device_id,a.city_id,b.native_queue from data_feed_exposure a |select distinct a.device_id,a.city_id,b.native_queue from jerry_prod.data_feed_exposure a
|left join (select if(city_id='world','worldwide',city_id) city_id,native_queue from biz_feed_diary_queue) b |left join (select if(city_id='world','worldwide',city_id) city_id,native_queue from eagle.biz_feed_diary_queue) b
|on a.city_id = b.city_id |on a.city_id = b.city_id
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != "" |where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin """.stripMargin
...@@ -558,8 +539,8 @@ object EsmmPredData { ...@@ -558,8 +539,8 @@ object EsmmPredData {
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id, |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name | c.name as ccity_name
|from union_data_slabel a |from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id |left join eagle.src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id |left join eagle.src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4 | where c.tag_type=4
""".stripMargin """.stripMargin
) )
...@@ -665,9 +646,6 @@ object GetDiaryPortrait { ...@@ -665,9 +646,6 @@ object GetDiaryPortrait {
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
val stat_date = param.date.replace("-","") val stat_date = param.date.replace("-","")
val diary_tag = sc.sql( val diary_tag = sc.sql(
...@@ -742,9 +720,6 @@ object GetDevicePortrait { ...@@ -742,9 +720,6 @@ object GetDevicePortrait {
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
import sc.implicits._ import sc.implicits._
val stat_date = param.date.replace("-","") val stat_date = param.date.replace("-","")
...@@ -757,7 +732,7 @@ object GetDevicePortrait { ...@@ -757,7 +732,7 @@ object GetDevicePortrait {
| COALESCE(a.params['diary_id'], a.params['business_id'], 0) as cid_id, | COALESCE(a.params['diary_id'], a.params['business_id'], 0) as cid_id,
| b.level1_ids as level1_id | b.level1_ids as level1_id
| from online.tl_hdfs_maidian_view a | from online.tl_hdfs_maidian_view a
| left join diary_feat b | left join jerry_prod.diary_feat b
| on COALESCE(a.params['diary_id'], a.params['business_id'], 0) = b.diary_id | on COALESCE(a.params['diary_id'], a.params['business_id'], 0) = b.diary_id
| where | where
| b.level1_ids is not null and | b.level1_ids is not null and
...@@ -844,11 +819,6 @@ object GetLevelCount { ...@@ -844,11 +819,6 @@ object GetLevelCount {
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
import sc.implicits._ import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1).replace("-","") val stat_date = GmeiConfig.getMinusNDate(1).replace("-","")
...@@ -856,7 +826,7 @@ object GetLevelCount { ...@@ -856,7 +826,7 @@ object GetLevelCount {
val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580" val diary_queue = "16215222,16204965,15361235,16121397,16277565,15491159,16299587,16296887,15294642,16204934,15649199,16122580,16122580,16122580,16122580,16122580,16122580"
val diary_level1 = sc.sql( val diary_level1 = sc.sql(
s""" s"""
|select diary_id,explode(split(level1_ids,';')) level1_id from diary_feat |select diary_id,explode(split(level1_ids,';')) level1_id from jerry_prod.diary_feat
|where diary_id in (${diary_queue}) |where diary_id in (${diary_queue})
""".stripMargin """.stripMargin
) )
...@@ -924,9 +894,6 @@ object GetDeviceDuration { ...@@ -924,9 +894,6 @@ object GetDeviceDuration {
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
import sc.implicits._ import sc.implicits._
val stat_date = param.date val stat_date = param.date
...@@ -935,8 +902,8 @@ object GetDeviceDuration { ...@@ -935,8 +902,8 @@ object GetDeviceDuration {
s""" s"""
|select a.device_id,coalesce(a.start_time,a.ndiary_in,0) in_time,coalesce(a.end_time,a.ndiary_out,0) out_time, |select a.device_id,coalesce(a.start_time,a.ndiary_in,0) in_time,coalesce(a.end_time,a.ndiary_out,0) out_time,
|explode(split(b.level1_ids,';')) level1_id |explode(split(b.level1_ids,';')) level1_id
|from data_feed_click a |from jerry_prod.data_feed_click a
|left join diary_feat b on a.cid_id = b.diary_id |left join jerry_prod.diary_feat b on a.cid_id = b.diary_id
|where a.stat_date > '2018-12-12' |where a.stat_date > '2018-12-12'
""".stripMargin """.stripMargin
) )
...@@ -973,8 +940,8 @@ object GetDeviceDuration { ...@@ -973,8 +940,8 @@ object GetDeviceDuration {
| (select a.device_id, | (select a.device_id,
| coalesce(a.end_time,a.ndiary_out,0)-coalesce(a.start_time,a.ndiary_in,0) as duration, | coalesce(a.end_time,a.ndiary_out,0)-coalesce(a.start_time,a.ndiary_in,0) as duration,
| explode(split(b.level1_ids,';')) level1_id | explode(split(b.level1_ids,';')) level1_id
| from data_feed_click a | from jerry_prod.data_feed_click a
| left join diary_feat b on a.cid_id = b.diary_id where a.stat_date > '2018-12-12') c | left join jerry_prod.diary_feat b on a.cid_id = b.diary_id where a.stat_date > '2018-12-12') c
| group by c.device_id,c.level1_id) d | group by c.device_id,c.level1_id) d
|group by d.device_id |group by d.device_id
""".stripMargin """.stripMargin
...@@ -1026,18 +993,12 @@ object EsmmDataTest { ...@@ -1026,18 +993,12 @@ object EsmmDataTest {
GmeiConfig.setup(param.env) GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure_precise")
ti.tidbMapTable(dbName = "jerry_test", tableName = "train_data")
click(sc) click(sc)
val max_stat_date = sc.sql( val max_stat_date = sc.sql(
s""" s"""
|select max(stat_date) from train_data |select max(stat_date) from jerry_test.train_data
""".stripMargin """.stripMargin
) )
val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
...@@ -1050,7 +1011,7 @@ object EsmmDataTest { ...@@ -1050,7 +1011,7 @@ object EsmmDataTest {
// s""" // s"""
// |select distinct stat_date,device_id,city_id as ucity_id, // |select distinct stat_date,device_id,city_id as ucity_id,
// | cid_id,diary_service_id // | cid_id,diary_service_id
// |from data_feed_exposure // |from jerry_prod.data_feed_exposure
// |where cid_type = 'diary' // |where cid_type = 'diary'
// |and stat_date ='${stat_date}' // |and stat_date ='${stat_date}'
// """.stripMargin // """.stripMargin
...@@ -1060,7 +1021,7 @@ object EsmmDataTest { ...@@ -1060,7 +1021,7 @@ object EsmmDataTest {
s""" s"""
|select * from |select * from
|(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id |(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
|from data_feed_exposure_precise |from jerry_prod.data_feed_exposure_precise
|where cid_type = 'diary' |where cid_type = 'diary'
|and stat_date ='${stat_date}' |and stat_date ='${stat_date}'
|group by stat_date,device_id,city_id,cid_id,diary_service_id) a |group by stat_date,device_id,city_id,cid_id,diary_service_id) a
...@@ -1074,7 +1035,7 @@ object EsmmDataTest { ...@@ -1074,7 +1035,7 @@ object EsmmDataTest {
val clk_data = sc.sql( val clk_data = sc.sql(
s""" s"""
|select distinct stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id |select distinct stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
|from esmm_click |from jerry_test.esmm_click
|where stat_date ='${stat_date}' |where stat_date ='${stat_date}'
""".stripMargin """.stripMargin
) )
...@@ -1163,8 +1124,8 @@ object EsmmDataTest { ...@@ -1163,8 +1124,8 @@ object EsmmDataTest {
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id, |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name | c.name as ccity_name
|from union_data_slabel a |from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id |left join eagle.src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id |left join eagle.src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4 | where c.tag_type=4
""".stripMargin """.stripMargin
) )
...@@ -1213,7 +1174,7 @@ object EsmmDataTest { ...@@ -1213,7 +1174,7 @@ object EsmmDataTest {
val stat_yesterday = LocalDate.now().minusDays(1).toString val stat_yesterday = LocalDate.now().minusDays(1).toString
val max_stat_date = spark.sql( val max_stat_date = spark.sql(
s""" s"""
|select max(stat_date) from esmm_click |select max(stat_date) from jerry_test.esmm_click
""".stripMargin """.stripMargin
) )
val max = max_stat_date.collect().map(s => s(0).toString).head val max = max_stat_date.collect().map(s => s(0).toString).head
......
...@@ -37,18 +37,12 @@ object GmeiConfig extends Serializable { ...@@ -37,18 +37,12 @@ object GmeiConfig extends Serializable {
sparkConf.set("spark.debug.maxToStringFields", "130") sparkConf.set("spark.debug.maxToStringFields", "130")
sparkConf.set("spark.sql.broadcastTimeout", "6000") sparkConf.set("spark.sql.broadcastTimeout", "6000")
if (!sparkConf.contains("""spark.master""")) {
sparkConf.setMaster("local[3]")
}
if (!sparkConf.contains("spark.tispark.pd.addresses")) {
sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
}
println(sparkConf.get("spark.tispark.pd.addresses"))
val spark = SparkSession val spark = SparkSession
.builder() .builder()
.config(sparkConf) .config(sparkConf)
.config("spark.tispark.pd.addresses","172.16.40.158:2379")
.config("spark.sql.extensions","org.apache.spark.sql.TiExtensions")
.appName("feededa") .appName("feededa")
.enableHiveSupport() .enableHiveSupport()
.getOrCreate() .getOrCreate()
......
...@@ -6,7 +6,7 @@ import java.time.LocalDate ...@@ -6,7 +6,7 @@ import java.time.LocalDate
import com.gmei.lib.AbstractParams import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser import scopt.OptionParser
import scala.util.parsing.json.JSON import scala.util.parsing.json.JSON
...@@ -46,9 +46,6 @@ object esmm_feature { ...@@ -46,9 +46,6 @@ object esmm_feature {
GmeiConfig.setup(param.env) GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_test",tableName = "device_app_list")
ti.tidbMapTable(dbName = "jerry_test",tableName = "user_feature")
user_feature(sc) user_feature(sc)
get_applist(sc) get_applist(sc)
...@@ -67,7 +64,7 @@ object esmm_feature { ...@@ -67,7 +64,7 @@ object esmm_feature {
""".stripMargin).dropDuplicates("device_id") """.stripMargin).dropDuplicates("device_id")
df.persist() df.persist()
val old = spark.sql("select device_id from device_app_list").collect().map(x => x(0).toString) val old = spark.sql("select device_id from jerry_test.device_app_list").collect().map(x => x(0).toString)
import spark.implicits._ import spark.implicits._
val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString)) val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
...@@ -81,8 +78,6 @@ object esmm_feature { ...@@ -81,8 +78,6 @@ object esmm_feature {
val new_user = rdd.filter(x => old.indexOf(x._1)== -1) val new_user = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","os","app_list","update_date") .toDF("device_id","os","app_list","update_date")
if (new_user.take(1).nonEmpty){ if (new_user.take(1).nonEmpty){
val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbc, new_user,"device_app_list", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append) GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append)
...@@ -114,7 +109,7 @@ object esmm_feature { ...@@ -114,7 +109,7 @@ object esmm_feature {
def user_feature(spark:SparkSession): Unit ={ def user_feature(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","") val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday) println(yesterday)
val sql_exist = "select device_id from user_feature" val sql_exist = "select device_id from jerry_test.user_feature"
val old = spark.sql(sql_exist) val old = spark.sql(sql_exist)
.collect().map(x => x(0).toString) .collect().map(x => x(0).toString)
val sql_yesterday = val sql_yesterday =
...@@ -130,12 +125,8 @@ object esmm_feature { ...@@ -130,12 +125,8 @@ object esmm_feature {
val df_new = rdd.filter(x => old.indexOf(x._1)== -1) val df_new = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","device_type","manufacturer","city_id","channel","date") .toDF("device_id","device_type","manufacturer","city_id","channel","date")
if (df_new.take(1).nonEmpty){ if (df_new.take(1).nonEmpty){
df_new.persist()
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, df_new, "user_feature", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append) GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
df_new.unpersist()
}else { }else {
println("no need to insert into user feature") println("no need to insert into user feature")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment