// EsmmData.scala
package com.gmei


import java.io.Serializable

import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark.sql.functions.lit



/**
 * Spark job that assembles a labelled data set from feed exposures, feed clicks
 * and "welfare_detail" page views, enriches it with tag / city columns, and
 * overwrites the JDBC table `esmm_train_data` with the result.
 *
 * Every row carries two integer labels:
 *   - exposure rows that have no identical click row  -> y = 0, z = 0
 *   - click rows that have no identical page-view row -> y = 1, z = 0
 *   - page-view rows                                  -> y = 1, z = 1
 * (presumably y = clicked and z = converted, as in ESMM -- confirm with the model owner).
 */
object EsmmData {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /** Command-line parameters; `env` is handed to `GmeiConfig.setup`. */
  case class Params(env: String = "dev"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    // stripMargin is applied to the whole concatenated text so that the
    // "--env" line does not keep a stray '|' in the printed usage message.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ +
        s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  // SECURITY NOTE(review): this URL embeds a database user and password in source
  // control. It is kept byte-identical so behaviour does not change, but it should
  // be moved to configuration / a secret store and the credential rotated.
  private val jdbcUrl =
    "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

  /**
   * Entry point: parses `args` and runs the job.
   *
   * When the arguments cannot be parsed the process exits with status 1
   * instead of returning silently as if the job had succeeded.
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None => sys.exit(1)
    }
  }

  /** Builds the labelled data set for `param.env` and writes it out. */
  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    // Second element of the pair returned by GmeiConfig is the session used for all SQL below.
    val spark = GmeiConfig.getSparkSession()._2

    try {
      // Expose the TiDB tables referenced by the queries below to Spark SQL.
      val ti = new TiContext(spark)
      ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")

      // Lower bound (exclusive) of the window, as returned by getMinusNDate(14).
      val stat_date = GmeiConfig.getMinusNDate(14)
      println(stat_date)

      // Diary exposures inside the window.
      val imp_data = spark.sql(
        s"""
           |select distinct stat_date,device_id,city_id as ucity_id,
           |  cid_id,diary_service_id
           |from data_feed_exposure
           |where cid_type = 'diary'
           |and stat_date >'${stat_date}'
         """.stripMargin
      )

      // Diary clicks inside the window.
      val clk_data = spark.sql(
        s"""
           |select distinct stat_date,device_id,city_id as ucity_id,
           |  cid_id,diary_service_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and stat_date >'${stat_date}'
         """.stripMargin
      )

      // Exposures with no identical click row.
      val imp_data_filter = imp_data.except(clk_data).withColumn("y",lit(0)).withColumn("z",lit(0))

      // Same bound without dashes, for the partition_date filter. Derived from
      // stat_date (rather than asking GmeiConfig again) so both filters always
      // use the same day even if the job runs across midnight.
      val stat_date_not = stat_date.replace("-","")
      // Page views of 'welfare_detail' whose referrer is 'diary_detail'.
      val cvr_data = spark.sql(
        s"""
           |select distinct
           |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
           |  cl_id as device_id,city_id as ucity_id,
           |  params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
           |from online.tl_hdfs_maidian_view
           |where action='page_view'
           |and partition_date >'${stat_date_not}'
           |and params['page_name'] = 'welfare_detail'
           |and params['referrer'] = 'diary_detail'
         """.stripMargin
      )

      val cvr_data_filter = cvr_data.withColumn("y",lit(1)).withColumn("z",lit(1))

      // Clicks with no identical page-view row.
      val clk_data_filter = clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))

      val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
      union_data.createOrReplaceTempView("union_data")

      // Partition used for all the "online.*" lookup tables below.
      val yesterday = GmeiConfig.getMinusNDate(1).replace("-","")

      // NOTE(review): in this and the following queries the WHERE clause filters on
      // columns of the left-joined tables, so rows without a match are dropped
      // (the joins behave like inner joins). Confirm that this is intended.

      // Adds clevel1_id: level1_id of the diary's tag.
      val union_data_clabel = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
           |  c.level1_id as clevel1_id
           |from union_data a
           |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_clabel.createOrReplaceTempView("union_data_clabel")

      // Adds slevel1_id: level1_id of the service's tag.
      val union_data_slabel = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
           |  c.level1_id as slevel1_id
           |from union_data_clabel a
           |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_slabel.createOrReplaceTempView("union_data_slabel")

      // Adds ccity_name: name of the diary's tag with tag_type = 4.
      val union_data_ccity_name = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
           |  c.name as ccity_name
           |from union_data_slabel a
           |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
           |left join src_zhengxing_api_tag c on b.tag_id=c.id
           | where c.tag_type=4
         """.stripMargin
      )
      union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")

      // Adds scity_id: city_id of the hospital reached via service -> doctor -> hospital.
      val union_data_scity_id = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
           |  d.city_id as scity_id
           |from union_data_ccity_name a
           |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
           |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
           |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
           |and d.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_scity_id.show()
      GmeiConfig.writeToJDBCTable(jdbcUrl,union_data_scity_id, table="esmm_train_data",SaveMode.Overwrite)
    } finally {
      // Always release the session, including when a query or the write fails.
      spark.stop()
    }
  }

}




/**
 * Spark job that builds the candidate rows (device x queued diary) from
 * `merge_queue_table`, enriches them with the same tag / city columns as the
 * training job, and overwrites the JDBC table `esmm_pre_data` with the result.
 *
 * The label columns y and z are both filled with the constant 0.
 */
object EsmmPredData {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /** Command-line parameters; `env` is handed to `GmeiConfig.setup`. */
  case class Params(env: String = "dev"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    // The header and the example command name this object (they previously
    // named EsmmData, which launches a different job).
    head("EsmmPredData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    // stripMargin is applied to the whole concatenated text so that the
    // "--env" line does not keep a stray '|' in the printed usage message.
    note(
      ("""
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmPredData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """ +
        s"|   --env ${defaultParams.env}").stripMargin
    )
  }

  // SECURITY NOTE(review): this URL embeds a database user and password in source
  // control. It is kept byte-identical so behaviour does not change, but it should
  // be moved to configuration / a secret store and the credential rotated.
  private val jdbcUrl =
    "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

  /**
   * Entry point: parses `args` and runs the job.
   *
   * When the arguments cannot be parsed the process exits with status 1
   * instead of returning silently as if the job had succeeded.
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams) match {
      case Some(param) => run(param)
      case None => sys.exit(1)
    }
  }

  /** Builds the prediction data set for `param.env` and writes it out. */
  private def run(param: Params): Unit = {
    GmeiConfig.setup(param.env)
    // Second element of the pair returned by GmeiConfig is the session used for all SQL below.
    val spark = GmeiConfig.getSparkSession()._2

    try {
      // Expose the TiDB tables referenced by the queries below to Spark SQL.
      val ti = new TiContext(spark)
      ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "merge_queue_table")
      ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")

      // Lower bound (exclusive) on exposure stat_date, as returned by getMinusNDate(5).
      val activeSince = GmeiConfig.getMinusNDate(5)
      // One row per (device, city, queued diary id). The WHERE on b.stat_date
      // discards queue rows without a matching exposure, so only devices that
      // have an exposure after activeSince are kept.
      // NOTE(review): the join yields one copy per matching exposure row; the
      // duplicates are removed by the `distinct` selects further down.
      val activate_data = spark.sql(
        s"""
           |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
           |from merge_queue_table a
           |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
           |where b.stat_date >'${activeSince}'
         """.stripMargin
      )
      // The view keeps the name "raw_data" because the next query reads it under that name.
      activate_data.createOrReplaceTempView("raw_data")

      // Partition used for all the "online.*" lookup tables below.
      val yesterday = GmeiConfig.getMinusNDate(1).replace("-","")

      // NOTE(review): in this and the following queries the WHERE clause filters on
      // columns of the left-joined tables, so rows without a match are dropped
      // (the joins behave like inner joins). Confirm that this is intended.

      // Adds diary_service_id and a stat_date derived from partition_date.
      val sid_data = spark.sql(
        s"""
           |select distinct
           |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
           |  a.device_id,a.ucity_id,a.cid_id, b.service_id as diary_service_id
           |from raw_data a
           |left join online.ml_community_diary_updates b on a.cid_id = b.diary_id
           |where b.partition_date = '${yesterday}'
         """.stripMargin
      )
      sid_data.show()

      // Constant labels so the columns match those of the training table.
      val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
      sid_data_label.createOrReplaceTempView("union_data")

      // Adds clevel1_id: level1_id of the diary's tag.
      val union_data_clabel = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
           |  c.level1_id as clevel1_id
           |from union_data a
           |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_clabel.createOrReplaceTempView("union_data_clabel")

      // Adds slevel1_id: level1_id of the service's tag.
      val union_data_slabel = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
           |  c.level1_id as slevel1_id
           |from union_data_clabel a
           |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_slabel.createOrReplaceTempView("union_data_slabel")

      // Adds ccity_name: name of the diary's tag with tag_type = 4.
      val union_data_ccity_name = spark.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
           |  c.name as ccity_name
           |from union_data_slabel a
           |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
           |left join src_zhengxing_api_tag c on b.tag_id=c.id
           | where c.tag_type=4
         """.stripMargin
      )
      union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")

      // Adds scity_id: city_id of the hospital reached via service -> doctor -> hospital.
      val union_data_scity_id = spark.sql(
        s"""
           |select distinct a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
           |  d.city_id as scity_id
           |from union_data_ccity_name a
           |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
           |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
           |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
           |and d.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_scity_id.show()
      GmeiConfig.writeToJDBCTable(jdbcUrl,union_data_scity_id, table="esmm_pre_data",SaveMode.Overwrite)
    } finally {
      // Always release the session, including when a query or the write fails.
      spark.stop()
    }
  }

}