Commit e11ee889 authored by 高雅喆's avatar 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

git commit -m "esmm_train_data from overwrite to append and add native
queue data"
parents 9dd44f44 41774688
...@@ -50,10 +50,17 @@ object testt { ...@@ -50,10 +50,17 @@ object testt {
ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist") ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure") ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
// val stat_date = GmeiConfig.getMinusNDate(1) val stat_date = GmeiConfig.getMinusNDate(1)
val stat_date=param.date // val stat_date=param.date
val partition_date = stat_date.replace("-","") val partition_date = stat_date.replace("-","")
//机构id //机构id
val blacklist = sc.sql(
s"""
|select device_id from blacklist
""".stripMargin
)
blacklist.createOrReplaceTempView("blacklist")
val agency_id = sc.sql( val agency_id = sc.sql(
s""" s"""
|SELECT DISTINCT(cl_id) as device_id |SELECT DISTINCT(cl_id) as device_id
...@@ -69,87 +76,319 @@ object testt { ...@@ -69,87 +76,319 @@ object testt {
|AND pv_ratio >= 0.95 |AND pv_ratio >= 0.95
""".stripMargin """.stripMargin
) )
// agency_id.show() agency_id.show()
agency_id.createOrReplaceTempView("agency_id") agency_id.createOrReplaceTempView("agency_id")
//每日新用户
val device_id_newUser = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type != '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
""".stripMargin
)
device_id_newUser.show()
device_id_newUser.createOrReplaceTempView("device_id_new")
//每日老用户
val device_id_oldUser = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type = '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
""".stripMargin
)
device_id_oldUser.show()
device_id_oldUser.createOrReplaceTempView("device_id_old")
//日记本转化美购 //日记本转化美购
//1.日记本到美购转化数 //1.日记本到美购转化数
val diary_meigou_count = sc.sql( val diary_meigou_temp = sc.sql(
s""" s"""
|select '${stat_date}' as stat_date, count(page_name) as diary_meigou_count |select ou.cl_id as device_id
|from online.bl_hdfs_page_view_updates ou left join agency_id |from online.bl_hdfs_page_view_updates ou left join agency_id
|on ou.cl_id = agency_id.device_id |on ou.cl_id = agency_id.device_id
|where ou.partition_date = '${partition_date}' |where ou.partition_date = '${partition_date}'
|and ou.page_name='welfare_detail' |and ou.page_name='welfare_detail'
|and ou.referrer='diary_detail' |and ou.referrer='diary_detail'
|and agency_id.device_id is null |and agency_id.device_id is null
|and ou.cl_id not in (select device_id from blacklist)
""".stripMargin """.stripMargin
) )
diary_meigou_count.show() diary_meigou_temp.createOrReplaceTempView("diary_meigou_temp")
val diary_meigou_device = sc.sql(
s"""
|select dt.device_id
|from diary_meigou_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
""".stripMargin
)
diary_meigou_device.createOrReplaceTempView("diary_meigou_device")
//新用户到美购详情页的转化
val diary_meigou_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_meigou_newUser
|from diary_meigou_device dd left join device_id_new
|on dd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
""".stripMargin
)
//老用户到美购详情页的转化
val diary_meigou_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_meigou_oldUser
|from diary_meigou_device dd left join device_id_old
|on dd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
""".stripMargin
)
// val diary_meigou_count = sc.sql(
// s"""
// |select '${stat_date}' as stat_date, count(page_name) as diary_meigou_count
// |from online.bl_hdfs_page_view_updates ou left join agency_id
// |on ou.cl_id = agency_id.device_id
// |where ou.partition_date = '${partition_date}'
// |and ou.page_name='welfare_detail'
// |and ou.referrer='diary_detail'
// |and agency_id.device_id is null
// |and ou.cl_id not in (select device_id from blacklist)
// """.stripMargin
// )
// diary_meigou_count.show()
//2.日记本点击数 //2.日记本点击数
val diary_clk = sc.sql( val diary_clk_temp = sc.sql(
s""" s"""
|select '${stat_date}' as stat_date,count(cl_id) as diary_clk |select ov.cl_id as device_id
|from online.tl_hdfs_maidian_view ov left join agency_id |from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id |on ov.cl_id = agency_id.device_id
|where ov.action = 'on_click_diary_card' |where ov.action = 'on_click_diary_card'
|and ov.cl_id != "NULL" |and ov.cl_id != "NULL"
|and ov.partition_date='${partition_date}' |and ov.partition_date='${partition_date}'
|and agency_id.device_id is null |and agency_id.device_id is null
|and ov.cl_id not in (select device_id from blacklist)
""".stripMargin """.stripMargin
) )
diary_clk.show() diary_clk_temp.createOrReplaceTempView("diary_clk_temp")
val diary_clk_device = sc.sql(
s"""
|select dt.device_id
|from diary_clk_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
""".stripMargin
)
diary_clk_device.createOrReplaceTempView("diary_clk_device")
//新用户日记本点击
val diary_clk_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_clk_newUser
|from diary_clk_device dd left join device_id_new
|on dd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
""".stripMargin
)
//老用户日记本点击
val diary_clk_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_clk_oldUser
|from diary_clk_device dd left join device_id_old
|on dd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
""".stripMargin
)
// val diary_clk = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(cl_id) as diary_clk
// |from online.tl_hdfs_maidian_view ov left join agency_id
// |on ov.cl_id = agency_id.device_id
// |where ov.action = 'on_click_diary_card'
// |and ov.cl_id != "NULL"
// |and ov.partition_date='${partition_date}'
// |and agency_id.device_id is null
// |and ov.cl_id not in (select device_id from blacklist)
// """.stripMargin
// )
// diary_clk.show()
//3.日记本曝光数 //3.日记本曝光数
val diary_expoure=sc.sql( val diary_expoure_temp=sc.sql(
s""" s"""
|select '${stat_date}' as stat_date,count(cl_id) as diary_expoure |select od.cl_id as device_id
|from online.ml_community_exposure_detail od left join agency_id |from online.ml_community_exposure_detail od left join agency_id
|on od.cl_id = agency_id.device_id |on od.cl_id = agency_id.device_id
|where od.business_type = "diary" |where od.business_type = "diary"
|and od.cl_id != "NULL" |and od.cl_id != "NULL"
|and od.partition_date='${partition_date}' |and od.partition_date='${partition_date}'
|and agency_id.device_id is null |and agency_id.device_id is null
|and od.cl_id not in (select device_id from blacklist)
""".stripMargin """.stripMargin
) )
diary_expoure.show() diary_expoure_temp.createOrReplaceTempView("diary_expoure_temp")
val diary_expoure_device = sc.sql(
s"""
|select dt.device_id
|from diary_expoure_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
""".stripMargin
)
diary_expoure_device.createOrReplaceTempView("diary_expoure_device")
//新用户日记本曝光
val diary_exp_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_exp_newUser
|from diary_expoure_device dd left join device_id_new
|on dd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
""".stripMargin
)
//老用户日记本曝光
val diary_exp_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(dd.device_id) as diary_exp_oldUser
|from diary_expoure_device dd left join device_id_old
|on dd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
""".stripMargin
)
// val diary_expoure=sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(cl_id) as diary_expoure
// |from online.ml_community_exposure_detail od left join agency_id
// |on od.cl_id = agency_id.device_id
// |where od.business_type = "diary"
// |and od.cl_id != "NULL"
// |and od.partition_date='${partition_date}'
// |and agency_id.device_id is null
// |and od.cl_id not in (select device_id from blacklist)
// """.stripMargin
// )
// diary_expoure.show()
//4.搜索次数统计 //4.搜索次数统计
val search_count = sc.sql( val search_device_temp = sc.sql(
s""" s"""
|select '${stat_date}' as stat_date,count(params['query']) as search_num |select ov.cl_id as device_id
|from online.tl_hdfs_maidian_view ov left join agency_id |from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id |on ov.cl_id = agency_id.device_id
|where (ov.action = 'do_search' or ov.action = 'search_result_click_search') |where (ov.action = 'do_search' or ov.action = 'search_result_click_search')
|and ov.partition_date ='${partition_date}' |and ov.partition_date ='${partition_date}'
|and agency_id.device_id is null |and agency_id.device_id is null
|and ov.cl_id not in (select device_id from blacklist)
""".stripMargin """.stripMargin
) )
search_count.show() search_device_temp.createOrReplaceTempView("search_device_temp")
//5.登录人数 val search_device = sc.sql(
s"""
|select dt.device_id
|from search_device_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
""".stripMargin
)
search_device.createOrReplaceTempView("search_device")
//新用户搜索次数
val search_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(sd.device_id) as search_newUser
|from search_device sd left join device_id_new
|on sd.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
""".stripMargin
)
//老用户日搜索次数
val search_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(sd.device_id) as search_oldUser
|from search_device sd left join device_id_old
|on sd.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
""".stripMargin
)
val log_num = sc.sql( //5.登录人数
val log_device_temp = sc.sql(
s""" s"""
|select '${stat_date}' as stat_date,count(distinct(device_id)) as log_num |select distinct(oe.device_id) as device_id
|from data_feed_exposure oe left join agency_id |from data_feed_exposure oe left join agency_id
|on oe.device_id = agency_id.device_id |on oe.device_id = agency_id.device_id
|and oe.stat_date ='${stat_date}' |and oe.stat_date ='${stat_date}'
|and agency_id.device_id is null |and agency_id.device_id is null
|and oe.device_id not in (select device_id from blacklist)
""".stripMargin """.stripMargin
) )
log_num.show() log_device_temp.createOrReplaceTempView("log_device_temp")
val log_device = sc.sql(
s"""
|select dt.device_id
|from log_device_temp dt left join blacklist
|on dt.device_id = blacklist.device_id
|where blacklist.device_id is null
""".stripMargin
)
log_device.createOrReplaceTempView("log_device")
//新用户登录人数
val log_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(distinct(ld.device_id)) as log_newUser
|from log_device ld left join device_id_new
|on ld.device_id = device_id_new.device_id
|where device_id_new.device_id is not null
""".stripMargin
)
//老用户登录人数
val log_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(distinct(ld.device_id)) as log_oldUser
|from log_device ld left join device_id_old
|on ld.device_id = device_id_old.device_id
|where device_id_old.device_id is not null
""".stripMargin
)
// val log_num = sc.sql(
// s"""
// |select '${stat_date}' as stat_date,count(distinct(device_id)) as log_num
// |from data_feed_exposure oe left join agency_id
// |on oe.device_id = agency_id.device_id
// |and oe.stat_date ='${stat_date}'
// |and agency_id.device_id is null
// |and oe.device_id not in (select device_id from blacklist)
// """.stripMargin
// )
// log_num.show()
val result = diary_meigou_count.join(diary_clk,"stat_date") val result = diary_meigou_newUser.join(diary_meigou_oldUser,"stat_date")
.join(diary_expoure,"stat_date") .join(diary_clk_newUser,"stat_date")
.join(search_count,"stat_date") .join(diary_clk_oldUser,"stat_date")
.join(log_num,"stat_date") .join(diary_exp_newUser,"stat_date")
.join(diary_exp_oldUser,"stat_date")
.join(search_newUser,"stat_date")
.join(search_oldUser,"stat_date")
.join(log_newUser,"stat_date")
.join(log_oldUser,"stat_date")
GmeiConfig.writeToJDBCTable(result, "diary_meigou_crv", SaveMode.Append) GmeiConfig.writeToJDBCTable(result, "diary_meigou_crv", SaveMode.Append)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment