package com.gmei

import java.io.Serializable

import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark.sql.functions.lit

object EsmmData {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev") extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text("the database environment to use")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin + s"| --env ${defaultParams.env}"
    )
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "eagle", tableName = "src_mimas_prod_api_diary_tags")
      ti.tidbMapTable(dbName = "eagle", tableName = "src_zhengxing_api_tag")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")

      import sc.implicits._

      val stat_date = GmeiConfig.getMinusNDate(14)
      println(stat_date)

      // Diary exposures over the last 14 days
      val imp_data = sc.sql(
        s"""
           |select distinct stat_date,device_id,city_id as ucity_id,
           |  cid_id,diary_service_id
           |from data_feed_exposure
           |where cid_type = 'diary'
           |and stat_date >'${stat_date}'
         """.stripMargin
      )
      // imp_data.show()
      // println("imp_data.count()")
      // println(imp_data.count())

      // Diary clicks over the last 14 days
      val clk_data = sc.sql(
        s"""
           |select distinct stat_date,device_id,city_id as ucity_id,
           |  cid_id,diary_service_id
           |from data_feed_click
           |where cid_type = 'diary'
           |and stat_date >'${stat_date}'
         """.stripMargin
      )
      // clk_data.show()
      // println("clk_data.count()")
      // println(clk_data.count())

      // Exposures without a click: label y=0 (no click), z=0 (no conversion)
      val imp_data_filter = imp_data.except(clk_data).withColumn("y", lit(0)).withColumn("z", lit(0))
      // imp_data_filter.createOrReplaceTempView("imp_data_filter")
      // imp_data_filter.show()
      // println("imp_data_filter.count()")
      // println(imp_data_filter.count())

      val stat_date_not = GmeiConfig.getMinusNDate(14).replace("-", "")

      // Conversions: welfare_detail page views referred from diary_detail
      val cvr_data = sc.sql(
        s"""
           |select distinct
           |  from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
           |  cl_id as device_id,city_id as ucity_id,
           |  params["referrer_id"] as cid_id,params["business_id"] as diary_service_id
           |from online.tl_hdfs_maidian_view
           |where action='page_view'
           |and partition_date >'${stat_date_not}'
           |and params['page_name'] = 'welfare_detail'
           |and params['referrer'] = 'diary_detail'
         """.stripMargin
      )

      // Converted samples: y=1 (clicked), z=1 (converted)
      val cvr_data_filter = cvr_data.withColumn("y", lit(1)).withColumn("z", lit(1))
      // cvr_data_filter.createOrReplaceTempView("cvr_data_filter")
      // cvr_data_filter.show()
      // println("cvr_data_filter.count()")
      // println(cvr_data_filter.count())

      // Clicks without a conversion: y=1, z=0
      val clk_data_filter = clk_data.except(cvr_data).withColumn("y", lit(1)).withColumn("z", lit(0))
      // clk_data_filter.createOrReplaceTempView("clk_data_filter")
      // clk_data_filter.show()
      // println("clk_data_filter.count()")
      // println(clk_data_filter.count())

      val union_data = imp_data_filter.union(clk_data_filter).union(cvr_data_filter)
      union_data.createOrReplaceTempView("union_data")
      // union_data.show()
      // println("union_data.count()")
      // println(union_data.count())

      val yesterday = GmeiConfig.getMinusNDate(1).replace("-", "")

      // Attach the diary's level-1 content tag (clevel1_id)
      val union_data_clabel = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
           |  c.level1_id as clevel1_id
           |from union_data a
           |left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_clabel.createOrReplaceTempView("union_data_clabel")
      // union_data_clabel.show()

      // Attach the service's level-1 tag (slevel1_id)
      val union_data_slabel = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
           |  c.level1_id as slevel1_id
           |from union_data_clabel a
           |left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
           |left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
         """.stripMargin
      )
      union_data_slabel.createOrReplaceTempView("union_data_slabel")
      // union_data_slabel.show()

      // Attach the diary's city name (tag_type=4 marks city tags)
      val union_data_ccity_name = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
           |  c.name as ccity_name
           |from union_data_slabel a
           |left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
           |left join src_zhengxing_api_tag c on b.tag_id=c.id
           |where c.tag_type=4
         """.stripMargin
      )
      union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
      // union_data_ccity_name.show()

      // Attach the service's city id via service -> doctor -> hospital
      val union_data_scity_id = sc.sql(
        s"""
           |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
           |  d.city_id as scity_id
           |from union_data_ccity_name a
           |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
           |left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
           |left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
           |where b.partition_date='${yesterday}'
           |and c.partition_date='${yesterday}'
           |and d.partition_date='${yesterday}'
         """.stripMargin
      )
      // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
      union_data_scity_id.show()

      GmeiConfig.writeToJDBCTable(union_data_scity_id, table = "esmm_data", SaveMode.Overwrite)

      sc.stop()
    }
  }
}