Commit 64fb1491 authored by 高雅喆's avatar 高雅喆

add device map cid func

parent 1a24af8e
...@@ -3,6 +3,8 @@ package com.gmei ...@@ -3,6 +3,8 @@ package com.gmei
import java.util.Properties import java.util.Properties
import java.io.Serializable import java.io.Serializable
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import com.typesafe.config._ import com.typesafe.config._
import org.apache.spark.{SparkConf,SparkContext} import org.apache.spark.{SparkConf,SparkContext}
...@@ -30,14 +32,6 @@ object GmeiConfig extends Serializable { ...@@ -30,14 +32,6 @@ object GmeiConfig extends Serializable {
this this
} }
//这种情况下的param1的值为null,所以会出现空指针异常的错误
// val env = param1.env match {
// case "prod" => ENV.PROD
// case "dev" => ENV.DEV
// case "pre" => ENV.PRE
// case _ => ENV.DEV
// }
// val config = initConfig(env)
def initConfig(env: ENV) = { def initConfig(env: ENV) = {
lazy val c = ConfigFactory.load() lazy val c = ConfigFactory.load()
...@@ -47,6 +41,7 @@ object GmeiConfig extends Serializable { ...@@ -47,6 +41,7 @@ object GmeiConfig extends Serializable {
def getSparkSession():(SparkContext, SparkSession) = { def getSparkSession():(SparkContext, SparkSession) = {
val sparkConf = new SparkConf val sparkConf = new SparkConf
sparkConf.set("spark.sql.crossJoin.enabled", "true") sparkConf.set("spark.sql.crossJoin.enabled", "true")
sparkConf.set("spark.debug.maxToStringFields", "100")
if (!sparkConf.contains("spark.master")) { if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[3]") sparkConf.setMaster("local[3]")
...@@ -86,4 +81,14 @@ object GmeiConfig extends Serializable { ...@@ -86,4 +81,14 @@ object GmeiConfig extends Serializable {
println(jdbcuri, table) println(jdbcuri, table)
writeToJDBCTable(jdbcuri, df, table, saveMode) writeToJDBCTable(jdbcuri, df, table, saveMode)
} }
/**
  * Returns the calendar date `n` days before today, formatted as `yyyy-MM-dd`.
  *
  * "Today" comes from `Calendar.getInstance()`, i.e. the JVM's default time zone.
  * The formatter is pinned to `Locale.US` so the result is always a Gregorian
  * date written with ASCII digits, independent of the JVM's default locale.
  *
  * @param n number of days to subtract; 0 yields today, a negative value yields a future date
  * @return the shifted date as a `yyyy-MM-dd` string
  */
def getMinusNDate(n: Int): String = {
  // A new formatter per call: SimpleDateFormat instances are not thread-safe.
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd", java.util.Locale.US)
  val cal = Calendar.getInstance()
  cal.add(Calendar.DATE, -n)
  dateFormat.format(cal.getTime())
}
} }
package com.gmei package com.gmei
import java.io.Serializable import java.io.Serializable
import scala.util.Try
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix} import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.{SaveMode,TiContext} import org.apache.spark.sql.{SaveMode, TiContext}
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import scopt.OptionParser import scopt.OptionParser
import com.gmei.lib.AbstractParams import com.gmei.lib.AbstractParams
...@@ -12,9 +14,6 @@ import com.soundcloud.lsh.Lsh ...@@ -12,9 +14,6 @@ import com.soundcloud.lsh.Lsh
object Main { object Main {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
...@@ -73,17 +72,11 @@ object Main { ...@@ -73,17 +72,11 @@ object Main {
.action((x, c) => c.copy(nodePath = x)) .action((x, c) => c.copy(nodePath = x))
note( note(
""" """
|For example, the following command runs this app on a synthetic dataset: |For example, the following command runs this app on a tidb dataset:
| |
| bin/spark-submit --class com.nhn.sunny.vegapunk.ml.model.Node2vec \ | spark-submit --class com.gmei.Main ./target/scala-2.11/Node2vec-assembly-0.2.jar \
""".stripMargin + """.stripMargin +
s"| --lr ${defaultParams.lr}" + s"| --env ${defaultParams.env}"
s"| --iter ${defaultParams.iter}" +
s"| --numPartition ${defaultParams.numPartition}" +
s"| --dim ${defaultParams.dim}" +
s"| --window ${defaultParams.window}" +
s"| --node <nodeFilePath>" +
s"| --output <path>"
) )
} }
...@@ -98,13 +91,17 @@ object Main { ...@@ -98,13 +91,17 @@ object Main {
val sc = GmeiConfig.getSparkSession()._2 val sc = GmeiConfig.getSparkSession()._2
val ti = new TiContext(sc) val ti = new TiContext(sc)
ti.tidbMapTable(dbName = GmeiConfig.config.getString("tidb.database"),tableName = "data_meigou_cid") ti.tidbMapTable(dbName = GmeiConfig.config.getString("tidb.database"),tableName = "nd_data_meigou_cid")
ti.tidbMapTable(dbName = GmeiConfig.config.getString("tidb.database"),tableName = "data_feed_click")
val date8 = GmeiConfig.getMinusNDate(8)
val tidb_inupt = sc.sql( val tidb_inupt = sc.sql(
s""" s"""
|SELECT |SELECT
| service_id,cid | service_id,cid
|FROM data_meigou_cid |FROM nd_data_meigou_cid
|where stat_date > '${date8}'
""".stripMargin """.stripMargin
) )
...@@ -170,10 +167,10 @@ object Main { ...@@ -170,10 +167,10 @@ object Main {
(word1, word2, entry.value) (word1, word2, entry.value)
} }
remapSecond.take(20).foreach(println) // remapSecond.take(20).foreach(println)
val score_result = remapSecond.toDF("cid1","cid2","score") val score_result = remapSecond.toDF("cid1","cid2","score")
GmeiConfig.writeToJDBCTable(score_result, table="cid_pairs_cosine_distince", SaveMode.Overwrite) GmeiConfig.writeToJDBCTable(score_result, table="nd_cid_pairs_cosine_distince", SaveMode.Overwrite)
// group by neighbours to get a list of similar words and then take top k // group by neighbours to get a list of similar words and then take top k
...@@ -183,11 +180,41 @@ object Main { ...@@ -183,11 +180,41 @@ object Main {
val similar = similarWords.toSeq.sortBy(-1 * _._3).filter(_._2.startsWith("diary")).take(10).map(_._2).mkString(",") val similar = similarWords.toSeq.sortBy(-1 * _._3).filter(_._2.startsWith("diary")).take(10).map(_._2).mkString(",")
(word1,s"$similar") (word1,s"$similar")
} }
// print out the results for the first 10 words // result.take(20).foreach(println)
result.take(20).foreach(println)
val similar_result = result.toDF("cid","similarity_cid") val similar_result = result.toDF("cid","similarity_cid")
GmeiConfig.writeToJDBCTable(similar_result, table="cid_similarity_matrix", SaveMode.Overwrite) GmeiConfig.writeToJDBCTable(similar_result, table="nd_cid_similarity_matrix", SaveMode.Overwrite)
//3. cids queue map to device_id
ti.tidbMapTable(dbName = GmeiConfig.config.getString("tidb.database"),tableName = "nd_cid_similarity_matrix")
val device_id = sc.sql(
s"""
|select a.device_id device_id,b.similarity_cid similarity_cid from
|(select device_id,first(cid) as cid from data_feed_click
|where cid_type='diary'
|and stat_date > '${date8}'
|group by device_id) a left join
|nd_cid_similarity_matrix b
|on a.cid = b.cid
|where b.similarity_cid is not null
""".stripMargin
)
device_id.show()
val device_queue = device_id.rdd.map {item =>
val parts = (item.getAs[String](fieldName = "device_id"),item.getAs[String](fieldName = "similarity_cid"))
Try {
(parts._1, Try(parts._2.toString.replace("diary|","")).getOrElse(null))
}.getOrElse(null)
}.filter(_!=null).toDF("device_id","similarity_cid")
device_queue.take(20).foreach(println)
GmeiConfig.writeToJDBCTable(device_queue, table="nd_device_cid_similarity_matrix", SaveMode.Overwrite)
} }
} getOrElse { } getOrElse {
...@@ -195,3 +222,5 @@ object Main { ...@@ -195,3 +222,5 @@ object Main {
} }
} }
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
/**
  * Manual smoke check for the date helpers: running `main` prints today's date
  * and the date eight days ago, both formatted as `yyyy-MM-dd`.
  */
object TestFunc {

  /**
    * Builds the `yyyy-MM-dd` formatter used by both helpers.
    *
    * A new instance is created per call because SimpleDateFormat is not thread-safe.
    * The locale is pinned to `Locale.US` so output is always a Gregorian date in
    * ASCII digits, whatever the JVM's default locale is.
    */
  private def newDateFormat(): SimpleDateFormat =
    new SimpleDateFormat("yyyy-MM-dd", java.util.Locale.US)

  /**
    * Returns today's date in the JVM's default time zone.
    *
    * @return the current date as a `yyyy-MM-dd` string
    */
  def getNowDate(): String =
    newDateFormat().format(new Date())

  /**
    * Returns the calendar date `n` days before today.
    *
    * @param n number of days to subtract; 0 yields today, a negative value yields a future date
    * @return the shifted date as a `yyyy-MM-dd` string
    */
  def getMinusNDate(n: Int): String = {
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -n)
    newDateFormat().format(cal.getTime())
  }

  /**
    * Prints the current date and the date eight days ago to standard output.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Label reads "current time:".
    println("现在时间:" + getNowDate())
    // Label reads "time one week ago:"; note the call subtracts 8 days, not 7.
    // NOTE(review): confirm whether 8 is intentional (e.g. for an exclusive `>` date filter).
    println("一周之前时间:" + getMinusNDate(8))
  }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment