Commit ee417a0d authored by 赵威's avatar 赵威

request type to Either

parent efc8e8f8
......@@ -63,47 +63,60 @@ object ES {
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
def request(req: SearchRequest): Future[IndexedSeq[Content]] = {
def request(req: SearchRequest): Either[Throwable, Future[IndexedSeq[Content]]] = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
// println(req.show)
client.execute(req).map(resp => resp.result.to[Content])
try {
Right(client.execute(req).map(resp => resp.result.to[Content]))
} catch {
case e: Throwable =>
DingTalk.send(
Map(
"method" -> "ES.request",
"error" -> e.getStackTrace.mkString("\n"),
"request" -> req.show.toString
),
contentType = "exception"
)
Left(e)
}
// client.close()
}
def test(
diaryRequest: SearchRequest,
tractateRequest: SearchRequest,
answerRequest: SearchRequest,
serviceDiaryReq: SearchRequest
) = {
// TODO read from config
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
// def test(
// diaryRequest: SearchRequest,
// tractateRequest: SearchRequest,
// answerRequest: SearchRequest,
// serviceDiaryReq: SearchRequest
// ) = {
// // TODO read from config
// val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
val multiReq = multi(
diaryRequest,
tractateRequest,
answerRequest,
serviceDiaryReq
)
// val multiReq = multi(
// diaryRequest,
// tractateRequest,
// answerRequest,
// serviceDiaryReq
// )
println(multiReq.show)
// println(multiReq.show)
// TODO remove await
val resp = client.execute(multiReq).await
// // TODO remove await
// val resp = client.execute(multiReq).await
// println(resp)
// // println(resp)
val a = resp.result.to[Content]
// val a = resp.result.to[Content]
println("%%%%%%%%%%%")
println(a.size)
a.foreach { x =>
println(x.id, x.index, x.projects)
}
println("%%%%%%%%%%%")
// println("%%%%%%%%%%%")
// println(a.size)
// a.foreach { x =>
// println(x.id, x.index, x.projects)
// }
// println("%%%%%%%%%%%")
resp
}
// resp
// }
def generateDiaryRequest(
projects: List[String],
......
......@@ -24,35 +24,39 @@ object Redis {
}
def save(
contentF: Future[IndexedSeq[Content]],
contentEitherFuture: Either[Throwable, Future[IndexedSeq[Content]]],
deviceId: String,
contentType: String,
timestampBegin: Long
): Unit = {
val key = s"streaming:candidate:${contentType}:device_id:${deviceId}"
pRc4.withClient { client =>
contentF.foreach { seq =>
val ids = seq.map(c => c.id)
if (ids.size > 0) {
client.del(key)
ids.foreach { id =>
// println(id)
client.rpush(key, id)
}
client.expire(key, 60 * 60 * 24 * 15)
if (deviceId == "64695DE0-B926-4188-9C62-D987DC20BEDF") {
DingTalk.send(
Map(
"method" -> "Redis.save",
"deviceId" -> deviceId,
"contentType" -> contentType,
"streamTotalSeconds" -> s"${(Instant.now.getEpochSecond - timestampBegin)}s"
)
)
contentEitherFuture match {
case Left(e) => e.printStackTrace()
case Right(contentFuture) =>
pRc4.withClient { client =>
contentFuture.foreach { seq =>
val ids = seq.map(c => c.id)
if (ids.size > 0) {
client.del(key)
ids.foreach { id =>
// println(id)
client.rpush(key, id)
}
client.expire(key, 60 * 60 * 24 * 15)
if (deviceId == "64695DE0-B926-4188-9C62-D987DC20BEDF") {
DingTalk.send(
Map(
"method" -> "Redis.save",
"deviceId" -> deviceId,
"contentType" -> contentType,
"streamTotalSeconds" -> s"${(Instant.now.getEpochSecond * 1000 - timestampBegin * 1000)}ms"
)
)
}
}
}
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment