Commit d849ca0b authored by 赵威's avatar 赵威

save msg

parent 39714577
......@@ -46,7 +46,7 @@ object Main extends LazyLogging {
)
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device",
"gm-portrait-update-device-test",
new UserInfoDeserializationSchema,
kafkaConsumerProperties
)
......
......@@ -67,23 +67,30 @@ object Redis {
def saveServiceDiary(
contentEitherFuture: Either[Throwable, Future[IndexedSeq[Content]]],
deviceId: String
): Unit = {
implicit val formats = Serialization.formats(NoTypeHints)
): Unit =
try {
implicit val formats = Serialization.formats(NoTypeHints)
val key = s"streaming:candidate:service_diary:device_id:${deviceId}"
contentEitherFuture match {
case Left(e) => e.printStackTrace()
case Right(contentFuture) =>
pRc4.withClient { client =>
contentFuture.foreach { seq =>
val idPairs = seq.map(c => (c.id, c.serviceId.getOrElse("-1")))
val res = write(idPairs)
if (idPairs.size > 0) {
client.del(key)
client.set(key, res)
val key = s"streaming:candidate:service_diary:device_id:${deviceId}"
contentEitherFuture match {
case Left(e) => e.printStackTrace()
case Right(contentFuture) =>
pRc4.withClient { client =>
contentFuture.foreach { seq =>
val idPairs = seq.map(c => (c.id, c.serviceId.getOrElse("-1")))
val res = write(idPairs)
if (idPairs.size > 0) {
client.del(key)
client.set(key, res)
}
}
}
}
}
} catch {
case e: Throwable =>
DingTalk.send(
Map("method" -> "Redis.saveServiceDiary", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception"
)
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment