Commit 54e81fa9 authored by 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

修改了sql语句
parents dec44e54 e63b1501
# Backfill driver: submits the strategy_clk_imp_oldUser Spark job once per day
# for every day in the half-open range [startdate, enddate).
#
# Usage: <script> <startdate> <enddate>
#   startdate  first day to process (any format accepted by `date -d`)
#   enddate    day to stop at; this day itself is NOT processed
#
# Requires bash (for [[ ]]) and a `date` that supports -d (GNU date).
if [[ $# -ne 2 ]]; then
echo 'Usage:'"$0"' <startdate> <enddate>' >&2
# A usage error must be visible to callers, so exit non-zero.
exit 1
fi
# Normalise both arguments to YYYY-MM-DD. Abort if either cannot be parsed;
# otherwise the loop below would run with an empty date.
startdate=$(date -d "$1" +%Y-%m-%d) || exit 1
enddate=$(date -d "$2" +%Y-%m-%d) || exit 1
# Zero-padded ISO dates sort lexicographically, so the string comparison in
# [[ ]] doubles as a date comparison. Note the mandatory space before ']]'.
while [[ "$startdate" < "$enddate" ]]
do
/opt/spark/bin/spark-submit \
  --master spark://10.31.242.83:7077 \
  --total-executor-cores 20 \
  --executor-memory 3g \
  --executor-cores 2 \
  --driver-memory 8g \
  --conf spark.default.parallelism=200 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \
  --class com.gmei.jerry.strategy_clk_imp_oldUser \
  /srv/apps/ffm-baseline/eda/feededa/target/scala-2.11/feededa-assembly-0.1.jar \
  --env prod --date "$startdate" >>ctr1.log
# Advance to the next calendar day.
startdate=$(date -d "+1 day $startdate" +%Y-%m-%d)
done
......@@ -42,6 +42,7 @@ object WeafareStat {
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_video")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "eagle",tableName = "feed_diary_boost")
import sc.implicits._
......@@ -127,15 +128,44 @@ object WeafareStat {
)
video_count.show()
// Counts the rows of online.bl_hdfs_page_view_updates in partition
// `partition_date` whose page_name is 'welfare_detail', whose referrer is
// 'diary_detail', and whose referrer_id appears among the diary_id values of
// feed_diary_boost. The result carries two columns: the literal `stat_date`
// and the count as `vlog_meigou_clk_num`.
val vlog_meigou_clk_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(page_name) as vlog_meigou_clk_num
|from online.bl_hdfs_page_view_updates
|where partition_date='${partition_date}'
|and page_name='welfare_detail'
|and referrer='diary_detail'
|and referrer_id in (select distinct(diary_id) from feed_diary_boost)
""".stripMargin
)
// Print the computed count.
vlog_meigou_clk_count.show()
// Counts the rows of data_feed_click for `stat_date` whose cid_type is 'diary'
// and whose cid_id appears among the diary_id values of feed_diary_boost.
// The result carries two columns: the literal `stat_date` and the count as
// `vlog_clk_num`.
val vlog_clk_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(cid_id) as vlog_clk_num
|from data_feed_click
|where stat_date='${stat_date}'
|and cid_type = 'diary'
|and cid_id in (select distinct(diary_id) from feed_diary_boost)
""".stripMargin
)
// Print the computed count.
vlog_clk_count.show()
val result = video_clk_count.join(video_meigou_count,"stat_date")
.join(txt_clk_count,"stat_date")
.join(txt_meigou_count,"stat_date")
.join(video_count,"stat_date")
.join(vlog_meigou_clk_count,"stat_date")
.join(vlog_clk_count,"stat_date")
val result1 = result.withColumn("video_rate",result.col("video_meigou_count")/result.col("video_clk_count"))
val result2 = result1.withColumn("txt_rate",result.col("txt_meigou_count")/result.col("txt_clk_count"))
val result1 = result.withColumn("video_meigou_rate",result.col("video_meigou_count")/result.col("video_clk_count"))
val result2 = result1.withColumn("txt_meigou_rate",result.col("txt_meigou_count")/result.col("txt_clk_count"))
val result3 = result2.withColumn("vlog_meigou_rate",result.col("vlog_meigou_clk_num")/result.col("vlog_clk_num"))
result2.show()
result3.show()
sc.stop()
......
......@@ -13,7 +13,8 @@ object strategy_clk_imp_oldUser {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev"
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
......@@ -23,6 +24,9 @@ object strategy_clk_imp_oldUser {
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
......@@ -48,9 +52,9 @@ object strategy_clk_imp_oldUser {
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1)
println(stat_date)
val partition_date = stat_date.replace("-","")
// val stat_date = GmeiConfig.getMinusNDate(1)
println(param.date)
val partition_date = param.date.replace("-","")
val decive_id_oldUser = sc.sql(
s"""
|select distinct(device_id) as decive_id
......@@ -77,57 +81,54 @@ object strategy_clk_imp_oldUser {
for (strategy <- strategies){
val clk_count_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as get_clk_count_old
|select '${param.date}' as stat_date, count(cid_id) as get_clk_count_old
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.decive_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'${strategy}'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${stat_date}'
|and jd.stat_date ='${param.date}'
""".stripMargin
)
clk_count_oldUser.show()
val imp_count_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as get_imp_count_old
|select '${param.date}' as stat_date, count(cid_id) as get_imp_count_old
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.decive_id
|where je.cid_type = 'diary'
|and je.device_id regexp'${strategy}'
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${stat_date}'
|and je.stat_date ='${param.date}'
""".stripMargin
)
imp_count_oldUser.show()
val clk_count_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as get_clk_count_newUser
|select '${param.date}' as stat_date, count(cid_id) as get_clk_count_newUser
|from data_feed_click jd inner join device_id_newUser
|on jd.device_id = device_id_newUser.decive_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'${strategy}'
|and jd.device_id not in (select device_id from bl_device_list)
|and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${stat_date}'
|and jd.stat_date ='${param.date}'
""".stripMargin
)
clk_count_newUser.show()
val imp_count_newUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as get_imp_count_newUser
|select '${param.date}' as stat_date, count(cid_id) as get_imp_count_newUser
|from data_feed_exposure je inner join device_id_newUser
|on je.device_id = device_id_newUser.decive_id
|where je.cid_type = 'diary'
|and je.device_id regexp'${strategy}'
|and je.device_id not in (select device_id from bl_device_list)
|and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${stat_date}'
|and je.stat_date ='${param.date}'
""".stripMargin
)
imp_count_newUser.show()
......
package com.gmei
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.mutable.ArrayBuffer
/** Small utility that enumerates calendar days between two dates. */
object testt {

  /** Prints the first day of the default range, then every day of the range. */
  def main(args: Array[String]): Unit = {
    val days = get_date()
    // headOption keeps this safe should the range ever be empty.
    days.headOption.foreach(println)
    days.foreach(println)
  }

  /** Lists every calendar day from `startTime` to `endTime`, both inclusive.
    *
    * @param startTime first day, formatted as `yyyy-MM-dd`
    * @param endTime   last day, formatted as `yyyy-MM-dd`
    * @return the days in ascending order, each formatted as `yyyy-MM-dd`;
    *         empty when `startTime` is after `endTime`
    * @throws java.text.ParseException if either argument is not a valid
    *                                  `yyyy-MM-dd` date
    */
  def get_date(startTime: String = "2017-12-01",
               endTime: String = "2017-12-10"): ArrayBuffer[String] = {
    // All arithmetic is done in UTC: in a zone whose DST transition removes
    // midnight, Calendar.add shifts the cursor to a later wall-clock hour and
    // the final day would then compare as "after" endDate and be dropped.
    val utc = java.util.TimeZone.getTimeZone("UTC")
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    dateFormat.setTimeZone(utc)
    // Reject impossible dates such as 2017-13-40 instead of silently rolling over.
    dateFormat.setLenient(false)

    val endDate = dateFormat.parse(endTime)
    val cursor = Calendar.getInstance(utc)
    cursor.setTime(dateFormat.parse(startTime))

    val days = ArrayBuffer.empty[String]
    while (!cursor.getTime.after(endDate)) {
      days += dateFormat.format(cursor.getTime)
      cursor.add(Calendar.DAY_OF_MONTH, 1)
    }
    days
  }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment