Commit 1e07f3e7 authored by 王志伟
parents f85e8d00 1a38d11a
@@ -55,88 +55,99 @@ object Data2FFM {
     // val yesteday_have_seq = GmeiConfig.getMinusNDate(5)
     val esmm_data = sc.sql(
       s"""
-        |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
-        |from esmm_train_data
+        |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name from esmm_train_data
       """.stripMargin
     ).na.drop()
     val column_list = esmm_data.columns
-    val esmm_pre_data = sc.sql(
-      s"""
-        |select device_id,y,z,stat_date,ucity_id,cid_id,diary_service_id,clevel1_id,slevel1_id,ccity_name,scity_id
-        |from esmm_pre_data
-      """.stripMargin
-    ).na.drop()
     val max_stat_date = sc.sql(
       s"""
         |select max(stat_date) from esmm_train_data
       """.stripMargin
     )
     println("------------------------")
-    val max_stat_date_str = max_stat_date.collect().map(s =>
-      s(0).toString
-    ).head
+    val max_stat_date_str = max_stat_date.collect().map(s => s(0).toString).head
     println(max_stat_date_str)
     println(column_list.slice(0,2).toList)
     val column_number = scala.collection.mutable.Map[String,Array[String]]()
     for (i <- column_list){
       column_number(i) = esmm_data.select(i).distinct().collect().map(x => x(0).toString)
     }
     println("dict")
     val rdd = esmm_data.rdd.repartition(200)
       .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
-        x(4).toString,x(5).toString,x(6).toString,
-        x(7).toString,x(8).toString,x(9).toString,x(10).toString))
+        x(4).toString,x(5).toString,x(6).toString, x(7).toString))
     rdd.persist()
     import sc.implicits._
     val train = rdd.filter(x => x._4 != max_stat_date_str)
-      .map(x => (x._1,x._2,x._3,
+      .map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
         column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
-        column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
-        column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
-        column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
-      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
-        format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex()
-      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4))
-      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
+        column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
+        column_number("ccity_name").indexOf(x._8),x._5,x._6))
+      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
+        format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
+      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
+      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
+    println("train")
+    train.show(6)
     val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
     GmeiConfig.writeToJDBCTable(jdbcuri, train, "esmm_data2ffm_train", SaveMode.Overwrite)
     val test = rdd.filter(x => x._4 == max_stat_date_str)
-      .map(x => (x._1,x._2,x._3,
+      .map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
         column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
-        column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
-        column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
-        column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
-      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
-        format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex()
-      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4))
-      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
+        column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
+        column_number("ccity_name").indexOf(x._8),x._5,x._6))
+      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
+        format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
+      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
+      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
+    println("test")
+    test.show(6)
     GmeiConfig.writeToJDBCTable(jdbcuri, test, "esmm_data2ffm_cv", SaveMode.Overwrite)
+    val esmm_pre_data = sc.sql(
+      s"""
+        |select device_id,y,z,stat_date,ucity_id,cid_id,clevel1_id,ccity_name
+        |from esmm_pre_data
+      """.stripMargin
+    ).na.drop()
+    val esmm_pre_cids = esmm_pre_data.select("cid_id").distinct().collect().map(
+      s => s(0).toString
+    )
+    val esmm_pre_city = esmm_pre_data.select("ucity_id").distinct().collect().map(
+      s => s(0).toString
+    )
+    val esmm_pre_device = esmm_pre_data.select("device_id").distinct().collect().map(
+      s => s(0).toString
+    )
+    val esmm_join_cids = esmm_pre_cids.intersect(column_number("cid_id"))
+    val esmm_join_city = esmm_pre_city.intersect(column_number("ucity_id"))
+    val esmm_join_device = esmm_pre_device.intersect(column_number("device_id"))
     val rdd_pre = esmm_pre_data.rdd.repartition(200)
       .map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString,
         x(4).toString,x(5).toString,x(6).toString,
-        x(7).toString,x(8).toString,x(9).toString,x(10).toString))
-    rdd_pre.persist()
-    val pre = rdd_pre.map(x => (x._1,x._2,x._3,
+        x(7).toString)).filter(x => esmm_join_cids.indexOf(x._6) != -1)
+      .filter(x => esmm_join_city.indexOf(x._5) != -1)
+      .filter(x => esmm_join_device.indexOf(x._1) != -1)
+    val pre = rdd_pre.map(x => (x._1,x._2,x._3,column_number("device_id").indexOf(x._1),
       column_number("stat_date").indexOf(x._4), column_number("ucity_id").indexOf(x._5),
-      column_number("cid_id").indexOf(x._6), column_number("diary_service_id").indexOf(x._7),
-      column_number("clevel1_id").indexOf(x._8), column_number("slevel1_id").indexOf(x._9),
-      column_number("ccity_name").indexOf(x._10), column_number("scity_id").indexOf(x._11)))
-      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0 7:%d:1.0 8:%d:1.0".
-        format(x._4,x._5,x._6,x._7,x._8,x._9,x._10,x._11))).zipWithIndex()
-      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4))
-      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5)).toDF("number","data")
+      column_number("cid_id").indexOf(x._6), column_number("clevel1_id").indexOf(x._7),
+      column_number("ccity_name").indexOf(x._8),x._5,x._6))
+      .map(x => ((new util.Random).nextInt(2147483647),x._2,x._3,"1:%d:1.0 2:%d:1.0 3:%d:1.0 4:%d:1.0 5:%d:1.0 6:%d:1.0".
+        format(x._4,x._5,x._6,x._7,x._8,x._9),x._1,x._10,x._11)).zipWithIndex()
+      .map(x => (x._1._1,x._2,x._1._2,x._1._3,x._1._4,x._1._5,x._1._6,x._1._7))
+      .map(x => (x._1,x._2+","+x._3+","+x._4+","+x._5,x._6,x._7,x._8)).toDF("number","data","device_id","city_id","cid")
+    println("pre")
+    pre.show(6)
     GmeiConfig.writeToJDBCTable(jdbcuri, pre, "esmm_data2ffm_infer", SaveMode.Overwrite)
...
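The net effect of the Data2FFM change above: the feature set shrinks to six categorical fields (device_id, stat_date, ucity_id, cid_id, clevel1_id, ccity_name), and the output tables now carry device_id, city_id and cid next to the encoded row. The snippet below is a minimal, Spark-free sketch of the encoding idea used here: each value is replaced by its position in the column's distinct-value list and written as an FFM field:feature:value token. The object and helper names (FfmEncodingSketch, encodeRow) and the sample rows are illustrative, not part of the repository.

    // Sketch only: mirrors the column_number dictionaries and the "field:index:1.0" format string.
    object FfmEncodingSketch {
      def main(args: Array[String]): Unit = {
        val columns = Seq("stat_date", "ucity_id", "cid_id", "clevel1_id")
        val rows = Seq(
          Map("stat_date" -> "20181201", "ucity_id" -> "beijing",  "cid_id" -> "c1", "clevel1_id" -> "7"),
          Map("stat_date" -> "20181202", "ucity_id" -> "shanghai", "cid_id" -> "c2", "clevel1_id" -> "3")
        )
        // One distinct-value dictionary per categorical column, like column_number in the job.
        val dict: Map[String, Array[String]] =
          columns.map(c => c -> rows.map(_(c)).distinct.toArray).toMap

        // Encode one row: field ids are 1-based, feature ids are positions in the distinct list.
        def encodeRow(row: Map[String, String]): String =
          columns.zipWithIndex.map { case (c, field) =>
            s"${field + 1}:${dict(c).indexOf(row(c))}:1.0"
          }.mkString(" ")

        rows.foreach(r => println(encodeRow(r)))   // e.g. 1:0:1.0 2:0:1.0 3:0:1.0 4:0:1.0
      }
    }

Note that indexOf on a plain Array is a linear scan per row; the job tolerates this because the dictionaries are collected once on the driver and shipped with the map closure.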
@@ -9,6 +9,8 @@ import scopt.OptionParser
 import com.gmei.lib.AbstractParams
 import org.apache.spark.sql.functions.lit
+import scala.util.Try
 object EsmmData {
@@ -237,31 +239,74 @@ object EsmmPredData {
     val ti = new TiContext(sc)
     ti.tidbMapTable(dbName = "eagle",tableName = "src_mimas_prod_api_diary_tags")
     ti.tidbMapTable(dbName = "eagle",tableName = "src_zhengxing_api_tag")
-    ti.tidbMapTable(dbName = "jerry_prod",tableName = "merge_queue_table")
     ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_exposure")
+    ti.tidbMapTable("jerry_prod", "nd_device_cid_similarity_matrix")
+    ti.tidbMapTable("eagle","ffm_diary_queue")
+    ti.tidbMapTable("eagle","search_queue")
+    ti.tidbMapTable(dbName = "jerry_test",tableName = "esmm_train_data")
+    import sc.implicits._
     val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
-    val activate_data = sc.sql(
+    // val activate_data = sc.sql(
+    //   s"""
+    //     |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
+    //     |from merge_queue_table a
+    //     |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
+    //     |where b.stat_date ='${yesteday_have_seq}'
+    //     |and b.device_id is not null
+    //   """.stripMargin
+    // )
+    val raw_data = sc.sql(
       s"""
-        |select a.device_id,a.city_id as ucity_id,explode(split(a.search_queue, ',')) as cid_id
-        |from merge_queue_table a
-        |left join data_feed_exposure b on a.device_id=b.device_id and a.city_id=b.city_id
-        |where b.stat_date ='${yesteday_have_seq}'
-        |and b.device_id is not null
+        |select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
+        |(select device_id,city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
+        |union
+        |select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
+        |union
+        |select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
+        |where tmp1.device_id in (select distinct device_id from esmm_train_data)
       """.stripMargin
     )
+    raw_data.show()
-    val raw_data = sc.sql(
+    val raw_data1 = raw_data.rdd.groupBy(_.getAs[String]("device_city")).map {
+      case (device_city, cid_data) =>
+        val device_id = Try(device_city.split(",")(0)).getOrElse("")
+        val city_id = Try(device_city.split(",")(1)).getOrElse("")
+        val cids = Try(cid_data.toSeq.map(_.getAs[String]("merge_queue").split(",")).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1).distinct.take(300).mkString(",")).getOrElse("")
+        (device_id,city_id ,s"$cids")
+    }.filter(_._3!="").toDF("device_id","city_id","merge_queue")
+    raw_data1.createOrReplaceTempView("raw_data1")
+    println(raw_data1.count())
+    val raw_data2 = sc.sql(
       s"""
-        |select device_id,city_id as ucity_id, explode(split(search_queue, ',')) as cid_id
-        |from merge_queue_table
+        |select device_id,city_id,merge_queue from raw_data1 limit 10000
       """.stripMargin
     )
-    activate_data.createOrReplaceTempView("raw_data")
+    raw_data2.createOrReplaceTempView("raw_data2")
+    println(raw_data2.count())
+    raw_data2.show()
+    val raw_data3 = sc.sql(
+      s"""
+        |select device_id,city_id as ucity_id,explode(split(merge_queue, ',')) as cid_id from raw_data2
+      """.stripMargin
+    )
+    raw_data3.createOrReplaceTempView("raw_data")
+    println(raw_data3.count())
+    // activate_data.createOrReplaceTempView("raw_data")
     // raw_data.show()
-    import sc.implicits._
     val yesteday = GmeiConfig.getMinusNDate(1).replace("-","")
     val sid_data = sc.sql(
@@ -274,7 +319,8 @@ object EsmmPredData {
         |where b.partition_date = '${yesteday}'
       """.stripMargin
     )
-    sid_data.show()
+    // sid_data.show()
+    println(sid_data.count())
     val sid_data_label = sid_data.withColumn("y",lit(0)).withColumn("z",lit(0))
     sid_data_label.createOrReplaceTempView("union_data")
@@ -336,7 +382,7 @@ object EsmmPredData {
       """.stripMargin
     )
     // union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
-    union_data_scity_id.show()
+    println(union_data_scity_id.count())
     GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_pre_data",SaveMode.Overwrite)
...
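The core of the new EsmmPredData path is the groupBy over device_city: the similarity, native and search queues for each device/city pair are interleaved by position, de-duplicated and capped at 300 cids before being exploded back into rows. Below is a minimal sketch of that merge on plain Scala collections (no Spark; the sample queue strings are made up):

    object MergeQueueSketch {
      def main(args: Array[String]): Unit = {
        // Three candidate queues for one device_id,city_id key, as comma-separated cid strings.
        val queues = Seq("c1,c2,c3", "c2,c4", "c5,c1")

        val merged = queues
          .map(_.split(","))
          .flatMap(_.zipWithIndex)   // pair every cid with its position inside its own queue
          .sortBy(_._2)              // stable sort: position 0 of each queue first, then position 1, ...
          .map(_._1)
          .distinct                  // keep only the first occurrence of each cid
          .take(300)
          .mkString(",")

        println(merged)              // c1,c2,c5,c4,c3
      }
    }

Because sortBy is stable, the result is a round-robin interleave of the sources that keeps each source's top-ranked cids near the front; the Try(...).getOrElse("") wrappers in the real job guard against malformed device_city keys or queues, and rows whose merged queue comes out empty are filtered away.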
@@ -33,7 +33,7 @@ object GmeiConfig extends Serializable {
   def getSparkSession():(SparkContext, SparkSession) = {
     val sparkConf = new SparkConf
     sparkConf.set("spark.sql.crossJoin.enabled", "true")
-    sparkConf.set("spark.debug.maxToStringFields", "100")
+    sparkConf.set("spark.debug.maxToStringFields", "130")
     sparkConf.set("spark.sql.broadcastTimeout", "6000")
     if (!sparkConf.contains("spark.master")) {
...
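The only GmeiConfig change raises spark.debug.maxToStringFields from 100 to 130, presumably so the wider intermediate rows introduced above no longer trigger Spark's "Truncated the string representation of a plan" warning. A minimal sketch of a session built with the same settings (assumes spark-sql on the classpath; the local master and app name are illustrative, the real job gets them from spark-submit/GmeiConfig):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object SparkConfSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .set("spark.sql.crossJoin.enabled", "true")
          .set("spark.debug.maxToStringFields", "130")  // raised from 100 in this commit
          .set("spark.sql.broadcastTimeout", "6000")

        val spark = SparkSession.builder()
          .config(sparkConf)
          .master("local[*]")
          .appName("conf-sketch")
          .getOrCreate()

        println(spark.sparkContext.getConf.get("spark.debug.maxToStringFields"))  // 130
        spark.stop()
      }
    }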
+#! -*- coding: utf8 -*-
 import pymysql
 import pandas as pd
 from multiprocessing import Pool
@@ -21,7 +23,7 @@ def con_sql(db,sql):
 def get_data():
     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
-    sql = "select * from esmm_data where stat_date >= '2018-11-20'"
+    sql = "select * from esmm_data where stat_date >= '2018-11-20' limit 6"
     esmm = con_sql(db,sql)
     esmm = esmm.rename(columns={0:"stat_date",1: "device_id",2:"ucity_id",3:"cid_id",4:"diary_service_id",5:"y",
                                 6:"z",7:"clevel1_id",8:"slevel1_id"})
@@ -29,13 +31,13 @@ def get_data():
     print(esmm.head())
     print(esmm.shape)
     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
-    sql = "select * from home_tab_click"
+    sql = "select * from home_tab_click limit 6"
     temp = con_sql(db,sql)
     temp = temp.rename(columns={0: "device_id"})
     print("click data ok")
     # print(temp.head())
     df = pd.merge(esmm,temp,on = "device_id",how='left').fillna(0)
-    print("合并后:")
+    # print("合并后:")
     print(df.shape)
     df["diary_service_id"] = df["diary_service_id"].astype("str")
@@ -53,7 +55,7 @@ def get_data():
 def transform(df):
     model = multiFFMFormatPandas()
-    df = model.fit_transform(df, y="y", n=80000, processes=20)
+    df = model.fit_transform(df, y="y", n=80000, processes=10)
     df = pd.DataFrame(df)
     df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
     df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
@@ -135,9 +137,9 @@ class multiFFMFormatPandas:
             col_type = t[col]
             name = '{}_{}'.format(col, val)
             if col_type.kind == 'O':
-                ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]+1))
+                ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
             elif col_type.kind == 'i':
-                ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col]+1, val))
+                ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
         result = ' '.join(ffm)
         if self.y is not None:
             result = str(row.loc[row.index == self.y][0]) + "," + result
...
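In multiFFMFormatPandas the field ids stay 1-based while the +1 on feature ids is dropped, so feature numbering goes back to 0-based, matching the 0-based indexOf positions the Scala Data2FFM job emits. A small sketch of the before/after token for one categorical value (the two maps are made-up stand-ins for the fitted field_index_ and feature_index_ dictionaries):

    object FfmIndexOffsetSketch {
      def main(args: Array[String]): Unit = {
        val fieldIndex   = Map("ucity_id" -> 2)           // 0-based field position (stand-in)
        val featureIndex = Map("ucity_id_beijing" -> 41)  // 0-based feature position (stand-in)

        val col  = "ucity_id"
        val name = s"${col}_beijing"

        // Before this commit: both ids shifted by +1, so fields and features were 1-based.
        val before = s"${fieldIndex(col) + 1}:${featureIndex(name) + 1}:1"
        // After: fields keep the +1, features use the raw dictionary position.
        val after  = s"${fieldIndex(col) + 1}:${featureIndex(name)}:1"

        println(s"before: $before")  // before: 3:42:1
        println(s"after:  $after")   // after:  3:41:1
      }
    }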
import time
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
conf = SparkConf().setMaster("spark://10.30.181.88:7077").setAppName("My app")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
for i in range(1,100):
    print(i)
    time.sleep(5)
\ No newline at end of file