// WeafareStat.scala
package com.gmei

import java.io.Serializable

import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams

/**
  * Spark job that prints, for a single day, how often diary clicks lead to a
  * `welfare_detail` ("meigou") page view, split into video diaries, non-video
  * ("txt") diaries and diaries listed in `feed_diary_boost` ("vlog").
  *
  * The day is whatever `GmeiConfig.getMinusNDate(1)` returns. Every intermediate
  * DataFrame and the final joined result are written to stdout with `show()`;
  * nothing is persisted by this job.
  */
object WeafareStat {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty's packages are rooted at "org.eclipse.jetty"; the previous name
  // ("org.apache.eclipse.jetty.server") matched no logger, so OFF never applied.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
    * Command-line parameters.
    *
    * @param env name of the database environment, forwarded to `GmeiConfig.setup`
    */
  case class Params(env: String = "dev"
                   ) extends AbstractParams[Params] with Serializable

  /** Parameters used when no command-line option overrides them. */
  val defaultParams: Params = Params()

  /** scopt parser for the `--env` option; also carries the usage text. */
  val parser: OptionParser[Params] = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text("the databases environment you used")
      .action((x, c) => c.copy(env = x))
    // The whole note is one literal so that stripMargin removes the '|' margin
    // from every line, including the trailing "--env" example line.
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
        |   --env %s
      """.stripMargin.format(defaultParams.env)
    )
  }

  /**
    * Entry point: parses `args` and runs the statistics job.
    *
    * Exits the JVM with status 1 when the arguments cannot be parsed, so that
    * schedulers and shell callers can detect the failure.
    *
    * @param args raw command-line arguments
    */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None => sys.exit(1)
    }
  }

  /** Runs all queries for the configured environment and prints the joined result. */
  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    val spark = GmeiConfig.getSparkSession()._2

    // try/finally guarantees the session is stopped even if a query fails.
    try {
      // Register the TiDB tables so they can be referenced by bare name in SQL.
      val ti = new TiContext(spark)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "eagle", tableName = "feed_diary_boost")

      val statDate = GmeiConfig.getMinusNDate(1)
      println(statDate)

      // Distinct clicked diaries that appear in diary_video for the day.
      val videoCids = spark.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and  cid_id  in (select cid from diary_video where stat_date='${statDate}')
           |and stat_date ='${statDate}'
         """.stripMargin
      )
      videoCids.show()
      videoCids.createOrReplaceTempView("tmp1")

      // Distinct clicked diaries that do NOT appear in diary_video for the day.
      // NOTE(review): SQL `NOT IN` yields no rows when the subquery returns a
      // NULL — confirm diary_video.cid can never be NULL.
      val txtCids = spark.sql(
        s"""
           |select distinct(cid_id) as cid_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and  cid_id  not in (select cid from diary_video where stat_date='${statDate}')
           |and stat_date ='${statDate}'
         """.stripMargin
      )
      txtCids.show()
      txtCids.createOrReplaceTempView("tmp2")

      // The page-view table is filtered on the same date with the dashes removed.
      val partitionDate = statDate.replace("-", "")
      println(partitionDate)

      // welfare_detail page views referred from a video diary's detail page.
      val videoMeigouCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date, count(page_name) as video_meigou_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp1
           |on pv.referrer_id = tmp1.cid_id
           |where pv.partition_date = '${partitionDate}'
           |and pv.page_name='welfare_detail'
           |and pv.referrer='diary_detail'
         """.stripMargin
      )
      videoMeigouCount.show()

      // welfare_detail page views referred from a non-video diary's detail page.
      val txtMeigouCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date, count(page_name) as txt_meigou_count
           |from online.bl_hdfs_page_view_updates pv inner join tmp2
           |on pv.referrer_id = tmp2.cid_id
           |where pv.partition_date = '${partitionDate}'
           |and pv.page_name='welfare_detail'
           |and pv.referrer='diary_detail'
         """.stripMargin
      )
      txtMeigouCount.show()

      // Total feed clicks on video diaries.
      val videoClkCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date, count(cid_id) as video_clk_count
           |from data_feed_click
           |where cid_type = 'diary'
           |and  cid_id  in (select cid from diary_video where stat_date='${statDate}')
           |and stat_date='${statDate}'
         """.stripMargin
      )
      videoClkCount.show()

      // Total feed clicks on non-video diaries (same NOT IN caveat as above).
      val txtClkCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date, count(cid_id) as txt_clk_count
           |from data_feed_click
           |where cid_type = 'diary'
           |and  cid_id not in (select cid from diary_video where stat_date='${statDate}')
           |and stat_date='${statDate}'
         """.stripMargin
      )
      txtClkCount.show()

      // Number of distinct diaries in diary_video for the day.
      val videoCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date,count(distinct(cid)) as video_count
           |from diary_video where stat_date='${statDate}'
         """.stripMargin
      )
      videoCount.show()

      // welfare_detail page views referred from diaries listed in feed_diary_boost.
      val vlogMeigouClkCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date,count(page_name) as vlog_meigou_clk_num
           |from online.bl_hdfs_page_view_updates
           |where partition_date='${partitionDate}'
           |and page_name='welfare_detail'
           |and referrer='diary_detail'
           |and referrer_id in (select distinct(diary_id) from feed_diary_boost)
         """.stripMargin
      )
      vlogMeigouClkCount.show()

      // Feed clicks on diaries listed in feed_diary_boost.
      val vlogClkCount = spark.sql(
        s"""
           |select '${statDate}' as stat_date,count(cid_id) as vlog_clk_num
           |from data_feed_click
           |where stat_date='${statDate}'
           |and  cid_type = 'diary'
           |and cid_id in (select distinct(diary_id) from feed_diary_boost)
         """.stripMargin
      )
      vlogClkCount.show()

      // Every frame carries the same literal stat_date, so joining on it lines
      // the individual counts up side by side.
      val joined = videoClkCount.join(videoMeigouCount, "stat_date")
        .join(txtClkCount, "stat_date")
        .join(txtMeigouCount, "stat_date")
        .join(videoCount, "stat_date")
        .join(vlogMeigouClkCount, "stat_date")
        .join(vlogClkCount, "stat_date")

      // Conversion rates: welfare page views divided by feed clicks per category.
      val withRates = joined
        .withColumn("video_meigou_rate", joined.col("video_meigou_count") / joined.col("video_clk_count"))
        .withColumn("txt_meigou_rate", joined.col("txt_meigou_count") / joined.col("txt_clk_count"))
        .withColumn("vlog_meigou_rate", joined.col("vlog_meigou_clk_num") / joined.col("vlog_clk_num"))

      // NOTE(review): the result is only displayed; SaveMode is imported at file
      // level but no write happens here — confirm whether persisting was intended.
      withRates.show()
    } finally {
      spark.stop()
    }
  }

}