Commit eb5ecc8a authored by 赵威's avatar 赵威

try es

parent 6e738ae3
// ThisBuild / scalaVersion := "2.12.13"
ThisBuild / scalaVersion := "2.11.12"
ThisBuild / scalaVersion := "2.12.13"
// ThisBuild / scalaVersion := "2.11.12"
ThisBuild / version := "0.1.0"
ThisBuild / organization := "com.gmei.up"
val esVersion = "7.10.2"
val es = "org.elasticsearch" % "elasticsearch" % esVersion
val esCore= "org.elasticsearch" % "elasticsearch-core" % esVersion
// val esVersion = "7.5.0"
// val es = "org.elasticsearch" % "elasticsearch" % esVersion
// val esCore= "org.elasticsearch" % "elasticsearch-core" % esVersion
val elastic4sVersion = "7.5.0"
val esCore = "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion
val esJava = "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion
val flinkVersion = "1.12.1"
val flinkCore= "org.apache.flink" % "flink-core" % flinkVersion
......@@ -25,8 +29,12 @@ lazy val root = (project in file("."))
name := "streamingUserPortrait",
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75",
libraryDependencies += es,
libraryDependencies += esCore,
// libraryDependencies += esCore,
// libraryDependencies += esJava,
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.7.8",
// libraryDependencies += es,
// libraryDependencies += esCore,
libraryDependencies += flinkCore,
libraryDependencies += flinkScala,
......
......@@ -11,7 +11,7 @@ import org.apache.flink.streaming.connectors.kafka.{
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
import com.gmei.up.utils.UserInfo
import com.gmei.up.utils.{ UserInfo, ES }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
override def isEndOfStream(t: UserInfo): Boolean = false
......@@ -44,6 +44,10 @@ object Main {
kafkaConsumerProperties
)
println("###############")
ES.test()
println("###############")
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
stream.map { user =>
......
package com.gmei.up.utils
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.http.{ ElasticClient, ElasticProperties, Response }
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.http.ElasticDsl._
object ES {
// TODO read config from file
val host = "172.16.52.33"
val port = 9200
val username = "elastic"
val password = "gengmei!@#"
val diaryIndex = "gm-dbmw-diary-read"
val nodes = ElasticProperties(
"http://172.16.52.33:9200,http://172.16.52.19:9200,http://172.16.52.27:9200,http://172.16.52.34:9200,http://172.16.52.48:9200"
)
val client = ElasticClient(nodes)
def test() {
val json =
"""
|{
| "query" : {
| "match" : {"tags_v3" : ["光子嫩肤", "水光针"]}
| }
|}
|""".stripMargin
val resp = client.execute {
search("gm-dbmw-diary-read").source(json)
}.await
println(resp)
resp
}
}
......@@ -8,5 +8,3 @@ case class UserInfo(
secondDemands: Array[String],
projects: Array[String]
)
object UserInfo
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment