Commit bbe53666 authored by 赵威's avatar 赵威

remove test

parent feb95858
...@@ -45,9 +45,8 @@ object Main extends LazyLogging { ...@@ -45,9 +45,8 @@ object Main extends LazyLogging {
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092" "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
) )
// TODO remove test
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo]( val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device-test", "gm-portrait-update-device",
new UserInfoDeserializationSchema, new UserInfoDeserializationSchema,
kafkaConsumerProperties kafkaConsumerProperties
) )
......
...@@ -34,34 +34,47 @@ object Redis { ...@@ -34,34 +34,47 @@ object Redis {
): Unit = { ): Unit = {
val key = s"streaming:candidate:${contentType}:device_id:${deviceId}" val key = s"streaming:candidate:${contentType}:device_id:${deviceId}"
contentEitherFuture match { try {
case Left(e) => e.printStackTrace() contentEitherFuture match {
case Right(contentFuture) => case Left(e) => e.printStackTrace()
pRc4Pool.withClient { client => case Right(contentFuture) =>
contentFuture.foreach { seq => pRc4Pool.withClient { client =>
val ids = seq.map(c => c.id) contentFuture.foreach { seq =>
if (ids.size > 0) { val ids = seq.map(c => c.id)
client.del(key) if (ids.size > 0) {
ids.foreach { id => client.del(key)
// println(id) ids.foreach { id =>
client.rpush(key, id) // println(id)
} client.rpush(key, id)
client.expire(key, 60 * 60 * 24 * 15) }
val total = System.currentTimeMillis - timeBegin client.expire(key, 60 * 60 * 24 * 15)
if (total > 1000 && total < 50000) { val total = System.currentTimeMillis - timeBegin
DingTalk.send( if (total > 1000 && total < 50000) {
Map( DingTalk.send(
"method" -> "Redis.save", Map(
"deviceId" -> deviceId, "method" -> "Redis.save",
"contentType" -> contentType, "deviceId" -> deviceId,
"description" -> "从流开始处理到存入结果的总耗时", "contentType" -> contentType,
"streamTotalSeconds" -> s"${total}ms" "description" -> "从流开始处理到存入结果的总耗时",
"streamTotalSeconds" -> s"${total}ms"
)
) )
) }
} }
} }
} }
} }
} catch {
case e: Throwable =>
DingTalk.send(
Map(
"method" -> "Redis.save",
"deviceId" -> deviceId,
"contentType" -> contentType,
"error" -> e.getStackTrace.mkString("\n")
),
contentType = "exception"
)
} }
} }
...@@ -88,7 +101,12 @@ object Redis { ...@@ -88,7 +101,12 @@ object Redis {
} catch { } catch {
case e: Throwable => case e: Throwable =>
DingTalk.send( DingTalk.send(
Map("method" -> "Redis.saveServiceDiary", "error" -> e.getStackTrace.mkString("\n")), Map(
"method" -> "Redis.saveServiceDiary",
"deviceId" -> deviceId,
"contentType" -> "service_diary",
"error" -> e.getStackTrace.mkString("\n")
),
contentType = "exception" contentType = "exception"
) )
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment