Commit 072d375b authored by 张彦钊's avatar 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add crv feature file
parents bd03655b 3ef41289
package com.gmei
import java.io.Serializable
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark.sql.functions.lit
object EsmmData {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("EsmmData")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(14)
println(stat_date)
val imp_data = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|from data_feed_exposure
|where cid_type = 'diary'
|and stat_date >'${stat_date}'
""".stripMargin
)
// imp_data.show()
// println("imp_data.count()")
// println(imp_data.count())
val clk_data = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|from data_feed_click
|where cid_type = 'diary'
|and stat_date >'${stat_date}'
""".stripMargin
)
// clk_data.show()
// println("clk_data.count()")
// println(clk_data.count())
val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))
// imp_data_filter.createOrReplaceTempView("imp_data_filter")
// imp_data_filter.show()
// println("imp_data_filter.count()")
// println(imp_data_filter.count())
val stat_date_not = GmeiConfig.getMinusNDate(14).replace("-","")
val cvr_data = sc.sql(
s"""
|select distinct
| from_unixtime(unix_timestamp('${stat_date_not}' ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
| cl_id as device_id,city_id as ucity_id,
| params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
|from online.tl_hdfs_maidian_view
|where action='page_view'
|and partition_date >'${stat_date_not}'
|and params['page_name'] = 'welfare_detail'
|and params['referrer'] = 'diary_detail'
""".stripMargin
)
val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))
// cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
// cvr_data_filter.show()
// println("cvr_data_filter.count()")
// println(cvr_data_filter.count())
val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
// clk_data_filter.createOrReplaceTempView("clk_data_filter")
// clk_data_filter.show()
// println("clk_data_filter.count()")
// println(clk_data_filter.count())
val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
union_data.createOrReplaceTempView("union_data")
// union_data.show()
// println("union_data.count()")
// println(union_data.count())
val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
val union_data_clabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
| c.level1_id as clevel1_id
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
""".stripMargin
)
union_data_clabel.createOrReplaceTempView("union_data_clabel")
// union_data_clabel.show()
val union_data_slabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
| c.level1_id as slevel1_id
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
""".stripMargin
)
union_data_slabel.createOrReplaceTempView("union_data_slabel")
// union_data_slabel.show()
val union_data_ccity_name = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name
|from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4
""".stripMargin
)
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show()
val union_data_scity_id = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id
|from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
|left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
|and d.partition_date='${yesteday}'
""".stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable(union_data_scity_id, table="esmm_data",SaveMode.Overwrite)
sc.stop()
}
}
}
......@@ -50,7 +50,7 @@ object Search_keywords_count {
s"""
|select params['query'] as search_keywords
|from online.tl_hdfs_maidian_view
|where action = 'search_result_click_search'
|where action = 'do_search'
|and partition_date ='${partition_date}'
""".stripMargin
)
......@@ -59,7 +59,7 @@ object Search_keywords_count {
s"""
|select '${stat_date}' as stat_date,count(params['query']) as search_num
|from online.tl_hdfs_maidian_view
|where action = 'search_result_click_search'
|where action = 'do_search'
|and partition_date ='${partition_date}'
""".stripMargin
)
......
......@@ -89,9 +89,9 @@ object app_list {
|AND pv_ratio >= 0.95
""".stripMargin
)
agency_id.show()
agency_id.createOrReplaceTempView("agency_id")
//获取与新氧用户重合的用户device_id
val app_list = sc.sql(
s"""
......@@ -142,6 +142,36 @@ object app_list {
GmeiConfig.writeToJDBCTable(tempp_list, "device_id_applist", SaveMode.Append)
//在更美有消费的用户列表
val device_id_meigou = sc.sql(
s"""
|select DISTINCT(od.device_id) as device_id
|from online.ml_meigou_order_detail od left join agency_id
|on od.device_id = agency_id.device_id
|where od.partition_date = '20181118'
|and od.pay_time is not null
|and od.pay_time >= '2017-11-18'
|and agency_id.device_id is null
|and od.device_id not in (select distinct(device_id) from blacklist)
""".stripMargin
)
device_id_meigou.createOrReplaceTempView("device_id_meigou")
val app_list_meigou = sc.sql(
s"""
|select distinct(ov.cl_id) as device_id, user_id as user_id,params['installed_app_info'] as app_list,channel
|from online.tl_hdfs_maidian_view ov left join device_id_meigou
|on ov.cl_id = device_id_meigou.device_id
|where ov.action="user_installed_all_app_info"
|and device_id_meigou.device_id is not null
|and ov.partition_date = '${partition_date}'
""".stripMargin
)
val applist_meigou=app_list_meigou.withColumn("stat_date",addCol(app_list_meigou("device_id")))
GmeiConfig.writeToJDBCTable(applist_meigou, "device_id_applist_meigou", SaveMode.Append)
}
......@@ -319,51 +349,51 @@ object coincidence_xinyang {
//* 所有获得用户列表的用户id
//1.重合用户的美购数
val meigou_coincidence_num = sc.sql(
val meigou_1 = sc.sql(
s"""
|select count(ov.device_id) as meigou_coincidence_num
|select count(ov.device_id) as meigou_1
|from online.ml_meigou_order_detail ov left join coincidence_id
|on ov.device_id = coincidence_id.device_id
|where partition_date = '20181120'
|where ov.partition_date = '20181120'
|and coincidence_id.device_id is not null
|and ov.pay_time is not null
|and ov.pay_time >= '2017-11-18'
""".stripMargin
)
meigou_coincidence_num.show()
meigou_1.show()
//2.重合用户进行美购的用户数
val meigou_pay_device = sc.sql(
val meigou_2 = sc.sql(
s"""
|select count(distinct(ov.device_id)) as meigou_coincidence_num
|select count(distinct(ov.device_id)) as meigou_2
|from online.ml_meigou_order_detail ov left join coincidence_id
|on ov.device_id = coincidence_id.device_id
|where partition_date = '20181120'
|where ov.partition_date = '20181120'
|and coincidence_id.device_id is not null
|and ov.pay_time is not null
|and ov.pay_time >= '2017-11-18'
""".stripMargin
)
meigou_pay_device.show()
meigou_2.show()
//3.所有获得应用列表的用户的美购数
val meigou_pay_all = sc.sql(
val meigou_3 = sc.sql(
s"""
|select count(od.device_id) as meigou_pay_device
|select count(od.device_id) as meigou_3
|from online.ml_meigou_order_detail od inner join all_id
|on od.device_id = all_id.device_id
|where partition_date = '20181120'
|where od.partition_date = '20181120'
|and all_id.device_id is not null
|and od.pay_time is not null
|and od.pay_time >= '2017-11-18'
""".stripMargin
)
meigou_pay_all.show()
meigou_3.show()
//4.所有获得应用列表用户进行美购的用户数
val meigou_pay_device_all = sc.sql(
val meigou_4 = sc.sql(
s"""
|select count(distinct(od.device_id)) as meigou_pay_device
|select count(distinct(od.device_id)) as meigou_4
|from online.ml_meigou_order_detail od inner join all_id
|on od.device_id = all_id.device_id
|where partition_date = '20181120'
......@@ -372,12 +402,37 @@ object coincidence_xinyang {
|and od.pay_time >= '2017-11-18'
""".stripMargin
)
meigou_pay_device_all.show()
meigou_4.show()
// 5.所有用户过去一年的美购数
val meigou_5 = sc.sql(
s"""
|select count(device_id) as meigou_5
|from online.ml_meigou_order_detail
|where partition_date = '20181120'
|and pay_time is not null
|and pay_time >= '2017-11-18'
""".stripMargin
)
meigou_5.show()
// 56.所有用户过去一年的美购用户数
val meigou_6 = sc.sql(
s"""
|select count(distinct(device_id)) as meigou_6
|from online.ml_meigou_order_detail
|where partition_date = '20181120'
|and pay_time is not null
|and pay_time >= '2017-11-18'
""".stripMargin
)
meigou_6.show()
//截止目前获得的与新氧重合的用户数咨询统计
/* val zixun_num_all = sc.sql(
//1.与新氧重合用户的美购咨询数
val zixun_1 = sc.sql(
s"""
|select count(ov.cl_id) as zixun_num_all
|select count(ov.cl_id) as zixun_1
|from online.tl_hdfs_maidian_view ov left join coincidence_id
|on ov.cl_id = coincidence_id.device_id
|where partition_date >= '20180501'
......@@ -385,11 +440,11 @@ object coincidence_xinyang {
|and action = 'welfare_detail_click_message'
""".stripMargin
)
zixun_num_all.show()
val zixun_device_all = sc.sql(
zixun_1.show()
//2.与新氧重合用户的美购咨询用户数
val zixun_2 = sc.sql(
s"""
|select count(distinct(ov.cl_id)) as zixun_num_all
|select count(distinct(ov.cl_id)) as zixun_2
|from online.tl_hdfs_maidian_view ov left join coincidence_id
|on ov.cl_id = coincidence_id.device_id
|where partition_date >= '20180501'
......@@ -397,31 +452,52 @@ object coincidence_xinyang {
|and action = 'welfare_detail_click_message'
""".stripMargin
)
zixun_device_all.show()
*/
val zixun_num_co = sc.sql(
zixun_2.show()
//3.已经获得应用列表用户的美购咨询次数
val zixun_3 = sc.sql(
s"""
|select count(ov.cl_id) as zixun_num_all
|select count(ov.cl_id) as zixun_3
|from online.tl_hdfs_maidian_view ov left join all_id
|on ov.cl_id = all_id.device_id
|where partition_date >= '20180501'
|and all_id.device_id is not null
|and action = 'welfare_detail_click_message'
|and ov.action = 'welfare_detail_click_message'
""".stripMargin
)
zixun_num_co.show()
val zixun_num_co_dis = sc.sql(
zixun_3.show()
//4.已经获得应用列表用户的美购咨询用户数
val zixun_4 = sc.sql(
s"""
|select count(distinct(ov.cl_id)) as zixun_num_all
|select count(distinct(ov.cl_id)) as zixun_4
|from online.tl_hdfs_maidian_view ov left join all_id
|on ov.cl_id = all_id.device_id
|where partition_date >= '20180501'
|and all_id.device_id is not null
|and ov.action = 'welfare_detail_click_message'
""".stripMargin
)
zixun_4.show()
//5.所有用户的美购咨询数
val zixun_5 = sc.sql(
s"""
|select count(cl_id) as zixun_5
|from online.tl_hdfs_maidian_view
|where partition_date >= '20180501'
|and action = 'welfare_detail_click_message'
""".stripMargin
)
zixun_5.show()
//6.所有用户的美购咨询用户数
val zixun_6 = sc.sql(
s"""
|select count(distinct(cl_id)) as zixun_6
|from online.tl_hdfs_maidian_view
|where partition_date >= '20180501'
|and action = 'welfare_detail_click_message'
""".stripMargin
)
zixun_num_co_dis.show()
zixun_6.show()
......
......@@ -187,7 +187,7 @@ object strategy_other {
//使用信息熵描述推荐系统对长尾优质物品(日记本)的挖掘能力
//使用基尼系数描述推荐系统对日记本推荐是否具有马太效应
object evaluation_indicator_ {
object evaluation_indicator {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
......@@ -226,121 +226,16 @@ object evaluation_indicator_ {
//val stat_date = GmeiConfig.getMinusNDate(1)
//println(param.date)
val partition_date = param.date.replace("-","")
val devicee_id_oldUser = sc.sql(
val diary_id = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type = '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
""".stripMargin
)
devicee_id_oldUser.show()
devicee_id_oldUser.createOrReplaceTempView("device_id_old")
//device_id尾号1有点击用户日记本点击数
val clk_active_1 = sc.sql(
s"""
|select '${param.date}' as stat_date, count(jd.cid_id) as clk_active_1
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.device_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'1$$'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${param.date}'
|select native_queue as diary_id
|from doris_prod.device_diary_queue
""".stripMargin
)
diary_id.show()
//device_id尾号1有点击用户日记本曝光数
val imp_active_1 = sc.sql(
s"""
|select '${param.date}' as stat_date, count(je.cid_id) as imp_active_1
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.device_id
|where je.cid_type = 'diary'
|and je.device_id in (select distinct(device_id) from data_feed_click where device_id regexp '1$$' and stat_date = '${param.date}')
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${param.date}'
""".stripMargin
)
//device_id尾号1点击日记本用户数
val clk_diary_device = sc.sql(
s"""
|select '${param.date}' as stat_date, count(distinct(jd.device_id)) as clk_diary_device
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.device_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'1$$'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${param.date}'
""".stripMargin
)
//所有有点击用户日记本点击数
val clk_active_all = sc.sql(
s"""
|select '${param.date}' as stat_date, count(jd.cid_id) as clk_active_all
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.device_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${param.date}'
""".stripMargin
)
//所有有点击用户日记本曝光数
val imp_active_all = sc.sql(
s"""
|select '${param.date}' as stat_date, count(je.cid_id) as imp_active_all
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.device_id
|where je.cid_type = 'diary'
|and je.device_id in (select distinct(device_id) from data_feed_click where stat_date = '${param.date}')
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${param.date}'
""".stripMargin
)
//策略命中用户点击日记本用户数
val clk_diary_device_cover = sc.sql(
s"""
|select '${param.date}' as stat_date,count(distinct(device_id)) as clk_diary_device_cover
|from merge_queue_table
|where device_id in (select distinct(device_id) from data_feed_click where stat_date = '${param.date}')
""".stripMargin
)
//策略命中用户总数
val device_all_cover = sc.sql(
s"""
|select '${param.date}' as stat_date,count(distinct(device_id)) as device_all_cover
|from merge_queue_table
""".stripMargin
)
val result = clk_active_1.join(imp_active_1,"stat_date")
.join(clk_active_all,"stat_date")
.join(imp_active_all,"stat_date")
.join(clk_diary_device,"stat_date")
.join(clk_diary_device_cover,"stat_date")
.join(device_all_cover,"stat_date")
result.show()
GmeiConfig.writeToJDBCTable(result, "strategy_other", SaveMode.Append)
GmeiConfig.writeToJDBCTable(diary_id, "dairy_id_queue", SaveMode.Append)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment