package com.gmei

import java.io.Serializable

import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import java.io._

/**
 * Daily page-view statistics job.
 *
 * For the day returned by `GmeiConfig.getMinusNDate(1)` it counts diary-detail and
 * welfare-detail page views (plus distinct `cl_id`s) in `online.tl_hdfs_maidian_view`,
 * joins the two one-row results on `stat_date`, prints the row and appends it to the
 * `diary_pv` JDBC table.
 */
object temp_analysis {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  // Jetty logs under `org.eclipse.jetty`; the previous name "org.apache.eclipse.jetty.server"
  // matched no real logger, so silencing it had no effect.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Command-line options.
   *
   * @param env  database environment passed to `GmeiConfig.setup`
   * @param date value of `--date`; currently parsed but not read by [[main]]
   */
  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams: Params = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("temp_analysis")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    // The whole example lives inside one stripMargin'd literal so no stray '|' is printed;
    // `\\` yields a literal backslash because the `s` interpolator processes escapes.
    note(
      s"""
         |For example, the following command runs this app on a tidb dataset:
         |
         | spark-submit --class com.gmei.temp_analysis ./target/scala-2.11/feededa-assembly-0.1.jar \\
         |   --env ${defaultParams.env}""".stripMargin
    )
  }

  /**
   * Builds the Spark SQL that aggregates one page type's views for a single day.
   *
   * Selects `page_view` events whose `params["page_name"]` equals `pageName`, whose
   * `params["out"] - params["in"]` is below 7200, and whose `partition_date` equals
   * `partitionDate`. Because it aggregates without `group by`, the query yields one row.
   *
   * @param statDate      literal emitted as the `stat_date` column
   * @param partitionDate partition to scan
   * @param pageName      value of `params["page_name"]` to match
   * @param pvColumn      alias for the `count(params["business_id"])` column
   * @param deviceColumn  alias for the `count(distinct(cl_id))` column
   * @return the SQL text
   */
  private def pageViewQuery(statDate: String,
                            partitionDate: String,
                            pageName: String,
                            pvColumn: String,
                            deviceColumn: String): String =
    s"""
       |select '${statDate}' as stat_date,count(params["business_id"]) as ${pvColumn},count(distinct(cl_id)) as ${deviceColumn}
       |from online.tl_hdfs_maidian_view
       |where action="page_view"
       |and params["page_name"]="${pageName}"
       |and (params["out"]-params["in"])<7200
       |and partition_date ='${partitionDate}'
     """.stripMargin

  /**
   * Entry point: parses options, runs the two page-view queries and appends the joined row
   * to the `diary_pv` table. Nothing runs when option parsing fails (scopt reports the error).
   */
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).foreach { param =>
      GmeiConfig.setup(param.env)
      val spark = GmeiConfig.getSparkSession()._2

      // Map TiDB tables into the Spark session, in the original order.
      // NOTE(review): only merge_queue_table is queried below; the others look unused here.
      val ti = new TiContext(spark)
      Seq(
        "jerry_prod" -> "diary_video",
        "jerry_prod" -> "data_feed_click",
        "jerry_prod" -> "blacklist",
        "jerry_test" -> "bl_device_list",
        "jerry_prod" -> "data_feed_exposure",
        "jerry_prod" -> "merge_queue_table"
      ).foreach { case (db, table) => ti.tidbMapTable(dbName = db, tableName = table) }

      // NOTE(review): `--date` (param.date) is ignored; the statistics day always comes from
      // getMinusNDate(1), presumably "yesterday" — confirm whether --date should override it.
      val stat_date = GmeiConfig.getMinusNDate(1)
      // Partition key is the stat date with dashes removed.
      val partition_date = stat_date.replace("-", "")

      // Distinct device_ids of users hit by the strategy.
      // NOTE(review): the temp view is not referenced by any query in this job.
      spark.sql(
        s"""
           |select distinct(device_id) as device_id
           |from merge_queue_table
         """.stripMargin
      ).createOrReplaceTempView("device_id_cover_older")

      val diary_pv = spark.sql(
        pageViewQuery(stat_date, partition_date, "diary_detail", "diary_pv", "device_num_diary"))
      val meigou_pv = spark.sql(
        pageViewQuery(stat_date, partition_date, "welfare_detail", "meigou_pv", "device_num_meigou"))

      // Each side is a single row carrying the same stat_date literal, so this yields one row.
      val result = diary_pv.join(meigou_pv, "stat_date")
      result.show()
      GmeiConfig.writeToJDBCTable(result, "diary_pv", SaveMode.Append)
    }
  }
}