Commit e010a3ab authored by 王志伟's avatar 王志伟
parents e5b5b373 39a163ee
......@@ -196,5 +196,145 @@ object EsmmData {
}
}
}
object EsmmPredData {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("EsmmData")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "merge_queue_table")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
val raw_data = sc.sql(
s"""
|select device_id,city_id as ucity_id, explode(split(search_queue, ',')) as cid_id
|from merge_queue_table
""".stripMargin
)
raw_data.createOrReplaceTempView("raw_data")
// raw_data.show()
import sc.implicits._
val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
val sid_data = sc.sql(
s"""
|select distinct
| from_unixtime(unix_timestamp(partition_date ,'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
| a.device_id,a.ucity_id,a.cid_id, b.service_id as diary_service_id
|from raw_data a
|left join online.ml_community_diary_updates b on a.cid_id = b.diary_id
|where b.partition_date = '${yesteday}'
""".stripMargin
)
sid_data.show()
val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
sid_data_label.createOrReplaceTempView("union_data")
val union_data_clabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,
| c.level1_id as clevel1_id
|from union_data a
|left join online.tl_hdfs_diary_tags_view b on a.cid_id=b.diary_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
""".stripMargin
)
union_data_clabel.createOrReplaceTempView("union_data_clabel")
// union_data_clabel.show()
val union_data_slabel = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,
| c.level1_id as slevel1_id
|from union_data_clabel a
|left join online.tl_meigou_servicetag_view b on a.diary_service_id=b.service_id
|left join online.bl_tag_hierarchy_detail c on b.tag_id=c.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
""".stripMargin
)
union_data_slabel.createOrReplaceTempView("union_data_slabel")
// union_data_slabel.show()
val union_data_ccity_name = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,
| c.name as ccity_name
|from union_data_slabel a
|left join src_mimas_prod_api_diary_tags b on a.cid_id=b.diary_id
|left join src_zhengxing_api_tag c on b.tag_id=c.id
| where c.tag_type=4
""".stripMargin
)
union_data_ccity_name.createOrReplaceTempView("union_data_ccity_name")
// union_data_ccity_name.show()
val union_data_scity_id = sc.sql(
s"""
|select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
| d.city_id as scity_id
|from union_data_ccity_name a
|left join online.tl_meigou_service_view b on a.diary_service_id=b.id
|left join online.tl_hdfs_doctor_view c on b.doctor_id=c.id
|left join online.tl_hdfs_hospital_view d on c.hospital_id=d.id
|where b.partition_date='${yesteday}'
|and c.partition_date='${yesteday}'
|and d.partition_date='${yesteday}'
""".stripMargin
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_pre_data",SaveMode.Overwrite)
sc.stop()
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment