Commit d74c6332 authored by 张彦钊's avatar 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add test
parents d7d077d7 a4293306
......@@ -55,146 +55,156 @@ object EsmmData {
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
import sc.implicits._
val stat_date = param.date
println(stat_date)
val imp_data = sc.sql(
val max_stat_date = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|from data_feed_exposure
|where cid_type = 'diary'
|and stat_date ='${stat_date}'
|select max(stat_date) from esmm_train_data
""".stripMargin
)
// imp_data.show()
// println("imp_data.count()")
// println(imp_data.count())
val clk_data = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|from data_feed_click
|where cid_type = 'diary'
|and stat_date ='${stat_date}'
val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
println("max_stat_date_str",max_stat_date_str)
println("param.date",param.date)
if (max_stat_date_str != param.date){
val stat_date = param.date
println(stat_date)
val imp_data = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|from data_feed_exposure
|where cid_type = 'diary'
|and stat_date ='${stat_date}'
""".stripMargin
)
// clk_data.show()
// println("clk_data.count()")
// println(clk_data.count())
val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
// imp_data_filter.createOrReplaceTempView("imp_data_filter")
// imp_data_filter.show()
// println("imp_data_filter.count()")
// println(imp_data_filter.count())
val stat_date_not = stat_date.replace("-","")
val cvr_data = sc.sql(
s"""
|select distinct
| from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
| cl_id as device_id,city_id as ucity_id,
| params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
|from online.tl_hdfs_maidian_view
|where action='page_view'
|and partition_date ='${stat_date_not}'
|and params['page_name'] = 'welfare_detail'
|and params['referrer'] = 'diary_detail'
)
// imp_data.show()
// println("imp_data.count()")
// println(imp_data.count())
val clk_data = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|from data_feed_click
|where cid_type = 'diary'
|and stat_date ='${stat_date}'
""".stripMargin
)
)
// clk_data.show()
// println("clk_data.count()")
// println(clk_data.count())
val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
// imp_data_filter.createOrReplaceTempView("imp_data_filter")
// imp_data_filter.show()
// println("imp_data_filter.count()")
// println(imp_data_filter.count())
val stat_date_not = stat_date.replace("-","")
val cvr_data = sc.sql(
s"""
|select distinct
| from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
| cl_id as device_id,city_id as ucity_id,
| params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
|from online.tl_hdfs_maidian_view
|where action='page_view'
|and partition_date ='${stat_date_not}'
|and params['page_name'] = 'welfare_detail'
|and params['referrer'] = 'diary_detail'
""".stripMargin
)
val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
// cvr_data_filter.show()
// println("cvr_data_filter.count()")
// println(cvr_data_filter.count())
val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
// cvr_data_filter.show()
// println("cvr_data_filter.count()")
// println(cvr_data_filter.count())
val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
// clk_data_filter.createOrReplaceTempView("clk_data_filter")
// clk_data_filter.show()
// println("clk_data_filter.count()")
// println(clk_data_filter.count())
val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
// clk_data_filter.createOrReplaceTempView("clk_data_filter")
// clk_data_filter.show()
// println("clk_data_filter.count()")
// println(clk_data_filter.count())
val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
union_data.createOrReplaceTempView("union_data")
// union_data.show()
// println("union_data.count()")
// println(union_data.count())
val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
union_data.createOrReplaceTempView("union_data")
// union_data.show()
// println("union_data.count()")
// println(union_data.count())
val union_data_clabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
| c.level1_id as clevel1_id
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
val union_data_clabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
| c.level1_id as clevel1_id
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
""".stripMargin
)
union_data_clabel.createOrReplaceTempView("union_data_clabel")
// union_data_clabel.show()
val union_data_slabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
| c.level1_id as slevel1_id
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
)
union_data_clabel.createOrReplaceTempView("union_data_clabel")
// union_data_clabel.show()
val union_data_slabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
| c.level1_id as slevel1_id
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
""".stripMargin
)
union_data_slabel.createOrReplaceTempView("union_data_slabel")
// union_data_slabel.show()
val union_data_ccity_name = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name
|from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4
)
union_data_slabel.createOrReplaceTempView("union_data_slabel")
// union_data_slabel.show()
val union_data_ccity_name = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name
|from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4
""".stripMargin
)
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show()
val union_data_scity_id = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id
|from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
|left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
|and d.partition_date='${stat_date_not}'
)
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show()
val union_data_scity_id = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id
|from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
|left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
|where b.partition_date='${stat_date_not}'
|and c.partition_date='${stat_date_not}'
|and d.partition_date='${stat_date_not}'
""".stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
} else {
println("esmm_train_data already have param.date data")
}
sc.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment