strategy_other.scala 12.3 KB
Newer Older
王志伟's avatar
王志伟 committed
1 2 3 4 5
package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
王志伟's avatar
王志伟 committed
6
import org.apache.spark.sql.{DataFrame, SaveMode}
王志伟's avatar
王志伟 committed
7 8 9
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
王志伟's avatar
王志伟 committed
10 11
//import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
import org.apache.spark.sql.SparkSession
王志伟's avatar
王志伟 committed
12
import org.apache.spark.sql.functions._
王志伟's avatar
王志伟 committed
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41

object strategy_other {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /** Command-line options for this job.
    *
    * @param env  environment name handed to `GmeiConfig.setup`
    * @param date statistics day, e.g. "2018-08-01"; with the dashes removed it is also used
    *             as the Hive `partition_date`
    */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    // The usage example names this object's class. The appended option line has no leading
    // '|' because it is concatenated after stripMargin and would otherwise be printed verbatim.
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.strategy_other ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"   --env ${defaultParams.env}"
    )
  }

  /** Daily click / exposure counts for new users.
    *
    * Parses `args` (on a parse failure scopt reports the error and nothing runs), then appends
    * one row for `param.date` to the JDBC table `Recommendation_strategy_newUser` with four counts:
    * clicks and exposures of new-user devices, each for the contrast group (device ids ending
    * in "1") and for all new-user devices.
    */
  def main(args: Array[String]): Unit = {
    // foreach rather than map: the body is run purely for its side effects.
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      sc.sql("use jerry_prod")

      val partition_date = param.date.replace("-","")

      // Devices active on the day with active_type != '4' (presumably new users — confirm
      // against ml_device_day_active_status), excluding the listed channel sources.
      val devicee_id_newUser = sc.sql(
        s"""
           |select distinct(device_id) as device_id
           |from online.ml_device_day_active_status
           |where active_type != '4'
           |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
           |        ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
           |        ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
           |        ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
           |        ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
           |        ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
           |        ,'promotion_shike','promotion_julang_jl03')
           |and partition_date ='${partition_date}'
         """.stripMargin
      )
      devicee_id_newUser.show()
      devicee_id_newUser.createOrReplaceTempView("device_id_new")

      // Counts rows of `table` (aliased `alias`) on param.date that belong to new-user devices,
      // match `cidTypeCond` and are not blacklisted. With `contrastOnly`, the count is further
      // restricted to device ids ending in "1". Result: one row (stat_date, <countCol>).
      // NOTE(review): `not in (select device_id from blacklist)` matches nothing at all if
      // blacklist.device_id ever contains NULL — confirm that column is non-null.
      def newUserCount(table: String, alias: String, cidTypeCond: String,
                       countCol: String, contrastOnly: Boolean) =
        sc.sql(
          Seq(
            s"select '${param.date}' as stat_date, count(cid_id) as $countCol",
            s"from $table $alias inner join device_id_new",
            s"on $alias.device_id = device_id_new.device_id",
            s"where $cidTypeCond",
            // `$$` is the s-interpolator escape for a literal '$' (end-of-string anchor).
            if (contrastOnly) s"and $alias.device_id regexp '1$$'" else "",
            s"and $alias.device_id not in (select device_id from blacklist)",
            s"and $alias.stat_date ='${param.date}'"
          ).filter(_.nonEmpty).mkString("\n")
        )

      // NOTE(review): clicks count both 'diary' and 'diary_video' while exposures count only
      // 'diary', so clicks / exposures is not a like-for-like CTR — confirm this is intended.
      val clickCidTypes = "(jd.cid_type = 'diary' or jd.cid_type = 'diary_video')"
      val exposureCidTypes = "je.cid_type = 'diary'"

      val clk_count_newUser_Contrast = newUserCount(
        "data_feed_click", "jd", clickCidTypes, "clk_count_newUser_Contrast", contrastOnly = true)
      val imp_count_newUser_Contrast = newUserCount(
        "data_feed_exposure", "je", exposureCidTypes, "imp_count_newUser_Contrast", contrastOnly = true)
      val clk_count_newUser_all = newUserCount(
        "data_feed_click", "jd", clickCidTypes, "clk_count_newUser_all", contrastOnly = false)
      val imp_count_newUser_all = newUserCount(
        "data_feed_exposure", "je", exposureCidTypes, "imp_count_newUser_all", contrastOnly = false)

      // Each count query is an ungrouped count(...), i.e. exactly one row keyed by stat_date,
      // so the joins produce a single wide row.
      val result3 = clk_count_newUser_Contrast.join(imp_count_newUser_Contrast,"stat_date")
        .join(clk_count_newUser_all,"stat_date")
        .join(imp_count_newUser_all,"stat_date")
      result3.show()

      GmeiConfig.writeToJDBCTable(result3, "Recommendation_strategy_newUser", SaveMode.Append)
    }
  }

}
王志伟's avatar
王志伟 committed
140 141 142 143 144 145




