Commit 7523b7cf authored by 赵威's avatar 赵威

get es query

parent 704855bf
......@@ -32,6 +32,7 @@ lazy val root = (project in file("."))
.settings(
name := "streamingUserPortrait",
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75",
libraryDependencies += jsonNative,
libraryDependencies += jsonJackson,
......
......@@ -10,22 +10,15 @@ import org.apache.flink.streaming.connectors.kafka.{
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
// TODO
// import com.alibaba.fastjson.JSON
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
implicit val formats = DefaultFormats
override def isEndOfStream(t: UserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
val info = new String(consumerRecord.value(), "UTF-8")
// TODO
// val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
val userInfo: UserInfo = parse(info).extract[UserInfo]
val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
userInfo
}
......@@ -35,10 +28,6 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
object Main {
def main(args: Array[String]): Unit = {
// println("###############")
// ES.generateDiaryQuery(-1, List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"))
// println("###############")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// TODO read from config
......@@ -57,9 +46,14 @@ object Main {
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
// println("###############")
// ES.generateDiaryQuery(-1, List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"))
// println("###############")
stream.map { user =>
println(user.deviceId)
user.secondDemands.foreach(println)
val query = ES.generateDiaryQuery(-1, user.projects, user.secondDemands)
ES.test(query)
println("########################")
}
......
......@@ -40,7 +40,7 @@ object ESClient {
}
object ES {
def test() = {
def test(query: String) = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
val json = """
{
......@@ -54,10 +54,10 @@ object ES {
}
}
}
"""
""";
val resp = client.execute {
search("gm-dbmw-diary-read").source(json)
search("gm-dbmw-diary-read").source(query)
}.await
println(resp)
......
......@@ -5,6 +5,6 @@ case class UserInfo(
action: String,
logTime: Double,
eventCn: String,
secondDemands: Array[String],
projects: Array[String]
secondDemands: List[String],
projects: List[String]
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment