Commit 6bffb6ec authored by 张彦钊's avatar 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add test file
parents 8e34ccfe 0fd944d6
package com.gmei
import java.io.{File, PrintWriter, Serializable}
import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, TiContext}
import scopt.OptionParser
object Data2FFM {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("EsmmData")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def writecsv(path:String,str:String): Unit ={
val writer = new PrintWriter(new File(path))
writer.write(str)
writer.close()
println("写入成功")
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_train_data")
ti.tidbMapTable(dbName = "jerry_test", tableName = "esmm_pre_data")
// val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val esmm_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
|from esmm_train_data
""".stripMargin
).na.drop()
val column_list = esmm_data.columns
val esmm_pre_data = sc.sql(
s"""
|select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
|from esmm_pre_data
""".stripMargin
).na.drop()
val max_stat_date = sc.sql(
s"""
|select max(stat_date) from esmm_train_data
""".stripMargin
)
println("------------------------")
val max_stat_date_str = max_stat_date.collect().map(s =>
s(0).toString
).head
println(max_stat_date_str)
println(column_list.slice(0,2).toList)
val column_number = scala.collection.mutable.Map[String,Array[String]]()
for (i <- column_list){
column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
}
println("dict")
val rdd = esmm_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString,
x(7).toString,x(8).toString,x(9).toString,x(10).toString))
rdd.persist()
import sc.implicits._
val train = rdd.filter(x => x._4 != max_stat_date_str)
.map(x => (x._1,x._2,x._3,
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex()
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
val test = rdd.filter(x => x._4 == max_stat_date_str)
.map(x => (x._1,x._2,x._3,
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex()
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
val rdd_pre = esmm_pre_data.rdd.repartition(200)
.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
x(4).toString,x(5).toString,x(6).toString,
x(7).toString,x(8).toString,x(9).toString,x(10).toString))
rdd_pre.persist()
val pre = rdd_pre.map(x => (x._1,x._2,x._3,
column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
.map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex()
.map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4))
.map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
sc.stop()
}
}
}
......@@ -241,13 +241,14 @@ object EsmmPredData {
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
val activate_data = sc.sql(
s"""
|select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
|from merge_queue_table a
|left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
|where b.stat_date >'${yesteday_have_seq}'
|where b.stat_date ='${yesteday_have_seq}'
|and b.device_id is not null
""".stripMargin
)
......
......@@ -206,6 +206,7 @@ object WeafareStat {
val result5 =result4.withColumn("diary_expoure_meigou_rate",result.col("diary_meigou_count")/result.col("diary_expoure"))
result5.show()
GmeiConfig.writeToJDBCTable(result5, "diary_meigou_cvr", SaveMode.Append)
sc.stop()
......
......@@ -44,27 +44,64 @@ object testt {
ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
val view_count = sc.sql(
//机构id
// val agency_id = sc.sql(
// s"""
// |SELECT DISTINCT(cl_id) as device_id
// |FROM online.ml_hospital_spam_pv_day
// |WHERE partition_date >= '20180402'
// |AND partition_date <= '20181203'
// |AND pv_ratio >= 0.95
// |UNION ALL
// |SELECT DISTINCT(cl_id) as device_id
// |FROM online.ml_hospital_spam_pv_month
// |WHERE partition_date >= '20171101'
// |AND partition_date <= '20181203'
// |AND pv_ratio >= 0.95
// """.stripMargin
// )
//// agency_id.show()
// agency_id.createOrReplaceTempView("agency_id")
val a = Array("answer_detail","article_detail","diary_detail","home_page","question_detail","search_content","search_diary")
for (i <- 0 until a.length){
val diary_pv = sc.sql(
s"""
|select params["business_id"] as diary_id,(params["out"]-params["in"]) as dur_time
|from online.tl_hdfs_maidian_view
|where action="page_view"
|and params["page_name"]="diary_detail"
|and partition_date >='20180901'
""".stripMargin
|select partition_date,count(cl_id)
|from online.ml_community_exposure_detail
|where business_type ='diary'
|and partition_date >='20181120'
|and event='${a(i)}'
|GROUP BY partition_date
|order by partition_date
""".stripMargin
)
view_count.show()
view_count.createOrReplaceTempView("temp")
println("该来源的数据为:"+ a(i))
diary_pv.show()
}
// val diary_pv = sc.sql(
// s"""
// |select partition_date,count(cl_id)
// |from online.tl_hdfs_maidian_view
// |where action ='on_click_diary_card'
// |and params["page_name"]="home"
// |and params["tab_name"]="精选"
// |and partition_date >='20181120'
// |GROUP BY partition_date
// |order by partition_date
// """.stripMargin
// )
// diary_pv.show()
GmeiConfig.writeToJDBCTable(view_count, "avg", SaveMode.Overwrite)
// GmeiConfig.writeToJDBCTable(view_count, "avg", SaveMode.Overwrite)
val result = view_count
result.show()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment