Commit 18093763 authored by 赵威's avatar 赵威

add printer

parent 3f870ce0
......@@ -2,7 +2,7 @@ package com.gmei.up
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer,
......@@ -27,18 +27,7 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[KafkaUser
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaUserInfo = {
val info = new String(consumerRecord.value(), "UTF-8")
val userInfo: KafkaUserInfo = JSON.parseObject(info, classOf[KafkaUserInfo])
println(userInfo.device_id)
userInfo.projects.foreach(println)
println("##############")
KafkaUserInfo(
"abc",
"abc",
123123,
"abc",
Array("abc", "abc"),
Array("abc", "abc")
)
userInfo
}
override def getProducedType: TypeInformation[KafkaUserInfo] =
......@@ -63,15 +52,15 @@ object Main {
kafkaConsumerProperties
)
val stream = env.addSource(kafkaConsumer)
val stream: DataStream[KafkaUserInfo] = env.addSource(kafkaConsumer)
// val a = env.fromElements(1, 2, -3, 0, 5, -9, 8)
// val b = a.map(_ + 1)
// b.print()
stream.map { user =>
println(user.device_id)
user.second_demands.foreach(println)
println("########################")
}
stream.print
env.execute("flink streaming user portrait")
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment