Commit 638cf1b3 authored by 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

change test file
parents 27a72180 8e7da259
@@ -2,11 +2,17 @@ package com.gmei
import java.io.Serializable
import breeze.linalg.split
import com.gmei.WeafareStat.{defaultParams, parser}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
import org.apache.spark
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
object data_feed_exposure_precise {
@@ -422,6 +428,338 @@ object data_feed_exposure_precise {
// GmeiConfig.writeToJDBCTable(config.getString("jerry.jdbcuri"),result, table="data_feed_exposure_precise",SaveMode.Append)
// println("写入完成")
}
}
}
object icon_train_data {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
sc.sql("use jerry_prod")
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = param.date
//println(param.date)
val partition_date = stat_date.replace("-","")
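// getMinusNDate(1) presumably yields yesterday's date as yyyy-MM-dd; partition_date strips the dashes to match the yyyyMMdd Hive partition format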
// Diary click data
val diary_click=sc.sql(
s"""
|select partition_date,cl_id as device_id, params['diary_id'] as diary_id
|from online.tl_hdfs_maidian_view
|where action = 'on_click_diary_card'
|and partition_date='${partition_date}'
|and params['diary_id'] is not null
""".stripMargin
)
diary_click.show()
diary_click.createOrReplaceTempView("diary_click")
val diary_tags = sc.sql(
s"""
|select * from online.tl_hdfs_diary_tags_view
|where partition_date = '${partition_date}'
""".stripMargin
)
diary_tags.createOrReplaceTempView("diary_tags")
// val tag_hierarchy_detail = sc.sql(
// s"""
// |select * from online.bl_tag_hierarchy_detail
// |where partition_date = '${partition_date}'
// """.stripMargin
// )
//
// tag_hierarchy_detail.createOrReplaceTempView("tag_hierarchy_detail")
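// Attach tag_id to each diary click by left-joining the diary tag table on diary_id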
val diary_tag_detail = sc.sql(
s"""
|select a.*,b.tag_id
|from diary_click a
|left join diary_tags b
|on a.diary_id=b.diary_id
""".stripMargin
)
diary_tag_detail.createOrReplaceTempView("diary_tag_detail")
val temp_diary = sc.sql(
s"""
|select '${stat_date}' as stat_date,device_id,tag_id
|from diary_tag_detail
""".stripMargin
)
temp_diary.show()
temp_diary.createOrReplaceTempView("temp_diary")
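// temp_diary: one (stat_date, device_id, tag_id) row per clicked diary tag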
// Meigou (welfare service) click data
val meigou_click=sc.sql(
s"""
|select partition_date,cl_id as device_id, params['service_id'] as service_id
|from online.tl_hdfs_maidian_view
|where action = 'goto_welfare_detail'
|and partition_date='${partition_date}'
|and params['service_id'] is not null
""".stripMargin
)
meigou_click.show()
meigou_click.createOrReplaceTempView("meigou_click")
val meigou_tags = sc.sql(
s"""
|select * from online.tl_meigou_servicetag_view
|where partition_date = '${partition_date}'
""".stripMargin
)
meigou_tags.createOrReplaceTempView("meigou_tags")
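// Attach tag_id to each service click by left-joining the service tag table on service_id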
val meigou_tag_detail = sc.sql(
s"""
|select a.*,b.tag_id
|from meigou_click a
|left join meigou_tags b
|on a.service_id=b.service_id
""".stripMargin
)
meigou_tag_detail.createOrReplaceTempView("meigou_tag_detail")
val temp_meigou = sc.sql(
s"""
|select '${stat_date}' as stat_date,device_id,tag_id
|from meigou_tag_detail
""".stripMargin
)
temp_meigou.createOrReplaceTempView("temp_meigou")
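// Training set: union of the diary-click and service-click (stat_date, device_id, tag_id) samples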
val final_train = sc.sql(
s"""
|select *
|from temp_diary
|union all
|select *
|from temp_meigou
""".stripMargin
)
final_train.show()
println("开始写入")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",final_train, table="icon_train_data",SaveMode.Append)
println("写入完成")
}
}
}
object tag_value {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
sc.sql("use jerry_prod")
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = param.date
//println(param.date)
val partition_date = stat_date.replace("-","")
val t1=0.25
val t2=0.5
val t3=1.0
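// Per-level weights: deeper tag levels carry more weight (level 1 = 0.25, level 2 = 0.5, level 3 = 1.0)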
// Map level_id to weight
val level1_id=sc.sql(
s"""
|select distinct(id) as level_id,'${t1}' as value
|from online.bl_tag_hierarchy_detail
|where partition_date = '${partition_date}'
|and id !=0
|and tag_type=1
""".stripMargin
)
level1_id.createOrReplaceTempView("level1_id")
val level2_id=sc.sql(
s"""
|select distinct(id) as level_id,'${t2}' as value
|from online.bl_tag_hierarchy_detail
|where partition_date = '${partition_date}'
|and id !=0
|and tag_type=2
""".stripMargin
)
level2_id.createOrReplaceTempView("level2_id")
val level3_id=sc.sql(
s"""
|select distinct(id) as level_id,'${t3}' as value
|from online.bl_tag_hierarchy_detail
|where partition_date = '${partition_date}'
|and id !=0
|and tag_type=3
""".stripMargin
)
level3_id.createOrReplaceTempView("level3_id")
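// Combine the three levels into a single (level_id, value) weight mapping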
val level_value=sc.sql(
s"""
|select *
|from level1_id
|union all
|select *
|from level2_id
|union all
|select *
|from level3_id
""".stripMargin
)
// tag_value.show(300)
println("开始写入")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",level_value, table="tagId_value",SaveMode.Overwrite)
println("写入完成")
// Map level_id to index
val level_index_temp=sc.sql(
s"""
|select id as tag_id
|from online.bl_tag_hierarchy_detail
|where partition_date = '${partition_date}'
|and id !=0
""".stripMargin
)
level_index_temp.createOrReplaceTempView("level_index_temp")
// temp1.createOrReplaceTempView("temp1")
// import implicit_
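// Assign a contiguous index to every distinct level_id via zipWithIndex, then cast both columns to Double before writing tag_level_index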
val result = level_index_temp.select("tag_id").distinct().rdd.map(_.get(0).toString).zipWithIndex().toDF("level_id","index")
val test=result.select(result.col("level_id").cast(DoubleType).as("level_id"),result.col("index").cast(DoubleType).as("index"))
// val resDf = spark.createDataFrame(rowRdd)
// val temp2=sc.sql(
// s"""
// |select concat_ws(',',tag_id,level_id) as tag_level from temp1
// """.stripMargin
// )
// temp2.show()
// temp2.createOrReplaceTempView("temp2")
// val w = Window.orderBy("level_id")
// val result = temp1.select("level_id").distinct().withColumn("index", row_number().over(w))
// result.show()
// val tag_level_index=result.rdd.map(row=>(row(0).toString,row(1).toString)).map(row=>(row._1.split(",").head,row._1.split(",")(1),row._2)).toDF("tag_id","level_id","index")
// val colNames = tag_level_index.columns
// val cols = colNames.map(f => f.toInt)
//
// val test=tag_level_index.select(tag_level_index.col("tag_id").cast(DoubleType).as("tag_id"),tag_level_index.col("level_id").cast(DoubleType).as("level_id"),tag_level_index.col("index").cast(DoubleType).as("index"))
println("开始写入")
GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",test, table="tag_level_index",SaveMode.Overwrite)
println("写入完成")
// Sync data from the zhengxing database into jerry_prod
}
@@ -51,8 +51,8 @@ object testt {
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
// ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
// val stat_date = GmeiConfig.getMinusNDate(1)
val stat_date=param.date
val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date=param.date
val partition_date = stat_date.replace("-","")
// Institution id
val blacklist = sc.sql(