strategy_other.scala 12.4 KB
package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
//import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object strategy_other {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }
//统计每天新用户的点击率
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")


      import sc.implicits._
      //val stat_date = GmeiConfig.getMinusNDate(1)
      //println(param.date)
      val partition_date = param.date.replace("-","")
      val devicee_id_newUser = sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from online.ml_device_day_active_status
           |where active_type != '4'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |        ,'promotion_shike','promotion_julang_jl03')
           |and partition_date ='${partition_date}'
         """.stripMargin
      )
      devicee_id_newUser.show()
      devicee_id_newUser.createOrReplaceTempView("device_id_new")

      val clk_count_newUser_Contrast = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_newUser_Contrast
           |from data_feed_click jd inner join device_id_new
           |on jd.device_id = device_id_new.device_id
           |where  (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id regexp'1$$'
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date ='${param.date}'
         """.stripMargin
      )

      val imp_count_newUser_Contrast = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_newUser_Contrast
           |from data_feed_exposure je inner join device_id_new
           |on je.device_id = device_id_new.device_id
           |where je.cid_type = 'diary'
           |and je.device_id regexp'1$$'
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date ='${param.date}'
         """.stripMargin
      )

      val clk_count_newUser_all = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as clk_count_newUser_all
           |from data_feed_click jd inner join device_id_new
           |on jd.device_id = device_id_new.device_id
           |where  (jd.cid_type = 'diary' or jd.cid_type = 'diary_video')
           |and jd.device_id not in (select device_id from blacklist)
           |and jd.stat_date ='${param.date}'
         """.stripMargin
      )

      val imp_count_newUser_all = sc.sql(
        s"""
           |select '${param.date}' as stat_date, count(cid_id) as imp_count_newUser_all
           |from data_feed_exposure je inner join device_id_new
           |on je.device_id = device_id_new.device_id
           |where je.cid_type = 'diary'
           |and je.device_id not in (select device_id from blacklist)
           |and je.stat_date ='${param.date}'
         """.stripMargin
      )

      val result3 = clk_count_newUser_Contrast.join(imp_count_newUser_Contrast,"stat_date")
        .join(clk_count_newUser_all,"stat_date")
        .join(imp_count_newUser_all,"stat_date")
      result3.show()

      GmeiConfig.writeToJDBCTable(result3, "Recommendation_strategy_newUser", SaveMode.Append)


    }


  }

}




//下边内容开始分析统计推荐系统评价指标

object diary_exposure {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"|   --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2
      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle", tableName = "src_mimas_prod_api_diary")
//      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")

      val mimas_url ="jdbc:mysql://rr-m5et21lafq1677pid.mysql.rds.aliyuncs.com/mimas_prod"
      val mimas_user = "mimas"
      val mimas_password = "GJL3UJe1Ck9ggL6aKnZCq4cRvM"
      val zhengxing_url = "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing"
      val zhengxing_user = "work"
      val zhengxing_password = "BJQaT9VzDcuPBqkd"

      def mysql_df(spark:SparkSession,url:String,table:String,user:String,password:String,sql:String): DataFrame ={
        val jdbcDF = spark.read
          .format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", url)
          .option("dbtable", table)
          .option("user", user)
          .option("password", password)
          .load()
        jdbcDF.createOrReplaceTempView(table)
        try {spark.sql(sql)}
        catch {case _ => spark.emptyDataFrame}
      }

      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-","")

      //机构ID
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.show()
      agency_id.createOrReplaceTempView("agency_id")

//当日首页精选总曝光,去除结构和黑名单
      val imp_count = sc.sql(
        s"""
           |select count(cid_id) as imp_num
           |from data_feed_exposure de left join agency_id
           |on de.device_id = agency_id.device_id
           |where de.cid_type = 'diary'
           |and agency_id.device_id is null
           |and de.stat_date ='2018-11-26'
           |and de.device_id not in (select distinct(device_id) from blacklist)
         """.stripMargin
      )
      imp_count.show()

//曝光表中的日记id,去除机构和黑名单
      val diary_id_temp = sc.sql(
        s"""
           |select cid_id as diary_id
           |from data_feed_exposure de left join agency_id
           |on de.device_id = agency_id.device_id
           |where de.cid_type = 'diary'
           |and agency_id.device_id is null
           |and de.stat_date ='2018-11-26'
           |and de.device_id not in (select distinct(device_id) from blacklist)
         """.stripMargin
      )
      diary_id_temp.createOrReplaceTempView("diary_id_temp")
      val diary_id = diary_id_temp.rdd.map(x =>x(0).toString).collect()

      val cid_tag =
        s"""
           |select diary_id,tag_id from api_diary_tags
           |where diary_id in (${diary_id.map(x => s"'$x'").mkString(",")})
         """.stripMargin
      val cid_city = mysql_df(sc,mimas_url,"api_diary_tags",mimas_user,mimas_password,cid_tag)
      val tag_list = cid_city.select("tag_id").collect().map(x => x(0).toString).distinct
      val tag_city =
        s"""
           |select id,name from api_tag where tag_type = 4
           |and id in (${tag_list.map(x => s"'$x'").mkString(",")})
        """.stripMargin
      val city_df = mysql_df(sc,zhengxing_url,"api_tag",zhengxing_user,zhengxing_password,tag_city)
        .na.drop().withColumnRenamed("id","tag_id")
      val df_cid_city = cid_city.join(city_df,Seq("tag_id"),"left_outer").na.drop()
        .drop("tag_id")

      val final_cid_city = diary_id_temp.join(df_cid_city,Seq("diary_id"),"left_outer")
      final_cid_city.show()
      val df1=final_cid_city.groupBy("name").count().orderBy(desc("count"))

//3.5星以上日记本的id
      val diary_id_temp2 = sc.sql(
        s"""
           |select id as diary_id
           |from src_mimas_prod_api_diary
           |where content_level >=3.5
           |and doctor_id is not null
          """.stripMargin
      )

      diary_id_temp2.createOrReplaceTempView("diary_id_temp2")
      val diary_id2 = diary_id_temp2.rdd.map(x =>x(0).toString).collect()

      val cid_tag2 =
        s"""
           |select diary_id,tag_id from api_diary_tags
           |where diary_id in (${diary_id2.map(x => s"'$x'").mkString(",")})
         """.stripMargin
      val cid_city2 = mysql_df(sc,mimas_url,"api_diary_tags",mimas_user,mimas_password,cid_tag2)
      val tag_list2 = cid_city2.select("tag_id").collect().map(x => x(0).toString).distinct
      val tag_city2 =
        s"""
           |select id,name from api_tag where tag_type = 4
           |and id in (${tag_list2.map(x => s"'$x'").mkString(",")})
        """.stripMargin
      val city_df2 = mysql_df(sc,zhengxing_url,"api_tag",zhengxing_user,zhengxing_password,tag_city2)
        .na.drop().withColumnRenamed("id","tag_id")
      val df_cid_city2 = cid_city2.join(city_df2,Seq("tag_id"),"left_outer").na.drop()
        .drop("tag_id")

      val final_cid_city2 = diary_id_temp2.join(df_cid_city2,Seq("diary_id"),"left_outer")
      final_cid_city2.show()
      val df2 =final_cid_city2.groupBy("name").count().orderBy(desc("count"))

      val df3 =df1.join(df2,Seq("name"),"left_outer")
      df3.show(400)










      //GmeiConfig.writeToJDBCTable(diary_id, "dairy_id_queue", SaveMode.Append)


    }


  }

}