// The code below computes evaluation metrics for the recommendation system (下边内容开始分析统计推荐系统评价指标).

王志伟's avatar
王志伟 committed
146
object diary_exposure {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /** Command-line options for this job.
    *
    * @param env  environment name handed to `GmeiConfig.setup`
    * @param date accepted for CLI compatibility; not read by `main` (its dates are derived or fixed)
    */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String] ("date")
      .text(s"the date you used")
      .action((x,c) => c.copy(date = x))
    // The usage example names this object's class. The appended option line has no leading
    // '|' because it is concatenated after stripMargin and would otherwise be printed verbatim.
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.diary_exposure ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"   --env ${defaultParams.env}"
    )
  }

  /** City distribution of exposed diaries versus highly rated diaries.
    *
    * After parsing `args`, it:
    *  1. prints the day's diary exposure count, excluding agency and blacklisted devices;
    *  2. counts those exposures per city tag;
    *  3. counts diaries with content_level >= 3.5 and a doctor_id per city tag;
    *  4. prints both per-city counts side by side (left join on city name).
    */
  def main(args: Array[String]): Unit = {
    import scala.util.control.NonFatal

    // foreach rather than map: the body is run purely for its side effects.
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      // NOTE(review): database credentials are hard-coded in source control. They should come
      // from configuration or a secret store, and these values should be rotated.
      val mimas_url ="jdbc:mysql://rr-m5et21lafq1677pid.mysql.rds.aliyuncs.com/mimas_prod"
      val mimas_user = "mimas"
      val mimas_password = "GJL3UJe1Ck9ggL6aKnZCq4cRvM"
      val zhengxing_url = "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing"
      val zhengxing_user = "work"
      val zhengxing_password = "BJQaT9VzDcuPBqkd"

      // Loads MySQL `table` over JDBC, registers it as a Spark temp view of the same name and
      // runs `sql` against that view. If `sql` fails to build, the error is printed and an
      // empty, schema-less DataFrame is returned.
      def mysql_df(spark:SparkSession,url:String,table:String,user:String,password:String,sql:String): DataFrame ={
        val jdbcDF = spark.read
          .format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", url)
          .option("dbtable", table)
          .option("user", user)
          .option("password", password)
          .load()
        jdbcDF.createOrReplaceTempView(table)
        // NonFatal: let OOM / interrupts propagate instead of swallowing every Throwable.
        try spark.sql(sql)
        catch {
          case NonFatal(e) =>
            println(s"mysql_df: query on $table failed: ${e.getMessage}")
            spark.emptyDataFrame
        }
      }

      // Quoted, comma-separated body of a SQL IN list. An empty input becomes NULL, so
      // `x in (NULL)` is still valid SQL and matches nothing (`in ()` is a parse error).
      def sqlInList(values: Array[String]): String =
        if (values.isEmpty) "NULL" else values.map(v => s"'$v'").mkString(",")

      // Looks up the city tags (api_tag rows with tag_type = 4) of the diaries in `diaries`
      // (first column, named diary_id), prints the diary/city join and returns the row count per
      // city `name`, ordered by count descending.
      def cityCounts(diaries: DataFrame): DataFrame = {
        // Duplicate ids do not change IN-list membership; dropping them keeps the SQL short.
        val ids = diaries.rdd.map(x => x(0).toString).distinct().collect()
        val cid_tag =
          s"""
             |select diary_id,tag_id from api_diary_tags
             |where diary_id in (${sqlInList(ids)})
           """.stripMargin
        val cid_city = mysql_df(sc,mimas_url,"api_diary_tags",mimas_user,mimas_password,cid_tag)
        val tag_list = cid_city.select("tag_id").collect().map(x => x(0).toString).distinct
        val tag_city =
          s"""
             |select id,name from api_tag where tag_type = 4
             |and id in (${sqlInList(tag_list)})
           """.stripMargin
        val city_df = mysql_df(sc,zhengxing_url,"api_tag",zhengxing_user,zhengxing_password,tag_city)
          .na.drop().withColumnRenamed("id","tag_id")
        val df_cid_city = cid_city.join(city_df,Seq("tag_id"),"left_outer").na.drop()
          .drop("tag_id")
        val final_cid_city = diaries.join(df_cid_city,Seq("diary_id"),"left_outer")
        final_cid_city.show()
        final_cid_city.groupBy("name").count().orderBy(desc("count"))
      }

      val stat_date = GmeiConfig.getMinusNDate(1)
      val partition_date = stat_date.replace("-","")

      // Agency (institution) devices: devices in the hospital-spam PV tables with
      // pv_ratio >= 0.95, up to partition_date. They are excluded from the exposure stats below.
      val agency_id = sc.sql(
        s"""
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_day
           |WHERE partition_date >= '20180402'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
           |UNION ALL
           |SELECT DISTINCT(cl_id) as device_id
           |FROM online.ml_hospital_spam_pv_month
           |WHERE partition_date >= '20171101'
           |AND partition_date <= '${partition_date}'
           |AND pv_ratio >= 0.95
         """.stripMargin
      )
      agency_id.show()
      agency_id.createOrReplaceTempView("agency_id")

      // NOTE(review): the exposure queries are pinned to this literal day rather than to
      // stat_date (yesterday), which is used for the agency filter above — confirm intent.
      val exposure_date = "2018-11-26"

      // Total diary exposures on the home-page featured feed for the day, excluding agency
      // and blacklisted devices.
      val imp_count = sc.sql(
        s"""
           |select count(cid_id) as imp_num
           |from data_feed_exposure de left join agency_id
           |on de.device_id = agency_id.device_id
           |where de.cid_type = 'diary'
           |and agency_id.device_id is null
           |and de.stat_date ='${exposure_date}'
           |and de.device_id not in (select distinct(device_id) from blacklist)
         """.stripMargin
      )
      imp_count.show()

      // Diary ids from the exposure table (one row per exposure), with the same exclusions.
      val diary_id_temp = sc.sql(
        s"""
           |select cid_id as diary_id
           |from data_feed_exposure de left join agency_id
           |on de.device_id = agency_id.device_id
           |where de.cid_type = 'diary'
           |and agency_id.device_id is null
           |and de.stat_date ='${exposure_date}'
           |and de.device_id not in (select distinct(device_id) from blacklist)
         """.stripMargin
      )
      diary_id_temp.createOrReplaceTempView("diary_id_temp")
      val df1 = cityCounts(diary_id_temp)

      // Diaries rated 3.5 stars or above (content_level >= 3.5) that have a doctor_id.
      val diary_id_temp2 = sc.sql(
        s"""
           |select id as diary_id
           |from src_mimas_prod_api_diary
           |where content_level >=3.5
           |and doctor_id is not null
          """.stripMargin
      )
      diary_id_temp2.createOrReplaceTempView("diary_id_temp2")
      val df2 = cityCounts(diary_id_temp2)

      // Per-city exposure counts next to per-city high-rated diary counts.
      val df3 = df1.join(df2,Seq("name"),"left_outer")
      df3.show(400)
    }
  }

}

王志伟's avatar
王志伟 committed
326