// Commit add72c44 authored by 王志伟
// parents 6c47ab88 ab79dc08
package com.gmei
import java.io.{File, PrintWriter, Serializable}
import com.gmei.GmeiConfig
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser
import scala.util.Random
object Data2FFM {
// @@ -54,7 +52,7 @@ object Data2FFM {  (diff hunk marker — surrounding lines of this object are missing from this capture)
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")
val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
// val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val esmm_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
// @@ -64,6 +62,14 @@ object Data2FFM {  (diff hunk marker — surrounding lines of this object are missing from this capture)
val column_list = esmm_data.columns
val esmm_pre_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
|from esmm_pre_data
""".stripMargin
).na.drop()
val max_stat_date = sc.sql(
s"""
|select max(stat_date) from esmm_train_data
// @@ -116,102 +122,27 @@ object Data2FFM {  (diff hunk marker — surrounding lines of this object are missing from this capture)
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
sc.stop()
}
}
}
/**
 * Spark job that reads ESMM prediction candidates from TiDB
 * (`jerry_test.esmm_pre_data`), integer-encodes each categorical column
 * against its own distinct-value vocabulary, formats rows as libffm-style
 * feature strings, and writes the result to the MySQL/TiDB table
 * `esmm_data2ffm_infer`.
 */
object Data2FFMInfer {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  /** Command-line parameters; `env` selects the database environment (e.g. dev). */
  case class Params(env: String = "dev"
                   ) extends AbstractParams[Params] with Serializable

  val defaultParams = Params()

  val parser = new OptionParser[Params]("Feed_EDA") {
    head("EsmmData")
    opt[String]("env")
      .text(s"the databases environment you used")
      .action((x, c) => c.copy(env = x))
    note(
      """
        |For example, the following command runs this app on a tidb dataset:
        |
        | spark-submit --class com.gmei.Data2FFMInfer ./target/scala-2.11/feededa-assembly-0.1.jar \
      """.stripMargin +
        s"| --env ${defaultParams.env}"
    )
  }

  /**
   * Writes `str` to the local file at `path`, overwriting any existing content.
   * The writer is closed even when `write` throws (the original leaked it on error).
   */
  def writecsv(path: String, str: String): Unit = {
    val writer = new PrintWriter(new File(path))
    try {
      writer.write(str)
    } finally {
      writer.close()
    }
    println("写入成功")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, defaultParams).map { param =>
      GmeiConfig.setup(param.env)
      val spark_env = GmeiConfig.getSparkSession()
      val sc = spark_env._2

      val ti = new TiContext(sc)
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
      ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")

      // Prediction candidates; drop rows with any NULL column so the feature
      // index lookups below never see missing values.
      val esmm_data = sc.sql(
        s"""
           |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
           |from esmm_pre_data
        """.stripMargin
      ).na.drop()

      val column_list = esmm_data.columns
      println(column_list.slice(0, 2).toList)

      // Per-column vocabulary: distinct values of every column, used to encode
      // categorical features as integer indices via indexOf below.
      val column_number = scala.collection.mutable.Map[String, Array[String]]()
      for (i <- column_list) {
        column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
      }
      println("dict")

      // BUG FIX: the original read from an undefined value `esmm_pre_data`
      // (only the TiDB *table* of that name exists here); the source DataFrame
      // is `esmm_data`.
      val rdd_pre = esmm_data.rdd.repartition(200)
        .map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString,
          x(4).toString, x(5).toString, x(6).toString,
          x(7).toString, x(8).toString, x(9).toString, x(10).toString))
      rdd_pre.persist()

      import sc.implicits._
      // NOTE(review): credentials are hard-coded in the JDBC URI — move to config.
      val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"

      // Encode each row: columns 4..11 become vocabulary indices, then are
      // formatted as libffm "field:index:1.0" features. indexOf would return -1
      // for an unseen value, but the vocabulary is built from this same frame,
      // so every value is present. The leading random int replaces device_id as
      // a shuffling key; zipWithIndex supplies a stable row number.
      val pre = rdd_pre.map(x => (x._1, x._2, x._3,
        column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
        column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
        column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
        column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
        .map(x => ((new Random).nextInt(2147483647), x._2, x._3,
          "1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
            format(x._4, x._5, x._6, x._7, x._8, x._9, x._10, x._11))).zipWithIndex()
        .map(x => (x._1._1, x._2, x._1._2, x._1._3, x._1._4))
        .map(x => (x._1, x._2 + "," + x._3 + "," + x._4 + "," + x._5)).toDF("number", "data")

      // BUG FIX: the original first wrote a dead `test` RDD (built from raw
      // Rows via nonexistent `._1` accessors, never converted to a DataFrame)
      // to this same table; with SaveMode.Overwrite that write was clobbered by
      // the write below, so the dead computation and duplicate write are gone.
      GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)

      sc.stop()
    }
  }
}
// \ No newline at end of file   (diff residue)
// }                             (stray brace from the diff's "new file" tail — not part of any definition above)
// @@ -323,7 +323,7 @@ object EsmmPredData {  (diff hunk residue — the object itself is not in this capture)
//   val union_data_scity_id = sc.sql(
//     s"""
// -     |select distinct a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
// +     |select a.stat_date,a.device_id,a.ucity_id,a.cid_id,a.diary_service_id,a.y,a.z,a.clevel1_id,a.slevel1_id,a.ccity_name,
//       | d.city_id as scity_id
//       |from union_data_ccity_name a
//       |left join online.tl_meigou_service_view b on a.diary_service_id=b.id
// ......
// (GitLab web-UI residue from the scraped commit page — not source code:)
// Markdown is supported
// 0% or
// You are about to add 0 people to the discussion. Proceed with caution.
// Finish editing this message first!
// Please register or to comment