Commit 58e5a861 authored by 赵威's avatar 赵威

get query

parent 7feaa92b
...@@ -11,14 +11,14 @@ import org.apache.flink.streaming.connectors.kafka.{ ...@@ -11,14 +11,14 @@ import org.apache.flink.streaming.connectors.kafka.{
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types } import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo } import com.gmei.up.utils.{ UserInfo, ES }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] { class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
override def isEndOfStream(t: UserInfo): Boolean = false override def isEndOfStream(t: UserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = { override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
val info = new String(consumerRecord.value(), "UTF-8") val info = new String(consumerRecord.value(), "UTF-8")
println(info) // println(info)
val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo]) val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
userInfo userInfo
} }
...@@ -53,9 +53,9 @@ object Main { ...@@ -53,9 +53,9 @@ object Main {
stream.map { user => stream.map { user =>
println(user.deviceId) println(user.deviceId)
// val query = ES.generateDiaryQuery(-1, user.projects, user.secondDemands) val query = ES.generateDiaryQuery(-1, user.projects.toList, user.secondDemands.toList)
// println(query) println(query)
// ES.test(query) ES.test(query)
println("########################") println("########################")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment