Commit 757e8c9f authored by 赵威's avatar 赵威

save service id

parent 589759b8
...@@ -30,7 +30,7 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] ...@@ -30,7 +30,7 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
} }
// TODO remove // TODO remove
// extends LazyLogging // extends LazyLogging
object Main { object Main {
def main(args: Array[String]): Unit = def main(args: Array[String]): Unit =
// println("###############") // println("###############")
...@@ -109,7 +109,7 @@ object Main { ...@@ -109,7 +109,7 @@ object Main {
Redis.save(ES.request(diaryReq), deviceId, "diary", streamTimeBegin) Redis.save(ES.request(diaryReq), deviceId, "diary", streamTimeBegin)
Redis.save(ES.request(tractateReq), deviceId, "tractate", streamTimeBegin) Redis.save(ES.request(tractateReq), deviceId, "tractate", streamTimeBegin)
Redis.save(ES.request(answerReq), deviceId, "answer", streamTimeBegin) Redis.save(ES.request(answerReq), deviceId, "answer", streamTimeBegin)
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary", streamTimeBegin) Redis.saveServiceDiary(ES.request(serviceDiaryReq), deviceId)
// current time // current time
// TODO logger // TODO logger
......
...@@ -57,7 +57,6 @@ object ES { ...@@ -57,7 +57,6 @@ object ES {
implicit object ContentHitReader extends HitReader[Content] { implicit object ContentHitReader extends HitReader[Content] {
override def read(hit: Hit): Try[Content] = { override def read(hit: Hit): Try[Content] = {
val source = hit.sourceAsMap val source = hit.sourceAsMap
println(source)
val serviceId = source.getOrElse("service", Map[String, Long]()).asInstanceOf[Map[String, Long]].get("id") val serviceId = source.getOrElse("service", Map[String, Long]()).asInstanceOf[Map[String, Long]].get("id")
Try( Try(
Content(source("id").toString.toLong, hit.index, serviceId, source("tags_v3").toString) Content(source("id").toString.toLong, hit.index, serviceId, source("tags_v3").toString)
...@@ -69,13 +68,12 @@ object ES { ...@@ -69,13 +68,12 @@ object ES {
def request(req: SearchRequest): Either[Throwable, Future[IndexedSeq[Content]]] = { def request(req: SearchRequest): Either[Throwable, Future[IndexedSeq[Content]]] = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#") val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
println("@@@@@@@@@@@@@") // println("@@@@@@@@@@@@@")
client.execute(req).map { resp => // client.execute(req).map { resp =>
val a = resp.result.to[Content] // val a = resp.result.to[Content]
a.foreach(x => println("################", x.id, x.index, x.serviceId)) // a.foreach(x => println("################", x.id, x.index, x.serviceId))
} // }
println("@@@@@@@@@@@@@") // println("@@@@@@@@@@@@@")
try { try {
Right(client.execute(req).map(resp => resp.result.to[Content])) Right(client.execute(req).map(resp => resp.result.to[Content]))
} catch { } catch {
......
...@@ -60,4 +60,26 @@ object Redis { ...@@ -60,4 +60,26 @@ object Redis {
} }
} }
} }
def saveServiceDiary(
contentEitherFuture: Either[Throwable, Future[IndexedSeq[Content]]],
deviceId: String
): Unit = {
val key = s"streaming:candidate:service_diary:device_id:${deviceId}"
contentEitherFuture match {
case Left(e) => e.printStackTrace()
case Right(contentFuture) =>
pRc4.withClient { client =>
contentFuture.foreach { seq =>
val idPaids = seq.map(c => (c.id, c.serviceId.getOrElse(-1L)))
if (idPaids.size > 0) {
client.del(key)
idPaids.foreach { pair =>
client.rpush(key, pair)
}
}
}
}
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment