package com.gmei

import java.io.Serializable

import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

object WeafareStat {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(1)
      println(stat_date)

      // Diary cards that have a video, clicked on stat_date
      val video_cids = sc.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id in (select cid from diary_video where stat_date='${stat_date}')
           |and stat_date ='${stat_date}'
         """.stripMargin
      )
      // video_cids.show()
      video_cids.createOrReplaceTempView("tmp1")

      // Diary cards without a video, clicked on stat_date
      val txt_cids = sc.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id not in (select cid from diary_video where stat_date='${stat_date}')
           |and stat_date ='${stat_date}'
         """.stripMargin
      )
      // txt_cids.show()
      txt_cids.createOrReplaceTempView("tmp2")

      val partition_date = stat_date.replace("-", "")
      println(partition_date)

      // Welfare-detail views reached from video diaries
      val video_meigou_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as video_meigou_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp1
           |on pv.referrer_id = tmp1.cid_id
           |where pv.partition_date = '${partition_date}'
           |and pv.page_name='welfare_detail'
           |and pv.referrer='diary_detail'
         """.stripMargin
      )
      // video_meigou_count.show()

      // Welfare-detail views reached from text-only diaries
      val txt_meigou_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as txt_meigou_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp2
           |on pv.referrer_id = tmp2.cid_id
           |where pv.partition_date = '${partition_date}'
           |and pv.page_name='welfare_detail'
           |and pv.referrer='diary_detail'
         """.stripMargin
      )
      // txt_meigou_count.show()

      // Clicks on video diaries
      val video_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as video_clk_count
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id in (select cid from diary_video where stat_date='${stat_date}')
           |and stat_date='${stat_date}'
         """.stripMargin
      )
      // video_clk_count.show()

      // Clicks on text-only diaries
      val txt_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(cid_id) as txt_clk_count
           |from data_feed_click
           |where cid_type = 'diary'
           |and cid_id not in (select cid from diary_video where stat_date='${stat_date}')
           |and stat_date='${stat_date}'
         """.stripMargin
      )
      // txt_clk_count.show()

      // Number of distinct video diaries on stat_date
      val video_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(distinct(cid)) as video_count
           |from diary_video where stat_date='${stat_date}'
         """.stripMargin
      )
      // video_count.show()
      // Welfare-detail views reached from boosted (vlog) diaries
      val vlog_meigou_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(page_name) as vlog_meigou_clk_num
           |from online.bl_hdfs_page_view_updates
           |where partition_date='${partition_date}'
           |and page_name='welfare_detail'
           |and referrer='diary_detail'
           |and referrer_id in (select distinct(diary_id) from feed_diary_boost)
         """.stripMargin
      )
      // vlog_meigou_clk_count.show()

      // Clicks on boosted (vlog) diaries
      val vlog_clk_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(cid_id) as vlog_clk_num
           |from data_feed_click
           |where stat_date='${stat_date}'
           |and cid_type = 'diary'
           |and cid_id in (select distinct(diary_id) from feed_diary_boost)
         """.stripMargin
      )
      // vlog_clk_count.show()

      // Diary-to-welfare (meigou) conversion
      // 1. Number of jumps from diary detail to welfare detail
      val diary_meigou_count = sc.sql(
        s"""
           |select '${stat_date}' as stat_date, count(page_name) as diary_meigou_count
           |from online.bl_hdfs_page_view_updates
           |where partition_date = '${partition_date}'
           |and page_name='welfare_detail'
           |and referrer='diary_detail'
         """.stripMargin
      )

      // 2. Number of diary card clicks
      val diary_clk = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(cl_id) as diary_clk
           |from online.tl_hdfs_maidian_view
           |where action = 'on_click_diary_card'
           |and cl_id != "NULL"
           |and partition_date='${partition_date}'
         """.stripMargin
      )

      // 3. Number of diary exposures
      val diary_expoure = sc.sql(
        s"""
           |select '${stat_date}' as stat_date,count(cl_id) as diary_expoure
           |from online.ml_community_exposure_detail
           |where business_type = "diary"
           |and cl_id != "NULL"
           |and partition_date='${partition_date}'
         """.stripMargin
      )

      val result = video_clk_count.join(video_meigou_count, "stat_date")
        .join(txt_clk_count, "stat_date")
        .join(txt_meigou_count, "stat_date")
        .join(video_count, "stat_date")
        .join(vlog_meigou_clk_count, "stat_date")
        .join(vlog_clk_count, "stat_date")
        .join(diary_meigou_count, "stat_date")
        .join(diary_clk, "stat_date")
        .join(diary_expoure, "stat_date")

      // Conversion rates: welfare-detail views per click (or per exposure) for each traffic slice
      val result1 = result.withColumn("video_meigou_rate", result.col("video_meigou_count") / result.col("video_clk_count"))
      val result2 = result1.withColumn("txt_meigou_rate", result.col("txt_meigou_count") / result.col("txt_clk_count"))
      val result3 = result2.withColumn("vlog_meigou_rate", result.col("vlog_meigou_clk_num") / result.col("vlog_clk_num"))
      val result4 = result3.withColumn("diary_meigou_rate", result.col("diary_meigou_count") / result.col("diary_clk"))
      val result5 = result4.withColumn("diary_expoure_meigou_rate", result.col("diary_meigou_count") / result.col("diary_expoure"))
      result5.show()

      GmeiConfig.writeToJDBCTable(result5, "diary_meigou_cvr", SaveMode.Append)

      sc.stop()
    }
  }
}

