Commit db6097cd authored by 赵威's avatar 赵威

get request

parent 507ceb30
...@@ -36,10 +36,11 @@ lazy val root = (project in file(".")) ...@@ -36,10 +36,11 @@ lazy val root = (project in file("."))
libraryDependencies += jsonNative, libraryDependencies += jsonNative,
libraryDependencies += jsonJackson, libraryDependencies += jsonJackson,
//libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.7.8",
libraryDependencies += esCore, libraryDependencies += esCore,
libraryDependencies += esJava, libraryDependencies += esJava,
libraryDependencies += "net.debasishg" %% "redisclient" % "3.30",
libraryDependencies += flinkCore, libraryDependencies += flinkCore,
libraryDependencies += flinkScala, libraryDependencies += flinkScala,
libraryDependencies += flinkStreamingScala, libraryDependencies += flinkStreamingScala,
......
...@@ -11,7 +11,7 @@ import org.apache.flink.streaming.connectors.kafka.{ ...@@ -11,7 +11,7 @@ import org.apache.flink.streaming.connectors.kafka.{
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types } import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES } import com.gmei.up.utils.{ UserInfo, ES, Redis }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] { class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
override def isEndOfStream(t: UserInfo): Boolean = false override def isEndOfStream(t: UserInfo): Boolean = false
...@@ -55,17 +55,22 @@ object Main { ...@@ -55,17 +55,22 @@ object Main {
val projects = user.projects.toList val projects = user.projects.toList
val secondDemands = user.secondDemands.toList val secondDemands = user.secondDemands.toList
val cityId = user.cityId val cityId = user.cityId
println(user.deviceId) val deviceId = user.deviceId
println(deviceId)
println(projects.mkString(" ")) println(projects.mkString(" "))
val diaryReq = ES.generateDiaryRequest(projects, secondDemands, cityId) val diaryReq = ES.generateDiaryRequest(projects, secondDemands, cityId)
val tractateReq = ES.generateTractateRequest(projects, secondDemands) val tractateReq = ES.generateTractateRequest(projects, secondDemands)
val answerReq = ES.generateAnswerRequest(projects, secondDemands) val answerReq = ES.generateAnswerRequest(projects, secondDemands)
val serviceDiaryReq = ES.generateServiceDiaryRequest(projects, secondDemands, cityId) val serviceDiaryReq = ES.generateServiceDiaryRequest(projects, secondDemands, cityId)
ES.test(diaryReq, tractateReq, answerReq, serviceDiaryReq) val a = ES.request(diaryReq)
Redis.save(ES.request(diaryReq), deviceId, "diary")
// ES.test(diaryReq, tractateReq, answerReq, serviceDiaryReq)
println("########################") println("########################")
user.deviceId deviceId
} }
// stream.print // stream.print
......
package com.gmei.up.utils
case class Content(id: String, index: String, projects: String)
...@@ -2,6 +2,8 @@ package com.gmei.up.utils ...@@ -2,6 +2,8 @@ package com.gmei.up.utils
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.util.Try import scala.util.Try
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.global
import org.json4s.JsonDSL._ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.JsonMethods._
...@@ -14,11 +16,12 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder ...@@ -14,11 +16,12 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClient import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import com.sksamuel.elastic4s.requests.searches.{ SearchRequest, SearchResponse }
import com.sksamuel.elastic4s.requests.searches.sort.{ FieldSort, SortOrder, ScriptSortType }
import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ HitReader, Hit, ElasticClient } import com.sksamuel.elastic4s.{ HitReader, Hit, ElasticClient, Response }
import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.searches.{ SearchRequest, SearchResponse }
import com.sksamuel.elastic4s.requests.searches.sort.{ FieldSort, SortOrder, ScriptSortType }
import com.sksamuel.elastic4s.requests.searches.queries.Query import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.requests.searches.queries.term.TermsQuery import com.sksamuel.elastic4s.requests.searches.queries.term.TermsQuery
import com.sksamuel.elastic4s.requests.script.{ Script, ScriptType } import com.sksamuel.elastic4s.requests.script.{ Script, ScriptType }
...@@ -51,15 +54,21 @@ object ESClient { ...@@ -51,15 +54,21 @@ object ESClient {
} }
object ES { object ES {
case class Character(id: String, index: String, projects: String) implicit object ContentHitReader extends HitReader[Content] {
override def read(hit: Hit): Try[Content] = {
implicit object CharacterHitReader extends HitReader[Character] {
override def read(hit: Hit): Try[Character] = {
val source = hit.sourceAsMap val source = hit.sourceAsMap
Try(Character(source("id").toString, hit.index, source("tags_v3").toString)) Try(Content(source("id").toString, hit.index, source("tags_v3").toString))
} }
} }
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
def request(req: SearchRequest): Future[IndexedSeq[Content]] = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
client.execute(req).map(resp => resp.result.to[Content])
// client.close()
}
def test( def test(
diaryRequest: SearchRequest, diaryRequest: SearchRequest,
tractateRequest: SearchRequest, tractateRequest: SearchRequest,
...@@ -83,7 +92,7 @@ object ES { ...@@ -83,7 +92,7 @@ object ES {
// println(resp) // println(resp)
val a = resp.result.to[Character] val a = resp.result.to[Content]
println("%%%%%%%%%%%") println("%%%%%%%%%%%")
println(a.size) println(a.size)
......
package com.gmei.up.utils
import scala.concurrent.Future
import com.redis.RedisClient
object Redis {
// TODO read from config file
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
val pRc4 = new RedisClient("172.16.50.145", 6379, 0, Some("XfkMCCdWDIU%ls$h"), 5000)
def save(content: Future[IndexedSeq[Content]], deviceId: String, contentType: String): Unit = {
val key = s"streaming:candidate:${contentType}:device_id:${deviceId}"
println(key)
val a = content.map { seq =>
seq.map { c =>
println(c.id)
c.id
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment