Commit 9dd44f44 authored by 高雅喆

esmm_train_data from overwrite to append

parent ab11177f
......@@ -52,10 +52,11 @@ object Data2FFM {
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")
// val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val train_sep_date = GmeiConfig.getMinusNDate(14)
val esmm_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
|where stat_date > '${train_sep_date}'
""".stripMargin
).repartition(200).na.drop()
val column_list = esmm_data.columns.filter(x => x != "y" && x != "z")
......@@ -114,7 +115,7 @@ object Data2FFM {
val esmm_pre_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
|select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name,label
|from esmm_pre_data
""".stripMargin
).repartition(200).na.drop()
......@@ -132,10 +133,10 @@ object Data2FFM {
val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString,
x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
x(7).toString,x(8).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
.filter(x => esmm_join_city.indexOf(x._5) != -1)
val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
val native_pre = rdd_pre.filter(x => x._9 == "0").map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("ccity_name").indexOf(x._8),x._5,x._6))
......@@ -144,8 +145,20 @@ object Data2FFM {
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
println("pre")
pre.show(6)
GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
native_pre.show(6)
GmeiConfig.writeToJDBCTable(jdbcuri, native_pre, "esmm_data2ffm_infer_native", SaveMode.Overwrite)
// Build the inference rows for rdd_pre records whose 9th field (the `label` column of the
// esmm_pre_data select) equals "1" and write them to the esmm_data2ffm_infer_nearby table.
// Pipeline, step by step:
//   1. keep (device_id, y, z), replace device_id/stat_date/ucity_id/cid_id/clevel1_id/ccity_name
//      by their position in the matching column_number(...) list, and carry the raw
//      ucity_id (x._5) and cid_id (x._6) along;
//   2. draw a random Int below 2147483647 as "number" and render the six positions as
//      "1:<idx>:1.0 2:<idx>:1.0 ... 6:<idx>:1.0";
//   3. zipWithIndex attaches a running row index;
//   4. "data" becomes "<row index>,<y>,<z>,<formatted features>".
// Resulting columns: number, data, device_id, city_id, cid.
// NOTE(review): column_number is defined outside this hunk; presumably indexOf yields -1 for
// values missing from the list, which would then be written into the feature string — confirm.
val nearby_pre = rdd_pre.filter(x => x._9 == "1").map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
column_number("ccity_name").indexOf(x._8),x._5,x._6))
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
// NOTE(review): this prints the same "pre" marker as the native_pre section, so the two
// previews cannot be told apart in the log.
println("pre")
nearby_pre.show(6)
// SaveMode.Overwrite: the target table is replaced on every run.
GmeiConfig.writeToJDBCTable(jdbcuri, nearby_pre, "esmm_data2ffm_infer_nearby", SaveMode.Overwrite)
sc.stop()
......
......@@ -18,7 +18,8 @@ object EsmmData {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev"
case class Params(env: String = "dev",
date: String = GmeiConfig.getMinusNDate(1)
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
......@@ -28,6 +29,9 @@ object EsmmData {
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String]("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
......@@ -53,9 +57,8 @@ object EsmmData {
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
println("新修改的")
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(14)
val stat_date = param.date
println(stat_date)
val imp_data = sc.sql(
s"""
......@@ -63,9 +66,9 @@ object EsmmData {
| cid_id,diary_service_id
|from data_feed_exposure
|where cid_type = 'diary'
|and stat_date >'${stat_date}'
|and stat_date ='${stat_date}'
""".stripMargin
).repartition(200)
)
// imp_data.show()
// println("imp_data.count()")
// println(imp_data.count())
......@@ -77,9 +80,9 @@ object EsmmData {
| cid_id,diary_service_id
|from data_feed_click
|where cid_type = 'diary'
|and stat_date >'${stat_date}'
|and stat_date ='${stat_date}'
""".stripMargin
).repartition(200)
)
// clk_data.show()
// println("clk_data.count()")
// println(clk_data.count())
......@@ -93,7 +96,7 @@ object EsmmData {
// println(imp_data_filter.count())
val stat_date_not = GmeiConfig.getMinusNDate(14).replace("-","")
val stat_date_not = stat_date.replace("-","")
val cvr_data = sc.sql(
s"""
|select distinct
......@@ -102,11 +105,11 @@ object EsmmData {
| params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
|from online.tl_hdfs_maidian_view
|where action='page_view'
|and partition_date >'${stat_date_not}'
|and partition_date ='${stat_date_not}'
|and params['page_name'] = 'welfare_detail'
|and params['referrer'] = 'diary_detail'
""".stripMargin
).repartition(200)
)
val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
......@@ -130,7 +133,7 @@ object EsmmData {
// println(union_data.count())
val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
val union_data_clabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
......@@ -138,8 +141,8 @@ object EsmmData {
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
""".stripMargin
)
union_data_clabel.createOrReplaceTempView("union_data_clabel")
......@@ -152,8 +155,8 @@ object EsmmData {
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
""".stripMargin
)
union_data_slabel.createOrReplaceTempView("union_data_slabel")
......@@ -181,14 +184,14 @@ object EsmmData {
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
|left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
|and d.partition_date='${yesteday}'
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
|and d.partition_date='${stat_date_not}'
""".stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Overwrite)
// Append this run's union_data_scity_id rows to jerry_test.esmm_train_data instead of replacing the table.
// NOTE(review): the JDBC URL embeds the database user and password in plain text; consider
// loading them from configuration rather than keeping them in source control.
// NOTE(review): with SaveMode.Append, re-running the job for the same date presumably inserts
// the same rows again — confirm whether the table or the caller guards against duplicates.
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
......@@ -240,24 +243,18 @@ object EsmmPredData {
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
ti.tidbMapTable("eagle","ffm_diary_queue")
ti.tidbMapTable("eagle","search_queue")
ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
ti.tidbMapTable("eagle","biz_feed_diary_queue")
import sc.implicits._
val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
// val activate_data = sc.sql(
// s"""
// |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
// |from merge_queue_table a
// |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
// |where b.stat_date ='${yesteday_have_seq}'
// |and b.device_id is not null
// """.stripMargin
// )
//nearby_data
val raw_data = sc.sql(
s"""
|select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
......@@ -266,9 +263,9 @@ object EsmmPredData {
|select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
|union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from esmm_train_data)
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
""".stripMargin
).repartition(200)
)
raw_data.show()
......@@ -280,40 +277,59 @@ object EsmmPredData {
(device_id,city_id ,s"$cids")
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
raw_data1.createOrReplaceTempView("raw_data1")
println(raw_data1.count())
println("nearby_device_count",raw_data1.count())
val raw_data2 = sc.sql(
s"""
|select device_id,city_id,merge_queue from raw_data1 limit 10000
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data1
""".stripMargin
).repartition(200)
).withColumn("label",lit(1))
raw_data2.createOrReplaceTempView("raw_data2")
println(raw_data2.count())
raw_data2.show()
println("nearby_explode_count",raw_data2.count())
val raw_data3 = sc.sql(
// native_data
val native_data = sc.sql(
s"""
|select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2
|select distinct a.device_id,a.city_id,b.native_queue from data_feed_click a
|left join biz_feed_diary_queue b on a.city_id = b.city_id
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin
).repartition(200)
raw_data3.createOrReplaceTempView("raw_data")
println(raw_data3.count())
)
native_data.createOrReplaceTempView("native_data")
println("native_device_count",native_data.count())
// Expand each device's native_queue into one row per diary id (cid_id) and tag the rows with label = 0.
// The nested split truncates the queue before exploding it:
//   split(native_queue, ',')[300]      -> the element at index 300 of the comma-separated queue
//   split(native_queue, ',<element>')  -> cuts the queue at that element; [0] keeps the part before it
//   split(<kept part>, ',')            -> the leading ids, which explode turns into rows
// NOTE(review): when a queue has no element at index 300 the delimiter is presumably NULL, which
// would make the whole expression NULL and drop that device from the output — confirm.
// NOTE(review): the delimiter is used as a split pattern, so ",<id>" may also match the start of an
// earlier, longer id and cut the queue short — confirm this is acceptable.
val native_data1 = sc.sql(
s"""
|select device_id,city_id as ucity_id,
|explode(split(split(native_queue, concat(',',split(native_queue,',')[300]))[0],',')) as cid_id
|from native_data
""".stripMargin
).withColumn("label",lit(0))
// Registered under its own name so the union query can read it.
native_data1.createOrReplaceTempView("native_data1")
// count() evaluates the exploded dataset just to log its size.
println("native_explode_count",native_data1.count())
// activate_data.createOrReplaceTempView("raw_data")
// raw_data.show()
// Union the two candidate sets into the temp view "raw_data" that the feature joins read from:
// native_data1 rows (label = 0) and raw_data2 rows (label = 1), each carrying
// device_id, ucity_id, cid_id and label.
// Both branches must select cid_id: raw_data2 exposes the exploded id as cid_id and has no
// column named cid_i, so the previous spelling could not be resolved by Spark SQL.
// SQL `union` (as opposed to `union all`) also removes duplicate rows.
val union_data = sc.sql(
s"""
|select device_id,ucity_id,cid_id,label from native_data1
|union
|select device_id,ucity_id,cid_id,label from raw_data2
""".stripMargin
)
union_data.createOrReplaceTempView("raw_data")
println("union_count",union_data.count())
val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
//join feat
val yesteday = yesteday_have_seq.replace("-","")
val sid_data = sc.sql(
s"""
|select distinct
| from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
| a.device_id,a.ucity_id,a.cid_id, b.service_id as diary_service_id
| a.device_id,a.ucity_id,a.cid_id,a.label, b.service_id as diary_service_id
|from raw_data a
|left join online.ml_community_diary_updates b on a.cid_id = b.diary_id
|where b.partition_date = '${yesteday}'
......@@ -328,7 +344,7 @@ object EsmmPredData {
val union_data_clabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,
| c.level1_id as clevel1_id
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
......@@ -342,7 +358,7 @@ object EsmmPredData {
val union_data_slabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,
| c.level1_id as slevel1_id
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
......@@ -357,7 +373,7 @@ object EsmmPredData {
val union_data_ccity_name = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name
|from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
......@@ -370,7 +386,7 @@ object EsmmPredData {
val union_data_scity_id = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.label,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id
|from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment