package com.gmei
import java.io.Serializable
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
//import org.apache.spark.sql.{Row, SaveMode, SparkSession, TiContext}
import scopt.OptionParser

import com.gmei.lib.AbstractParams

import scala.util.parsing.json.JSON
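
// Daily metrics jobs for the jerry_prod warehouse. Each object below is a
// self-contained spark-submit entry point:
//   temp_count                              -- click/exposure counts (CTR inputs) for old users vs. all users
//   Repeated_content_recommendation         -- share of diary cards exposed repeatedly within one day
//   Repeated_content_recommendation_moreday -- the same repetition rate over a multi-day window
//   GetHiveSearchData                       -- search exposure/click totals per search vertical
//   find_reason                             -- new-vs-old user click and exposure breakdown
// (Summaries above are inferred from the queries below.)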
object temp_count {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
// val ti = new TiContext(sc)
sc.sql("use jerry_prod")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = param.date
//println(param.date)
val partition_date = stat_date.replace("-","")
val device_id_oldUser = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type = '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown')
|and partition_date ='${partition_date}'
""".stripMargin
)
device_id_oldUser.createOrReplaceTempView("device_id_old")
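// Six daily aggregates follow: clicks and exposures for old users, for all
// users, and for the contrast bucket; they are joined on stat_date below into
// one CTR summary row.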
val clk_count_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(jd.cid_id) as clk_count_oldUser
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.device_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id not in (select device_id from blacklist)
|and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
|and jd.stat_date ='${stat_date}'
""".stripMargin
)
val imp_count_oldUser = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.device_id
|where je.cid_type = 'diary'
|and je.device_id not in (select device_id from blacklist)
|and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
|and je.stat_date ='${stat_date}'
""".stripMargin
)
val clk_count_all = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as clk_count_all
|from data_feed_click
|where (cid_type = 'diary' or cid_type = 'diary_video')
|and device_id not in (select device_id from blacklist)
|and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
|and stat_date ='${stat_date}'
""".stripMargin
)
val imp_count_all = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as imp_count_all
|from data_feed_exposure
|where cid_type = 'diary'
|and device_id not in (select device_id from blacklist)
|and city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
|and stat_date ='${stat_date}'
""".stripMargin
)
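// Contrast group: `device_id regexp '1$'` keeps devices whose id ends in '1',
// presumably the experiment's control bucket (an assumption from the naming).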
val clk_count_oldUser_Contrast = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as clk_count_oldUser_Contrast
|from data_feed_click jd inner join device_id_old
|on jd.device_id = device_id_old.device_id
|where (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
|and jd.device_id regexp'1$$'
|and jd.device_id not in (select device_id from blacklist)
|and jd.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
|and jd.stat_date ='${stat_date}'
""".stripMargin
)
val imp_count_oldUser_Contrast = sc.sql(
s"""
|select '${stat_date}' as stat_date, count(cid_id) as imp_count_oldUser_Contrast
|from data_feed_exposure je inner join device_id_old
|on je.device_id = device_id_old.device_id
|where je.cid_type = 'diary'
|and je.device_id regexp'1$$'
|and je.device_id not in (select device_id from blacklist)
|and je.city_id in ("huzhou","liuan","zhenjiang","taizhou","jiaxing","weihai","maanshan","changzhou","shantou","nantong","yantai","wuxi","huhehaote","taiyuan","xining","yinchuan")
|and je.stat_date ='${stat_date}'
""".stripMargin
)
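// All six one-row counts share the same stat_date key, so the joins below
// collapse them into a single daily row for ffm_diary_ctr.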
val result1 = clk_count_oldUser.join(imp_count_oldUser,"stat_date")
.join(clk_count_all,"stat_date")
.join(imp_count_all,"stat_date")
.join(clk_count_oldUser_Contrast,"stat_date")
.join(imp_count_oldUser_Contrast,"stat_date")
result1.show()
// GmeiConfig.writeToJDBCTable(result1, "ffm_diary_ctr", SaveMode.Append)
// GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result1, table="ffm_diary_ctr",SaveMode.Append)
println("开始写入")
// GmeiConfig.writeToJDBCTable("jerry.jdbcuri",result1, table="ffm_diary_ctr",SaveMode.Append)
// println("写入完成")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result1, table="ffm_diary_ctr",SaveMode.Append)
println("写入完成")
}
}
}
object Repeated_content_recommendation {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
// val ti = new TiContext(sc)
sc.sql("use jerry_prod")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")
// val stat_date = GmeiConfig.getMinusNDate(1)
val stat_date = param.date
val partition_date = stat_date.replace("-","")
val agency_id = sc.sql(
s"""
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date >= '20180402'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
|UNION ALL
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date >= '20171101'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
|UNION ALL
|select distinct(device_id)
|from blacklist
""".stripMargin
)
agency_id.createOrReplaceTempView("agency_id")
val device_id_oldUser = sc.sql(
s"""
|select distinct(om.device_id) as device_id
|from online.ml_device_day_active_status om left join agency_id
|on om.device_id = agency_id.device_id
|where om.active_type = '4'
|and om.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown')
|and om.partition_date ='${partition_date}'
|and agency_id.device_id is null
""".stripMargin
)
device_id_oldUser.createOrReplaceTempView("device_id_old")
device_id_oldUser.show()
val device_id_newUser = sc.sql(
s"""
|select distinct(om.device_id) as device_id
|from online.ml_device_day_active_status om left join agency_id
|on om.device_id = agency_id.device_id
|where om.active_type != '4'
|and om.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown')
|and om.partition_date ='${partition_date}'
|and agency_id.device_id is null
""".stripMargin
)
device_id_newUser.createOrReplaceTempView("device_id_new")
device_id_newUser.show()
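// Repetition metric: key each exposure by device|diary id, sum the exposures
// of pairs seen at least twice, and divide by total exposures.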
val exp_diary_new = sc.sql(
s"""
|select concat_ws('|',de.device_id,de.cid_id)
|from data_feed_exposure de inner join device_id_new
|on de.device_id=device_id_new.device_id
|where de.cid_type = 'diary'
|and de.stat_date ='${stat_date}'
""".stripMargin
)
val get_result_new = exp_diary_new.rdd.map((_, 1)).reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
// exposures from (device, diary) pairs seen at least twice, vs. all exposures
val more_than2_new = get_result_new.filter(_._2 >= 2).map(_._2).reduce((x, y) => x + y)
println(more_than2_new)
val all_new = get_result_new.map(_._2).reduce((x, y) => x + y)
println(all_new)
val repeated_rate_new = more_than2_new / all_new.toDouble
println(repeated_rate_new)
val exp_diary_old = sc.sql(
s"""
|select concat_ws('|',de.device_id,de.cid_id)
|from data_feed_exposure de inner join device_id_old
|on de.device_id=device_id_old.device_id
|where de.cid_type = 'diary'
|and de.stat_date ='${stat_date}'
""".stripMargin
)
val get_result_old = exp_diary_old.rdd.map((_, 1)).reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
val more_than2_old = get_result_old.filter(_._2 >= 2).map(_._2).reduce((x, y) => x + y)
println(more_than2_old)
val all_old = get_result_old.map(_._2).reduce((x, y) => x + y)
println(all_old)
val repeated_rate_old = more_than2_old / all_old.toDouble
println(repeated_rate_old)
val result2=List((stat_date,more_than2_old,all_old,more_than2_new,all_new))
val df2 = sc.createDataFrame(result2).toDF("stat_date","old_rep_count","old_imp_all","new_rep_count","new_imp_all")
// GmeiConfig.writeToJDBCTable(df2, table = "Repeated_evaluation_indicator", SaveMode.Append)
println("开始写入")
// GmeiConfig.writeToJDBCTable("jerry.jdbcuri",df2, table="Repeated_evaluation_indicator",SaveMode.Append)
// println("写入完成")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df2, table="Repeated_evaluation_indicator",SaveMode.Append)
println("写入完成")
}
}
}
object Repeated_content_recommendation_moreday {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
// val ti = new TiContext(sc)
sc.sql("use jerry_prod")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = "2019-01-16"
// val partition_date = stat_date.replace("-","")
val now = new Date()
// val stat_date = param.date
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
// window start: 8 days ago (86400000 ms per day)
val date = dateFormat.format(now.getTime - 86400000L * 8)
val yesterday = dateFormat.format(now.getTime - 86400000L)
val exp_diary = sc.sql(
s"""
|select stat_date,device_id,concat_ws(',',collect_set(distinct cid_id)) as exposure_diary
|from data_feed_exposure_precise
|where cid_type = 'diary'
|and stat_date >='${date}'
|and device_id not in (select device_id from blacklist)
|group by device_id,stat_date
""".stripMargin
).rdd.map(row => (row(1).toString, row(2).toString)).groupByKey()
.filter(x => x._2.size > 1)
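// After grouping by device, each value holds one comma-joined diary list per
// active day; devices seen on a single day only are dropped, since they
// cannot contribute cross-day repeats.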
// sample printout for debugging:
// val temp=exp_diary.take(10).foreach(println)
// count repeated diary exposures per device
val count_imp = exp_diary.map(_._2).map(row => row.flatMap(x => x.split(",")).toArray)
.map(ids => (ids.size - ids.distinct.size, ids.size)).collect()
// numerator: duplicated exposures across the window; denominator: all exposures
val repeatedCount = count_imp.map(_._1).reduce((x, y) => x + y)
val totalCount = count_imp.map(_._2).reduce((x, y) => x + y)
val repeated_rate = repeatedCount / totalCount.toDouble
val result = List((yesterday, repeated_rate))
println(result)
val df_result = sc.createDataFrame(result).toDF("stat_date", "repeated_rate")
// GmeiConfig.writeToJDBCTable(df_result, table = "Repeated_content_recommendation_moreday", SaveMode.Append)
// GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="Repeated_content_recommendation_moreday",SaveMode.Append)
println("开始写入")
// GmeiConfig.writeToJDBCTable("jerry.jdbcuri",df_result, table="Repeated_content_recommendation_moreday",SaveMode.Append)
// println("写入完成")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="Repeated_content_recommendation_moreday",SaveMode.Append)
println("写入完成")
// exp_diary.show()
// exp_diary.createOrReplaceTempView("exp_diary")
// GmeiConfig.writeToJDBCTable(df, table = "Repeated_evaluation_indicator_moreday", SaveMode.Append)
}
}
}
object GetHiveSearchData {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
case class GetHiveSearchData_temp(stat_date:String,diaryExposureVal:String,diaryClickNum:String,meigouExposureVal:String,meigouClickNum:String,searchResultExposureVal:String,searchResultClickNum:String,searchDoctorExposureVal:String,searchDoctorClickNum:String,searchHospitalExposureVal:String,searchHospitalClickNum:String)
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
// val ti = new TiContext(sc)
sc.sql("use jerry_prod")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "GetHiveSearchData_CTR")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = param.date
val partition_date = stat_date.replace("-","")
val strDiaryExposureAction = "/api/search/v2/diary"
val strDiaryClickAction = "search_result_more_diary_click_item" // TODO: confirm this action name
val (diaryExposureVal, diaryClickNum, diaryExposureMapCount, diaryExposureFilterCount) = GetSearchResultData(sc, strDiaryExposureAction, strDiaryClickAction, stat_date)
val strMeigouExposureAction = "/api/search/v2/service"
val strMeigouClickAction = "search_result_welfare_click_item"
val (meigouExposureVal, meigouClickNum, meigouExposureMapCount, meigouExposureFilterCount) = GetSearchResultData(sc, strMeigouExposureAction, strMeigouClickAction, stat_date)
val strSearchResultExposureAction = "/api/search/v2/content"
val strSearchResultClickAction = "search_result_click_diary_item" // TODO: confirm this action name
val (searchResultExposureVal, searchResultClickNum, searchResultExposureMapCount, searchResultExposureFilterCount) = GetSearchResultData(sc, strSearchResultExposureAction, strSearchResultClickAction, stat_date)
val strSearchDoctorExposureAction = "/api/search/v2/doctor"
val strSearchDoctorClickAction = "search_result_doctor_click_item"
val (searchDoctorExposureVal, searchDoctorClickNum, searchDoctorExposureMapCount, searchDoctorExposureFilterCount) = GetSearchResultData(sc, strSearchDoctorExposureAction, strSearchDoctorClickAction, stat_date)
val strSearchHospitalExposureAction = "/api/search/v2/hospital"
val strSearchHospitalClickAction = "search_result_hospital_click_item"
val (searchHospitalExposureVal, searchHospitalClickNum, searchHospitalExposureMapCount, searchHospitalExposureFilterCount) = GetSearchResultData(sc, strSearchHospitalExposureAction, strSearchHospitalClickAction, stat_date)
val jigou_id = sc.sql(
s"""
|SELECT cl_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date>='20180402' AND partition_date<'${partition_date}'
|AND pv_ratio>=0.95
|UNION ALL
|SELECT cl_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date>='20171101' AND partition_date<'${partition_date}'
|AND pv_ratio>=0.95
|UNION ALL
|select device_id as cl_id from blacklist
""".stripMargin
)
jigou_id.createOrReplaceTempView("jigou_id")
val diary_clickSql = sc.sql(
s"""
|select
|count(1) click_num
|from online.tl_hdfs_maidian_view ov left join jigou_id
|on ov.cl_id = jigou_id.cl_id
|where ov.partition_date='${partition_date}'
|and ov.action='on_click_diary_card'
|and ov.params['page_name']='search_result_diary'
|and jigou_id.cl_id is null
""".stripMargin
)
val diary_clickArray = diary_clickSql.collect()
val diary_click_num = diary_clickArray(0).getAs[Long]("click_num")
val content_diary_clickSql = sc.sql(
s"""
|select
|count(1) click_num
|from online.tl_hdfs_maidian_view ov left join jigou_id
|on ov.cl_id = jigou_id.cl_id
|where ov.partition_date='${partition_date}'
|and ov.action='on_click_diary_card'
|and ov.params['page_name']='search_result_more'
|and jigou_id.cl_id is null
""".stripMargin
)
val content_diary_clickArray:Array[Row] = content_diary_clickSql.collect()
val content_diary_click_num:Long = content_diary_clickArray(0).getAs[Long]("click_num")
println("searchDiaryExposureVal:" + diaryExposureVal + "\tsearchDiaryClickNum:" + diary_click_num + "\tclickRate:" + (diary_click_num.floatValue()/diaryExposureVal.floatValue()).formatted("%.2f"))
println("searchMeigouExposureVal:" + meigouExposureVal + "\tsearchMeigouClickNum:" + meigouClickNum + "\tclickRate:" + (meigouClickNum.floatValue()/meigouExposureVal.floatValue()).formatted("%.2f"))
println("searchResultExposureVal:" + searchResultExposureVal + "\tsearchResultClickNum:" + (searchResultClickNum+content_diary_click_num) + "\tclickRate:" + ((searchResultClickNum+content_diary_click_num).floatValue()/searchResultExposureVal.floatValue()).formatted("%.2f"))
println("searchDoctorExposureVal:" + searchDoctorExposureVal + "\tsearchDoctorClickNum:" + searchDoctorClickNum + "\tclickRate:" + (searchDoctorClickNum.floatValue()/searchDoctorExposureVal.floatValue()).formatted("%.2f"))
println("searchHospitalExposureVal:" + searchHospitalExposureVal + "\tsearchHospitalClickNum:" + searchHospitalClickNum + "\tclickRate:" + (searchHospitalClickNum.floatValue()/searchHospitalExposureVal.floatValue()).formatted("%.2f"))
// val add_data = sc.sql(
// s"""
// |insert into table GetHiveSearchData_CTR(stat_date,diaryExposureVal,diaryClickNum,meigouExposureVal,meigouClickNum,searchResultExposureVal,searchResultClickNum,searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum)
// |values ('${stat_date}','${diaryExposureVal}','${(diary_click_num+diaryClickNum)}','${meigouExposureVal}','${meigouClickNum}','${searchResultExposureVal}','${(searchResultClickNum+content_diary_click_num)}','${searchDoctorExposureVal}','${searchDoctorClickNum}','${searchHospitalExposureVal}','${searchHospitalClickNum}')
// """.stripMargin
// )
import sc.implicits._
val result=List((stat_date,diaryExposureVal,(diary_click_num+diaryClickNum),meigouExposureVal,meigouClickNum,searchResultExposureVal,(searchResultClickNum+content_diary_click_num),searchDoctorExposureVal,searchDoctorClickNum,searchHospitalExposureVal,searchHospitalClickNum))
// val df_result = sc.createDataFrame(result).map(x=>GetHiveSearchData_temp(x(0).toString,x(1).toString,x(2).toString,x(3).toString,x(4).toString,x(5).toString,x(6).toString,x(7).toString,x(8).toString,x(9).toString,x(10).toString)).toDF()
val df_result = sc.sparkContext.parallelize(result).map(x=>(x._1,x._2,x._3,x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11)).toDF("stat_date","diaryExposureVal","diaryClickNum","meigouExposureVal","meigouClickNum","searchResultExposureVal","searchResultClickNum","searchDoctorExposureVal",
"searchDoctorClickNum","searchHospitalExposureVal","searchHospitalClickNum")
//
// GmeiConfig.writeToJDBCTable(df_result, table = "GetHiveSearchData_CTR", SaveMode.Append)
// GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="GetHiveSearchData_CTR",SaveMode.Append)
println("开始写入")
// GmeiConfig.writeToJDBCTable("jerry.jdbcuri",df_result, table="GetHiveSearchData_CTR",SaveMode.Append)
// println("写入完成")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",df_result, table="GetHiveSearchData_CTR",SaveMode.Append)
println("写入完成")
}
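/**
 * Counts exposures and clicks for one search vertical on stat_date.
 * Exposures are summed with a long accumulator from the JSON "exposure_data"
 * payload of backend view logs; clicks are counted from maidian click logs.
 * Returns (exposureTotal, clickTotal, mappedRowCount, rowsWithExposureData);
 * the last two are diagnostic counts, and these names are descriptive rather
 * than from the original source. Example call, as used above for diaries:
 *   GetSearchResultData(sc, "/api/search/v2/diary",
 *                       "search_result_more_diary_click_item", stat_date)
 */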
def GetSearchResultData(spark: SparkSession, strExposureAction:String, strClickAction:String,stat_date:String) = {
val partition_date = stat_date.replace("-","")
val exposureAccum = spark.sparkContext.longAccumulator("search exposure data")
val jigou_id = spark.sql(
s"""
|SELECT cl_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date>='20180402' AND partition_date<'${partition_date}'
|AND pv_ratio>=0.95
|UNION ALL
|SELECT cl_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date>='20171101' AND partition_date<'${partition_date}'
|AND pv_ratio>=0.95
|UNION ALL
|select device_id as cl_id from blacklist
""".stripMargin
)
jigou_id.createOrReplaceTempView("jigou_id")
val exposureSql = spark.sql(
s"""
|select action,user_id,city_id,app
|from online.tl_hdfs_backend_view ov left join jigou_id
|on ov.cl_id = jigou_id.cl_id
|where ov.action='$strExposureAction'
|and ov.partition_date='${partition_date}'
|and jigou_id.cl_id is null
""".stripMargin
)
val exposureMapResult = exposureSql.rdd.map(row => {
// "app" is a map-typed column whose "exposure_data" entry holds a JSON list;
// parse failures would throw here, as in the original code
val rowAppFieldMap: Map[String, Any] = row.getAs[Map[String, Any]]("app")
if (rowAppFieldMap != null && rowAppFieldMap.contains("exposure_data")) {
val exposure_data_lists: List[Any] = JSON.parseFull(rowAppFieldMap("exposure_data").toString).get.asInstanceOf[List[Any]]
if (exposure_data_lists.nonEmpty) {
exposure_data_lists.foreach(exposure_item => {
if (exposure_item != None && exposure_item.toString.nonEmpty) {
val exposureItemMap: Map[String, Any] = exposure_item.asInstanceOf[Map[String, Any]]
if (exposureItemMap.contains("list_ids")) {
// each exposure item lists the ids shown on one result page
val exposure_list_ids: List[Any] = exposureItemMap("list_ids").asInstanceOf[List[Any]]
exposureAccum.add(exposure_list_ids.length)
}
}
})
exposure_data_lists
} else {
None
}
} else {
// rows without exposure_data yield None and are dropped by the filter below
None
}
})
// cache so the map (and its accumulator updates) runs only once across the two counts below
exposureMapResult.cache()
val exposureFilterResult = exposureMapResult.filter(_ != None)
//val exposureArray:Array[Any] = exposureFilterResult.collect()
//exposureArray.foreach(item => println(item.toString))
val exposureMapCount: Long = exposureMapResult.count()
val exposureFilterCount: Long = exposureFilterResult.count()
val clickSql = spark.sql(
s"""
|select
|count(1) click_num
|from online.tl_hdfs_maidian_view ov left join jigou_id
|on ov.cl_id = jigou_id.cl_id
|where ov.partition_date='${partition_date}'
|and ov.action='$strClickAction'
|and jigou_id.cl_id is null
""".stripMargin
)
val clickArray:Array[Row] = clickSql.collect()
val click_num:Long = clickArray(0).getAs[Long]("click_num")
(exposureAccum.value,click_num,exposureMapCount,exposureFilterCount)
}
}
}
object find_reason {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
// val ti = new TiContext(sc)
sc.sql("use jerry_prod")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure_precise")
// val stat_date = GmeiConfig.getMinusNDate(1)
val stat_date=param.date
val partition_date = stat_date.replace("-","")
// agency/institution device ids (suspected spam)
val blacklist = sc.sql(
s"""
|select device_id from blacklist
""".stripMargin
)
blacklist.createOrReplaceTempView("blacklist")
val agency_id = sc.sql(
s"""
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date >= '20180402'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
|UNION ALL
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date >= '20171101'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
""".stripMargin
)
// agency_id.show()
agency_id.createOrReplaceTempView("agency_id")
// daily new users
val device_id_newUser = sc.sql(
s"""
|select distinct(os.device_id) as device_id
|from online.ml_device_day_active_status os left join blacklist
|on os.device_id = blacklist.device_id
|where os.active_type != '4'
|and os.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown')
|and os.partition_date ='${partition_date}'
|and blacklist.device_id is null
""".stripMargin
)
// device_id_newUser.show()
device_id_newUser.createOrReplaceTempView("device_id_new")
// daily old (returning) users
val device_id_oldUser = sc.sql(
s"""
|select distinct(os.device_id) as device_id
|from online.ml_device_day_active_status os left join blacklist
|on os.device_id=blacklist.device_id
|where os.active_type = '4'
|and os.first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03','','unknown')
|and os.partition_date ='${partition_date}'
|and blacklist.device_id is null
""".stripMargin
)
// device_id_oldUser.show()
device_id_oldUser.createOrReplaceTempView("device_id_old")
val all_clk = sc.sql(
s"""
|select ov.cl_id as device_id
|from online.tl_hdfs_maidian_view ov left join agency_id
|on ov.cl_id = agency_id.device_id
|where ov.action = 'on_click_diary_card'
|and ov.cl_id != "NULL"
|and ov.params['tab_name'] = '精选'
|and ov.params['page_name'] = 'home'
|and ov.partition_date='${partition_date}'
|and agency_id.device_id is null
""".stripMargin
)
// all_clk.show()
all_clk.createOrReplaceTempView("all_clk_diary_card")
// 1. number of old users with clicks that day
val old_clk_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as old_clk_count
|from all_clk_diary_card oc inner join device_id_old
|on oc.device_id = device_id_old.device_id
""".stripMargin
)
// old_clk_count.show()
// 1.1 old users with clicks
val old_clk_device = sc.sql(
s"""
|select distinct(oc.device_id) as device_id
|from all_clk_diary_card oc inner join device_id_old
|on oc.device_id = device_id_old.device_id
""".stripMargin
)
old_clk_device.createOrReplaceTempView("old_clk_device")
// 1.2 old users without clicks
val old_noclk_device = sc.sql(
s"""
|select device_id
|from device_id_old
|except
|select device_id
|from old_clk_device
""".stripMargin
)
old_noclk_device.show()
// 2. number of new users with clicks that day
val new_clk_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(oc.device_id)) as new_clk_count
|from all_clk_diary_card oc inner join device_id_new
|on oc.device_id = device_id_new.device_id
""".stripMargin
)
// 2.1 new users with clicks
val new_clk_device = sc.sql(
s"""
|select distinct(oc.device_id) as device_id
|from all_clk_diary_card oc inner join device_id_new
|on oc.device_id = device_id_new.device_id
""".stripMargin
)
new_clk_device.createOrReplaceTempView("new_clk_device")
// 3. number of old users that day
val old_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(dio.device_id)) as old_count
|from device_id_old dio left join agency_id
|on dio.device_id = agency_id.device_id
|where agency_id.device_id is null
""".stripMargin
)
// 4. number of new users that day
val new_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(distinct(din.device_id)) as new_count
|from device_id_new din left join agency_id
|on din.device_id = agency_id.device_id
|where agency_id.device_id is null
""".stripMargin
)
// 5. exposure count of old users with clicks
val exp_clkold_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(dp.device_id) as imp_clkold_count
|from data_feed_exposure_precise dp inner join old_clk_device
|on dp.device_id = old_clk_device.device_id
|where stat_date='${stat_date}'
|group by stat_date
""".stripMargin
)
// 6. exposure count of new users with clicks
val exp_clknew_count = sc.sql(
s"""
|select '${stat_date}' as stat_date,count(dp.device_id) as imp_clknew_count
|from data_feed_exposure_precise dp inner join new_clk_device
|on dp.device_id = new_clk_device.device_id
|where stat_date='${stat_date}'
|group by stat_date
""".stripMargin
)
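// The six cohort metrics share stat_date, so the joins below produce one row
// per day for device_clk_imp_reason.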
val result = old_clk_count.join(new_clk_count,"stat_date")
.join(old_count,"stat_date")
.join(new_count,"stat_date")
.join(exp_clkold_count,"stat_date")
.join(exp_clknew_count,"stat_date")
// GmeiConfig.writeToJDBCTable(result, "device_clk_imp_reason", SaveMode.Append)
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",result, table="device_clk_imp_reason",SaveMode.Append)
println("写入完成")
}
}
}