Commit 3f870ce0 authored by 赵威's avatar 赵威

add printer

parent 9e8f97b0
...@@ -24,7 +24,7 @@ lazy val root = (project in file(".")) ...@@ -24,7 +24,7 @@ lazy val root = (project in file("."))
.settings( .settings(
name := "streamingUserPortrait", name := "streamingUserPortrait",
// libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75", libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75",
libraryDependencies += es, libraryDependencies += es,
libraryDependencies += esCore, libraryDependencies += esCore,
......
...@@ -10,13 +10,14 @@ import org.apache.flink.streaming.connectors.kafka.{ ...@@ -10,13 +10,14 @@ import org.apache.flink.streaming.connectors.kafka.{
} }
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types } import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
case class KafkaUserInfo( case class KafkaUserInfo(
deviceId: String, device_id: String,
action: String, action: String,
logTime: Double, log_time: Double,
eventCn: String, event_cn: String,
secondDemands: Array[String], second_demands: Array[String],
projects: Array[String] projects: Array[String]
) )
...@@ -24,10 +25,11 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[KafkaUser ...@@ -24,10 +25,11 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[KafkaUser
override def isEndOfStream(t: KafkaUserInfo): Boolean = false override def isEndOfStream(t: KafkaUserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaUserInfo = { override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaUserInfo = {
// KafkaUserInfo(consumerRecord.topic(), new String(consumerRecord.value(), "UTF-8")) val info = new String(consumerRecord.value(), "UTF-8")
println(consumerRecord) val userInfo: KafkaUserInfo = JSON.parseObject(info, classOf[KafkaUserInfo])
val value = new String(consumerRecord.value(), "UTF-8") println(userInfo.device_id)
println(value) userInfo.projects.foreach(println)
println("##############") println("##############")
KafkaUserInfo( KafkaUserInfo(
"abc", "abc",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment