Commit 5acbce0e authored by 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

Modify test files
parents 0e78fd85 06679a72
......@@ -34,6 +34,7 @@ object GmeiConfig extends Serializable {
val sparkConf = new SparkConf
sparkConf.set("spark.sql.crossJoin.enabled", "true")
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.sql.broadcastTimeout", "1000")
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[3]")
......
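The new `spark.sql.broadcastTimeout` setting raises the broadcast-join timeout; the value is in seconds (Spark's default is 300), which helps when broadcast tables are slow to ship. A minimal sketch of how a conf like this typically feeds into the session that `GmeiConfig.getSparkSession()` returns — the builder call below is an assumption, not the repo's actual code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: build a session from the same settings GmeiConfig applies above.
val conf = new SparkConf()
conf.set("spark.sql.crossJoin.enabled", "true")   // allow cartesian joins in SQL
conf.set("spark.debug.maxToStringFields", "100")  // avoid truncated plan strings
conf.set("spark.sql.broadcastTimeout", "1000")    // seconds to wait for a broadcast join
if (!conf.contains("spark.master")) conf.setMaster("local[3]")

val spark = SparkSession.builder().config(conf).getOrCreate()
```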
package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

object Recommendation_strategy_all {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      //val stat_date = GmeiConfig.getMinusNDate(1)
      //println(param.date)
      val partition_date = param.date.replace("-", "")
      // Old users (active_type = '4'), excluding promotion/ad acquisition channels
      val decive_id_oldUser = sc.sql(
        s"""
           |select distinct(device_id) as decive_id
           |from online.ml_device_day_active_status
           |where active_type = '4'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |    ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |    ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |    ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |    ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |    ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |    ,'promotion_shike','promotion_julang_jl03')
           |and partition_date ='${partition_date}'
        """.stripMargin
      )
      decive_id_oldUser.createOrReplaceTempView("device_id_old")
      // Contrast group: old users whose device_id ends in 1 (held out from all strategies)
      val clk_count_oldUser_Contrast = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_oldUser_Contrast
           |from data_feed_click jd inner join device_id_old
           |on jd.device_id = device_id_old.decive_id
           |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id regexp'1$$'
           |and jd.device_id not in (select device_id from bl_device_list)
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date ='${param.date}'
        """.stripMargin
      )

      val imp_count_oldUser_Contrast = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_oldUser_Contrast
           |from data_feed_exposure je inner join device_id_old
           |on je.device_id = device_id_old.decive_id
           |where je.cid_type = 'diary'
           |and je.device_id regexp'1$$'
           |and je.device_id not in (select device_id from bl_device_list)
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date ='${param.date}'
        """.stripMargin
      )

      // All old users, regardless of device_id suffix
      val clk_count_oldUser_all = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_oldUser_all
           |from data_feed_click jd inner join device_id_old
           |on jd.device_id = device_id_old.decive_id
           |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id not in (select device_id from bl_device_list)
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date ='${param.date}'
        """.stripMargin
      )

      val imp_count_oldUser_all = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_oldUser_all
           |from data_feed_exposure je inner join device_id_old
           |on je.device_id = device_id_old.decive_id
           |where je.cid_type = 'diary'
           |and je.device_id not in (select device_id from bl_device_list)
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date ='${param.date}'
        """.stripMargin
      )
      // Get device_ids of users covered by the merged strategies
      val device_id_cover = sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from merge_queue_table
        """.stripMargin
      )
      device_id_cover.createOrReplaceTempView("device_id_cover_older")
      val clk_count_oldUser_Cover = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_oldUser_Cover
           |from data_feed_click jd inner join device_id_cover_older
           |on jd.device_id = device_id_cover_older.device_id
           |where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id not in (select device_id from bl_device_list)
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date ='${param.date}'
        """.stripMargin
      )

      val imp_count_oldUser_Cover = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_oldUser_Cover
           |from data_feed_exposure je inner join device_id_cover_older
           |on je.device_id = device_id_cover_older.device_id
           |where je.cid_type = 'diary'
           |and je.device_id not in (select device_id from bl_device_list)
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date ='${param.date}'
        """.stripMargin
      )
      val result = clk_count_oldUser_Contrast.join(imp_count_oldUser_Contrast, "stat_date")
        .join(clk_count_oldUser_all, "stat_date")
        .join(imp_count_oldUser_all, "stat_date")
        .join(clk_count_oldUser_Cover, "stat_date")
        .join(imp_count_oldUser_Cover, "stat_date")
      result.show()

      GmeiConfig.writeToJDBCTable(result, "Recommendation_strategy_temp", SaveMode.Append)
    }
  }
}
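The six joined columns give the day's click and impression counts for the contrast group (device_ids ending in 1), all old users, and the strategy-covered users; the table stores raw counts only. A hedged sketch of deriving the CTRs from `result` before writing it out — the count column names follow the SQL aliases above, while the derived column names are illustrative:

```scala
import org.apache.spark.sql.functions.col

// Sketch: CTR = clicks / impressions for each cohort.
val withCtr = result
  .withColumn("ctr_contrast", col("clk_count_oldUser_Contrast") / col("imp_count_oldUser_Contrast"))
  .withColumn("ctr_all",      col("clk_count_oldUser_all")      / col("imp_count_oldUser_all"))
  .withColumn("ctr_cover",    col("clk_count_oldUser_Cover")    / col("imp_count_oldUser_Cover"))
withCtr.show()
```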
......@@ -40,7 +40,7 @@ object WeafareStat {
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_video")
ti.tidbMapTable(dbName = "eagle",tableName = "diary_video")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "eagle",tableName = "feed_diary_boost")
......
......@@ -60,6 +60,13 @@ object strategy_clk_imp_oldUser {
|select distinct(device_id) as decive_id
|from online.ml_device_day_active_status
|where active_type = '4'
+|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
+| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
+| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
+| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
+| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
+| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
+| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
""".stripMargin
)
......@@ -71,6 +78,13 @@ object strategy_clk_imp_oldUser {
|select distinct(device_id) as decive_id
|from online.ml_device_day_active_status
|where active_type != '4'
+|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
+| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
+| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
+| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
+| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
+| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
+| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
""".stripMargin
)
......
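The same `first_channel_source_type` blacklist is now pasted verbatim into at least four queries across these files. A hedged sketch of factoring it into one shared constant — `ChannelFilter`, `excludedChannels`, and `notInClause` are hypothetical names, not existing repo code:

```scala
package com.gmei

// Hypothetical helper: one source of truth for the excluded acquisition channels.
object ChannelFilter {
  val excludedChannels: Seq[String] = Seq(
    "yqxiu1", "yqxiu2", "yqxiu3", "yqxiu4", "yqxiu5", "mxyc1", "mxyc2", "mxyc3",
    "wanpu", "jinshan", "jx", "maimai", "zhuoyi", "huatian", "suopingjingling",
    // ... remaining channels exactly as listed in the queries above ...
    "promotion_shike", "promotion_julang_jl03")

  // Renders "and first_channel_source_type not in ('a','b',...)" for the SQL strings.
  val notInClause: String =
    excludedChannels.map(c => s"'$c'").mkString(
      "and first_channel_source_type not in (", ",", ")")
}
```

Each query could then interpolate `${ChannelFilter.notInClause}` instead of repeating the list, so a channel added in one place applies everywhere.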
package com.gmei

import java.io.Serializable
import java.io._

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

object temp_analysis {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(1)
      //println(param.date)
      val partition_date = stat_date.replace("-", "")
      // Get device_ids of users covered by the merged strategies
      val device_id_cover = sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from merge_queue_table
        """.stripMargin
      )
      device_id_cover.createOrReplaceTempView("device_id_cover_older")
      // Diary detail page views and distinct viewing devices; dwell time (out - in)
      // of 7200 s (2 h) or more is treated as anomalous and dropped
      val diary_pv = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(params["business_id"]) as diary_pv,count(distinct(cl_id)) as device_num_diary
           |from online.tl_hdfs_maidian_view
           |where action="page_view"
           |and params["page_name"]="diary_detail"
           |and (params["out"]-params["in"])<7200
           |and partition_date ='${partition_date}'
        """.stripMargin
      )

      // Same metrics for welfare (meigou) detail pages
      val meigou_pv = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(params["business_id"]) as meigou_pv,count(distinct(cl_id)) as device_num_meigou
           |from online.tl_hdfs_maidian_view
           |where action="page_view"
           |and params["page_name"]="welfare_detail"
           |and (params["out"]-params["in"])<7200
           |and partition_date ='${partition_date}'
        """.stripMargin
      )

      val result = diary_pv.join(meigou_pv, "stat_date")
      result.show()
      GmeiConfig.writeToJDBCTable(result, "diary_pv", SaveMode.Append)
      // result.select("stat_date","diary_pv","device_num_diary","meigou_pv","device_num_meigou").write.mode(SaveMode.Append).save("/data2/test.txt")
    }
  }
}
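The commented-out save above would not produce a text file: with no format specified, `DataFrameWriter.save` writes Parquet into a directory that merely happens to be named `test.txt`. If a plain-text export is the intent, a CSV write is closer — a sketch, with a hypothetical output path:

```scala
// Sketch: export the daily PV metrics as CSV instead of the default Parquet.
result
  .select("stat_date", "diary_pv", "device_num_diary", "meigou_pv", "device_num_meigou")
  .write
  .mode(SaveMode.Append)
  .option("header", "true")
  .csv("/data2/test_csv")  // hypothetical output directory
```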
......@@ -7,29 +7,28 @@ import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
+import com.gmei.GmeiConfig.{writeToJDBCTable,getMinusNDate}
object testt {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable
case class Params(env: String = "dev"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("testt")
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
-//opt[String] ("date")
-// .text(s"the date you used")
-// .action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
-| spark-submit --class com.gmei.testt ./target/scala-2.11/feededa-assembly-0.1.jar \
+| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
......@@ -48,25 +47,34 @@ object testt {
ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
+val strategies = Seq("3$","4$","5$","6$","7$","8$","c$","d$","e$","A$","B$","C$","D$")
+for (strategy <- strategies){
+println(strategy)
-val get_data_dura = sc.sql(
+val view_count = sc.sql(
s"""
-|select partition_date, sum(params['duration']) as total_dur,count(distinct(cl_id)) as num
+|select params["business_id"] as diary_id,(params["out"]-params["in"]) as dur_time
|from online.tl_hdfs_maidian_view
|where action="on_app_session_over"
|where action="page_view"
|and params["page_name"]="diary_detail"
|and (params["out"]-params["in"])<7200
|and partition_date >='20180901'
""".stripMargin
)
-get_data_dura.show()
+view_count.show()
+view_count.createOrReplaceTempView("temp")
+GmeiConfig.writeToJDBCTable(view_count, "avg", SaveMode.Overwrite)
+val result = view_count
+result.show()
}
}
}
}
}
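After this change, `view_count` holds one row per `diary_detail` page view with its dwell time, yet it is written unaggregated to a table named `avg`. If the intended output is an average view duration per diary, a groupBy like the following would produce it — a sketch of a plausible aggregation, not the committed behavior:

```scala
import org.apache.spark.sql.functions.avg

// Sketch: average dwell time per diary, which the target table name "avg" suggests.
val avgDuration = view_count
  .groupBy("diary_id")
  .agg(avg("dur_time").alias("avg_dur_time"))
avgDuration.show()
```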
# This script reports the CTR of all users after merging the three strategies (ffm, node2vec, search-based recommendation).
# Users whose device_id ends in 1 form the control group.
# Note: the underlying table does not exclude each day's new users.
# -*- coding: UTF-8 -*-
from __future__ import division  # guard against Python 2 integer division in the CTR ratios below
from utils import con_sql, get_yesterday_date, get_between_day
import time
import sys

OUTPUT_PATH = "/data2/models/eda/gray_stat/"

# Exposure count after filtering out institution users and blacklisted users
def get_imp_count_all(strategy, sta_date):
    sql = "select count(cid_id) from jerry_prod.data_feed_exposure where " \
          "cid_type = 'diary' " \
          "and device_id regexp'{}$' " \
          "and device_id not in (select distinct(device_id) from jerry_test.bl_device_list) " \
          "and device_id not in (select device_id from jerry_prod.blacklist) and stat_date='{}'".format(strategy, sta_date)
    imp_count_all = con_sql(sql)[0][0]
    return imp_count_all

# Click count under the same user filters
def get_clk_count_all(strategy, sta_date):
    sql = "select count(cid_id) from jerry_prod.data_feed_click " \
          "where (cid_type = 'diary' or cid_type = 'diary_video') " \
          "and device_id regexp'{}$' " \
          "and device_id not in (select device_id from jerry_test.bl_device_list) " \
          "and device_id not in (select device_id from jerry_prod.blacklist) " \
          "and stat_date='{}'".format(strategy, sta_date)
    clk_count_all = con_sql(sql)[0][0]
    return clk_count_all
if __name__ == '__main__':
    yesterday = get_yesterday_date()
    if len(sys.argv) != 2:
        print("usage: python recommendation_strategy_indicator.py date")
        sys.exit(1)
    date1 = sys.argv[1]
    date_list = get_between_day(date1, yesterday)
    strategy_list = ['[1|2]', '[3|4]', '[5|6]', '[7|8]']
    strategy_l = ['1', '3', '5', '7']
    start_time = time.time()
    for my_date in date_list:
        result1_imp = []
        result2_imp = []
        result1_clk = []
        result2_clk = []
        print("Fetching data for {}".format(my_date))
        for i in strategy_list:
            result1 = get_clk_count_all(i, my_date)
            result1_clk.append(result1)
            result1_all = get_imp_count_all(i, my_date)
            result1_imp.append(result1_all)
        for j in strategy_l:
            # The original called undefined get_clk_count/get_imp_count helpers;
            # the *_all queries apply the same filters, so they cover the
            # single-suffix control groups as well.
            result2 = get_clk_count_all(j, my_date)
            result2_clk.append(result2)
            result2_all = get_imp_count_all(j, my_date)
            result2_imp.append(result2_all)
        # Even-suffix groups are derived by subtracting the control group
        # from the paired-suffix totals.
        num_click_2 = result1_clk[0] - result2_clk[0]
        num_click_4 = result1_clk[1] - result2_clk[1]
        num_click_6 = result1_clk[2] - result2_clk[2]
        num_click_8 = result1_clk[3] - result2_clk[3]
        num_imp_2 = result1_imp[0] - result2_imp[0]
        num_imp_4 = result1_imp[1] - result2_imp[1]
        num_imp_6 = result1_imp[2] - result2_imp[2]
        num_imp_8 = result1_imp[3] - result2_imp[3]
        ctr_12 = round(result1_clk[0] / result1_imp[0], 6)
        ctr_34 = round(result1_clk[1] / result1_imp[1], 6)
        ctr_56 = round(result1_clk[2] / result1_imp[2], 6)
        ctr_78 = round(result1_clk[3] / result1_imp[3], 6)
        ctr_1 = round(result2_clk[0] / result2_imp[0], 6)
        ctr_2 = round(num_click_2 / num_imp_2, 6)
        ctr_3 = round(result2_clk[1] / result2_imp[1], 6)
        ctr_4 = round(num_click_4 / num_imp_4, 6)
        ctr_5 = round(result2_clk[2] / result2_imp[2], 6)
        ctr_6 = round(num_click_6 / num_imp_6, 6)
        ctr_7 = round(result2_clk[3] / result2_imp[3], 6)
        ctr_8 = round(num_click_8 / num_imp_8, 6)
        print(result1_clk[0])
        print(result1_imp[0])
        print(ctr_12)
        print(ctr_34)
        output_path = OUTPUT_PATH + "recommendation.csv"
        values = ([my_date.replace('-', '')] +
                  result1_clk + result1_imp + result2_clk + result2_imp +
                  [num_click_2, num_click_4, num_click_6, num_click_8,
                   num_imp_2, num_imp_4, num_imp_6, num_imp_8,
                   ctr_12, ctr_34, ctr_56, ctr_78,
                   ctr_1, ctr_2, ctr_3, ctr_4, ctr_5, ctr_6, ctr_7, ctr_8])
        with open(output_path, 'a+') as f:
            # One CSV row per day, columns in the same order as the original concatenation
            f.write(','.join(str(v) for v in values) + '\n')
    end_time = time.time()
    print("Elapsed time: {}s".format(end_time - start_time))
\ No newline at end of file
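For each strategy pair the even-suffix group is obtained by subtraction: the `[1|2]` pattern counts both suffixes and `1` counts the control group alone, so `num_click_2 = result1_clk[0] - result2_clk[0]`, `num_imp_2 = result1_imp[0] - result2_imp[0]`, and `ctr_2 = num_click_2 / num_imp_2`. With hypothetical round numbers: 1,200 clicks and 40,000 impressions for `[1|2]` minus 500 clicks and 20,000 impressions for `1` gives `ctr_2 = 700 / 20000 = 0.035`, versus `ctr_1 = 500 / 20000 = 0.025` for the control group.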