Commit 17f2cfce authored by 王志伟

Fetch icon training data

parent 26799554
@@ -2,6 +2,7 @@ package com.gmei
import java.io.Serializable
import breeze.linalg.split
import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.SaveMode
import org.apache.log4j.{Level, Logger}
@@ -428,3 +429,131 @@ object data_feed_exposure_precise {
  }
}
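
// Builds the icon training data added in this commit: per-device weighted tag lists
// derived from yesterday's diary-card clicks, joined to diary tags and the tag
// hierarchy (weights 0.25 / 0.5 / 1 for level-1 / level-2 / level-3 tags).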
object icon_train_data {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  case class Params(env: String = "dev",
                    date: String = "2018-08-01"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()
  val parser = new OptionParser[Params]("Feed_EDA") {
    head("WeafareStat")
    opt[String]("env")
      .text(s"the database environment you used")
      .action((x, c) => c.copy(env = x))
    opt[String]("date")
      .text(s"the date you used")
      .action((x, c) => c.copy(date = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }
  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      sc.sql("use jerry_prod")

      val stat_date = GmeiConfig.getMinusNDate(1)
      // val stat_date = param.date
      // println(param.date)
      val partition_date = stat_date.replace("-", "")
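
      // partition_date is presumably yesterday's date (getMinusNDate(1)) reformatted
      // to yyyyMMdd, matching the partition_date format of the online.* tables below.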
      // Diary click data
      val diary_click = sc.sql(
        s"""
           |select partition_date, cl_id as device_id, params['diary_id'] as diary_id
           |from online.tl_hdfs_maidian_view
           |where action = 'on_click_diary_card'
           |and partition_date='${partition_date}'
         """.stripMargin
      )
      diary_click.show()
      diary_click.createOrReplaceTempView("diary_click")
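
      // Diary-to-tag mapping for the same partition.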
      val diary_tags = sc.sql(
        s"""
           |select * from online.tl_hdfs_diary_tags_view
           |where partition_date = '${partition_date}'
         """.stripMargin
      )
      diary_tags.createOrReplaceTempView("diary_tags")
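
      // Tag hierarchy detail, used to resolve each tag_id to its level-1/2/3 ancestors.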
      val tag_hierarchy_detail = sc.sql(
        s"""
           |select * from online.bl_tag_hierarchy_detail
           |where partition_date = '${partition_date}'
         """.stripMargin
      )
      tag_hierarchy_detail.createOrReplaceTempView("tag_hierarchy_detail")
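
      // Attach tags and their hierarchy levels to each click; keep only rows that
      // resolve to a level-1 tag.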
      val diary_tag_detail = sc.sql(
        s"""
           |select a.*, b.tag_id, c.level1_id, c.level2_id, c.level3_id
           |from diary_click a
           |left join diary_tags b
           |on a.diary_id = b.diary_id
           |left join tag_hierarchy_detail c
           |on b.tag_id = c.id
           |where level1_id is not null
         """.stripMargin
      )
      diary_tag_detail.createOrReplaceTempView("diary_tag_detail")
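
      // Encode each tag level as an "id:weight" pair: 0.25 for level 1, 0.5 for
      // level 2 and 1 for level 3.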
      val temp_diary = sc.sql(
        s"""
           |select device_id,
           |concat_ws(':', level1_id, '0.25') as level1_value,
           |concat_ws(':', level2_id, '0.5') as level2_value,
           |concat_ws(':', level3_id, '1') as level3_value
           |from diary_tag_detail
         """.stripMargin
      )
      temp_diary.createOrReplaceTempView("temp_diary")
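
      // Collapse the three weighted pairs into a single comma-separated tag_list
      // per clicked diary row.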
      val test_diary = sc.sql(
        s"""
           |select device_id,
           |concat_ws(',', level1_value, level2_value, level3_value) as tag_list
           |from temp_diary
         """.stripMargin
      )
      test_diary.show()
      // val df_explode = test_diary.withColumn("e", explode(split(test_diary['tag_list'], ","))).drop("tag_list")
      // print("write started")
      //
      // GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true", result, table = "data_feed_exposure_precise", SaveMode.Append)
      // println("write finished")
      // println("start writing")
      // GmeiConfig.writeToJDBCTable(config.getString("jerry.jdbcuri"), result, table = "data_feed_exposure_precise", SaveMode.Append)
      // println("write finished")
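
      // Note (not part of this commit): the commented-out df_explode above uses
      // Python-style indexing; a minimal Scala sketch, assuming
      // org.apache.spark.sql.functions.{col, explode, split} are imported, would be:
      // val df_explode = test_diary
      //   .withColumn("e", explode(split(col("tag_list"), ",")))
      //   .drop("tag_list")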
    }
  }
}