Commit abe587cc authored by 赵威's avatar 赵威

send info to dingtalk

parent f84b924f
......@@ -53,67 +53,84 @@ object Main extends LazyLogging {
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
// stream.map { user =>
// try {
// val deviceId = user.deviceId
// val projects = user.projects.toList
// val secondDemands = user.secondDemands.toList
// val secondPositions = user.secondPositions.toList
// val secondSolutions = user.secondSolutions.toList
// val cityId = user.cityId
// println(deviceId)
// // println(projects.mkString(" "))
// val diaryRead = Redis.getRead(deviceId, "diary")
// val tractateRead = Redis.getRead(deviceId, "tractate")
// val answerRead = Redis.getRead(deviceId, "answer")
// val diaryReq =
// ES.generateDiaryRequest(
// projects,
// secondDemands,
// secondPositions,
// secondSolutions,
// diaryRead,
// cityId,
// 300
// )
// val tractateReq =
// ES.generateTractateRequest(
// projects,
// secondDemands,
// secondPositions,
// secondSolutions,
// tractateRead,
// 300
// )
// val answerReq =
// ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
// val serviceDiaryReq =
// ES.generateServiceDiaryRequest(
// projects,
// secondDemands,
// secondPositions,
// secondSolutions,
// diaryRead,
// cityId,
// 100
// )
// Redis.save(ES.request(diaryReq), deviceId, "diary")
// Redis.save(ES.request(tractateReq), deviceId, "tractate")
// Redis.save(ES.request(answerReq), deviceId, "answer")
// Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
// logger.info(s"${user.eventCn} ${deviceId}")
// deviceId
// } catch {
// case e: Throwable =>
// e.printStackTrace()
// ""
// }
// }
stream.map { user =>
try {
val deviceId = user.deviceId
val projects = user.projects.toList
val secondDemands = user.secondDemands.toList
val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId
println(deviceId)
// println(projects.mkString(" "))
val diaryRead = Redis.getRead(deviceId, "diary")
val tractateRead = Redis.getRead(deviceId, "tractate")
val answerRead = Redis.getRead(deviceId, "answer")
val diaryReq =
ES.generateDiaryRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
300
)
val tractateReq =
ES.generateTractateRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
tractateRead,
300
)
val answerReq =
ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
val serviceDiaryReq =
ES.generateServiceDiaryRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
100
)
Redis.save(ES.request(diaryReq), deviceId, "diary")
Redis.save(ES.request(tractateReq), deviceId, "tractate")
Redis.save(ES.request(answerReq), deviceId, "answer")
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
logger.info(s"${user.eventCn} ${deviceId}")
if (deviceId == "64695DE0-B926-4188-9C62-D987DC20BEDF") {
DingTalk.send(
Map(
"deviceId" -> deviceId,
"eventCn" -> user.eventCn,
"action" -> user.action,
"cityId" -> user.cityId.toString,
"logTime" -> user.logTime.toString,
"projects" -> user.projects.mkString(" ")
)
)
}
deviceId
} catch {
case e: Throwable =>
DingTalk.send(
Map("Main.main" -> "stream.map", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception"
)
e.printStackTrace()
""
}
}
stream.print
env.execute("flink streaming user portrait")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment