package com.gmei

import java.io.Serializable

import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

object WeafareStat {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
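      // map the TiDB tables this job reads into Spark's catalog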
      ti.tidbMapTable(dbName = "eagle", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")

      import sc.implicits._
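
      // yesterday's date; stat_date keeps the dashed form, partition_date (below) the compact one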
      val stat_date = GmeiConfig.getMinusNDate(1)
      println(stat_date)
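
      // distinct diary ids clicked yesterday that have an associated video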
      val video_cids = sc.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id in (select cid from diary_video where stat_date = '${stat_date}')
           |and stat_date = '${stat_date}'
         """.stripMargin
      )
      // video_cids.show()
      video_cids.createOrReplaceTempView("tmp1")
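
      // distinct diary ids clicked yesterday with no associated video (text-only diaries)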
      val txt_cids = sc.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id not in (select cid from diary_video where stat_date = '${stat_date}')
           |and stat_date = '${stat_date}'
         """.stripMargin
      )
      // txt_cids.show()
      txt_cids.createOrReplaceTempView("tmp2")

      val partition_date = stat_date.replace("-", "")
      println(partition_date)
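
      // welfare (meigou) detail views reached from video diaries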
      val video_meigou_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as video_meigou_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp1
           |on pv.referrer_id = tmp1.cid_id
           |where pv.partition_date = '${partition_date}'
           |and pv.page_name = 'welfare_detail'
           |and pv.referrer = 'diary_detail'
         """.stripMargin
      )
      // video_meigou_count.show()
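
      // welfare detail views reached from text-only diaries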
      val txt_meigou_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as txt_meigou_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp2
           |on pv.referrer_id = tmp2.cid_id
           |where pv.partition_date = '${partition_date}'
           |and pv.page_name = 'welfare_detail'
           |and pv.referrer = 'diary_detail'
         """.stripMargin
      )
      // txt_meigou_count.show()
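
      // total clicks on video diaries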
      val video_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as video_clk_count
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id in (select cid from diary_video where stat_date = '${stat_date}')
           |and stat_date = '${stat_date}'
         """.stripMargin
      )
      // video_clk_count.show()
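
      // total clicks on text-only diaries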
      val txt_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as txt_clk_count
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id not in (select cid from diary_video where stat_date = '${stat_date}')
           |and stat_date = '${stat_date}'
         """.stripMargin
      )
      // txt_clk_count.show()
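
      // number of distinct video diaries for the day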
      val video_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(distinct(cid)) as video_count
           |from diary_video where stat_date = '${stat_date}'
         """.stripMargin
      )
      // video_count.show()
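
      // welfare detail views reached from boosted (vlog) diaries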
      val vlog_meigou_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as vlog_meigou_clk_num
           |from online.bl_hdfs_page_view_updates
           |where partition_date = '${partition_date}'
           |and page_name = 'welfare_detail'
           |and referrer = 'diary_detail'
           |and referrer_id in (select distinct(diary_id) from feed_diary_boost)
         """.stripMargin
      )
      // vlog_meigou_clk_count.show()
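
      // clicks on boosted (vlog) diaries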
      val vlog_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as vlog_clk_num
           |from data_feed_click
           |where stat_date = '${stat_date}'
           |and cid_type = 'diary'
           |and cid_id in (select distinct(diary_id) from feed_diary_boost)
         """.stripMargin
      )
      // vlog_clk_count.show()

      // Diary-to-meigou conversion
      // 1. number of diary -> meigou conversions
      val diary_meigou_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as diary_meigou_count
           |from online.bl_hdfs_page_view_updates
           |where partition_date = '${partition_date}'
           |and page_name = 'welfare_detail'
           |and referrer = 'diary_detail'
         """.stripMargin
      )

      // 2. number of diary clicks
      val diary_clk = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cl_id) as diary_clk
           |from online.tl_hdfs_maidian_view
           |where action = 'on_click_diary_card'
           |and cl_id != "NULL"
           |and partition_date = '${partition_date}'
         """.stripMargin
      )

      // 3. number of diary exposures
      val diary_expoure = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cl_id) as diary_expoure
           |from online.ml_community_exposure_detail
           |where business_type = "diary"
           |and cl_id != "NULL"
           |and partition_date = '${partition_date}'
         """.stripMargin
      )
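
      // join all counters into a single row keyed on stat_date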
      val result = video_clk_count.join(video_meigou_count, "stat_date")
        .join(txt_clk_count, "stat_date")
        .join(txt_meigou_count, "stat_date")
        .join(video_count, "stat_date")
        .join(vlog_meigou_clk_count, "stat_date")
        .join(vlog_clk_count, "stat_date")
        .join(diary_meigou_count, "stat_date")
        .join(diary_clk, "stat_date")
        .join(diary_expoure, "stat_date")
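
      // derive the conversion rates from the joined counters
      // (the "diary_expoure" spelling is kept as-is: the frame is appended to an existing JDBC table)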
      val result5 = result
        .withColumn("video_meigou_rate", result.col("video_meigou_count") / result.col("video_clk_count"))
        .withColumn("txt_meigou_rate", result.col("txt_meigou_count") / result.col("txt_clk_count"))
        .withColumn("vlog_meigou_rate", result.col("vlog_meigou_clk_num") / result.col("vlog_clk_num"))
        .withColumn("diary_meigou_rate", result.col("diary_meigou_count") / result.col("diary_clk"))
        .withColumn("diary_expoure_meigou_rate", result.col("diary_meigou_count") / result.col("diary_expoure"))
      result5.show()
      GmeiConfig.writeToJDBCTable(result5, "diary_meigou_cvr", SaveMode.Append)

      sc.stop()
    }
  }
}

object NdDataInput {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("NdDataInput")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note("winter is coming")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "nd_data_meigou_cid")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")

      val date8 = GmeiConfig.getMinusNDate(70)
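
      // service_id and cid are stored pipe-delimited; keep the second field of each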
      val result00 = sc.sql(
        s"""
           |SELECT
           |  split(service_id, '\\\\|')[1] as sid, split(cid, '\\\\|')[1] as cid
           |FROM nd_data_meigou_cid
           |where stat_date > '${date8}'
         """.stripMargin
      )
      result00.createOrReplaceTempView("tmp1")
      result00.show()
      println(result00.count())

      val yesterday = GmeiConfig.getMinusNDate(1).replace("-", "")
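
      // attach each diary's tag and its level-1 category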
      val result01 = sc.sql(
        s"""
           |select a.sid as sid, a.cid as cid, b.tag_id as ctag_id, c.level1_id as clevel1_id
           |from tmp1 a
           |left join online.tl_hdfs_diary_tags_view b on a.cid = b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id = c.id
           |where b.partition_date = '${yesterday}'
           |and c.partition_date = '${yesterday}'
         """.stripMargin
      )
      result01.createOrReplaceTempView("tmp2")
      result01.show()
      println(result01.count())
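
      // attach each service's tag and its level-1 category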
      val result02 = sc.sql(
        s"""
           |select a.sid as sid, a.cid as cid, a.ctag_id as ctag_id, a.clevel1_id as clevel1_id,
           |  b.tag_id as stag_id, c.level1_id as slevel1_id
           |from tmp2 a
           |left join online.tl_meigou_servicetag_view b on a.sid = b.service_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id = c.id
           |where b.partition_date = '${yesterday}'
           |and c.partition_date = '${yesterday}'
         """.stripMargin
      )
      result02.createOrReplaceTempView("tmp3")
      result02.show()
      println(result02.count())
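
      // keep (service, diary) pairs whose level-1 categories match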
      val tidb_input = sc.sql(
        s"""
           |select sid as service_id, cid
           |from tmp3
           |where clevel1_id = slevel1_id
         """.stripMargin
      )
      tidb_input.show()
      println(tidb_input.count())
    }
  }
}

object ServiceStat {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("ServiceStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note("winter is coming")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "nd_data_meigou_cid")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")
      val result00 = sc.sql(
        s"""
           |select a.cl_id as device_id,
           |COALESCE(a.params['diary_id'], a.params['business_id'], 0) as diary_id,
           |c.level1_id as level1_id
           |from online.tl_hdfs_maidian_view a
           |left join online.tl_hdfs_diary_tags_view b on COALESCE(a.params['diary_id'], a.params['business_id'], 0) = b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id = c.id
           |where a.partition_date > "20181112"
           |and a.action = "on_click_diary_card"
           |and a.params["page_name"] = "home"
           |and a.cl_id != "NULL"
           |and b.partition_date = "20181119"
           |and c.partition_date = "20181119"
         """.stripMargin
      )
      result00.collect.foreach(println)
    }
  }
}