object NdDataInput {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note("winter is coming")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "nd_data_meigou_cid")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")

      val date8 = GmeiConfig.getMinusNDate(70)

      // Take the second '|'-separated field of service_id and cid
      val result00 = sc.sql(
        s"""
           |SELECT
           | split(service_id,'\\\\|')[1] as sid,split(cid,'\\\\|')[1] as cid
           |FROM nd_data_meigou_cid
           |where stat_date > '${date8}'
         """.stripMargin
      )
      result00.createOrReplaceTempView("tmp1")
      result00.show()
      println(result00.count())

      val yesterday = GmeiConfig.getMinusNDate(1).replace("-", "")

      // Attach diary (content) tags and their level-1 tag ids
      val result01 = sc.sql(
        s"""
           |select a.sid as sid, a.cid as cid, b.tag_id as ctag_id, c.level1_id as clevel1_id
           |from tmp1 a
           |left join online.tl_hdfs_diary_tags_view b on a.cid=b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      result01.createOrReplaceTempView("tmp2")
      result01.show()
      println(result01.count())

      // Attach service tags and their level-1 tag ids
      val result02 = sc.sql(
        s"""
           |select a.sid as sid, a.cid as cid, a.ctag_id as ctag_id, a.clevel1_id as clevel1_id,
           | b.tag_id as stag_id, c.level1_id as slevel1_id
           |from tmp2 a
           |left join online.tl_meigou_servicetag_view b on a.sid=b.service_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      result02.createOrReplaceTempView("tmp3")
      result02.show()
      println(result02.count())

      // Keep pairs whose diary and service share the same level-1 tag
      val tidb_input = sc.sql(
        s"""
           |select sid as service_id,cid
           |from tmp3
           |where clevel1_id = slevel1_id
         """.stripMargin
      )
      tidb_input.show()
      println(tidb_input.count())
    }
  }
}

object ServiceStat {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note("winter is coming")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "nd_data_meigou_cid")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")

      // Home-feed diary card clicks joined to diary tags and their level-1 tag ids
      val result00 = sc.sql(
        s"""
           |select a.cl_id as device_id,
           |COALESCE(a.params['diary_id'], a.params['business_id'], 0) as diary_id,
           |c.level1_id as level1_id
           |from online.tl_hdfs_maidian_view a
           |left join online.tl_hdfs_diary_tags_view b on COALESCE(a.params['diary_id'], a.params['business_id'], 0)=b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where a.partition_date > "20181112"
           |and a.action="on_click_diary_card"
           |and a.params["page_name"]="home"
           |and a.cl_id != "NULL"
           |and b.partition_date="20181119"
           |and c.partition_date="20181119"
         """.stripMargin
      )
      result00.collect.foreach(println)
    }
  }
}