Commit fb29399c authored by 赵威's avatar 赵威

get data

parent 5250cae6
package com.gmei.up package com.gmei.up
import java.util.Properties import java.util.Properties
// TODO remove import com.typesafe.scalalogging.LazyLogging
// import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.scala._ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream } import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{ import org.apache.flink.streaming.connectors.kafka.{
...@@ -29,15 +28,12 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] ...@@ -29,15 +28,12 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
Types.GENERIC(classOf[UserInfo]) Types.GENERIC(classOf[UserInfo])
} }
// TODO remove object Main extends LazyLogging {
// extends LazyLogging
object Main {
def main(args: Array[String]): Unit = def main(args: Array[String]): Unit =
// println("###############") // println("###############")
// val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1) // val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// println("###############") // println("###############")
try { try {
// TODO logger
// logger.info("UserPortrait Flink Started") // logger.info("UserPortrait Flink Started")
val env = StreamExecutionEnvironment.getExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment
...@@ -112,8 +108,7 @@ object Main { ...@@ -112,8 +108,7 @@ object Main {
Redis.saveServiceDiary(ES.request(serviceDiaryReq), deviceId) Redis.saveServiceDiary(ES.request(serviceDiaryReq), deviceId)
// current time // current time
// TODO logger logger.info(s"${user.eventCn} ${deviceId}")
// logger.info(s"${user.eventCn} ${deviceId}")
val cost = streamTimeBegin - user.sendTimestamp val cost = streamTimeBegin - user.sendTimestamp
if (cost > 1000) { if (cost > 1000) {
...@@ -141,8 +136,7 @@ object Main { ...@@ -141,8 +136,7 @@ object Main {
Map("Main.main" -> "stream.map", "error" -> e.getStackTrace.mkString("\n")), Map("Main.main" -> "stream.map", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception" contentType = "exception"
) )
// TODO logger logger.error(e.getStackTrace.mkString("\n"))
// logger.error(e.getStackTrace.mkString("\n"))
e.printStackTrace() e.printStackTrace()
"" ""
} }
...@@ -157,8 +151,7 @@ object Main { ...@@ -157,8 +151,7 @@ object Main {
Map("method" -> "main", "error" -> e.getStackTrace.mkString("\n")), Map("method" -> "main", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception" contentType = "exception"
) )
// TODO logger logger.error(e.getStackTrace.mkString("\n"))
// logger.error(e.getStackTrace.mkString("\n"))
e.printStackTrace() e.printStackTrace()
} }
} }
...@@ -7,7 +7,6 @@ import com.redis.{ RedisClient, RedisClientPool } ...@@ -7,7 +7,6 @@ import com.redis.{ RedisClient, RedisClientPool }
import org.json4s._ import org.json4s._
import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{ read, write } import org.json4s.jackson.Serialization.{ read, write }
import org.json4s.jackson.JsonMethods._
object Redis { object Redis {
// TODO read from config file // TODO read from config file
...@@ -78,11 +77,10 @@ object Redis { ...@@ -78,11 +77,10 @@ object Redis {
pRc4.withClient { client => pRc4.withClient { client =>
contentFuture.foreach { seq => contentFuture.foreach { seq =>
val idPairs = seq.map(c => (c.id, c.serviceId.getOrElse("-1"))) val idPairs = seq.map(c => (c.id, c.serviceId.getOrElse("-1")))
val a = write(idPairs) val res = write(idPairs)
println(a)
if (idPairs.size > 0) { if (idPairs.size > 0) {
client.del(key) client.del(key)
client.set(key, a) client.set(key, res)
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment