Commit b12e6024 authored by 赵威's avatar 赵威

add printer

parent 18093763
...@@ -11,27 +11,19 @@ import org.apache.flink.streaming.connectors.kafka.{ ...@@ -11,27 +11,19 @@ import org.apache.flink.streaming.connectors.kafka.{
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types } import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import com.gmei.up.utils.UserInfo
case class KafkaUserInfo( class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
device_id: String, override def isEndOfStream(t: UserInfo): Boolean = false
action: String,
log_time: Double,
event_cn: String,
second_demands: Array[String],
projects: Array[String]
)
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[KafkaUserInfo] { override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
override def isEndOfStream(t: KafkaUserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaUserInfo = {
val info = new String(consumerRecord.value(), "UTF-8") val info = new String(consumerRecord.value(), "UTF-8")
val userInfo: KafkaUserInfo = JSON.parseObject(info, classOf[KafkaUserInfo]) val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
userInfo userInfo
} }
override def getProducedType: TypeInformation[KafkaUserInfo] = override def getProducedType: TypeInformation[UserInfo] =
Types.GENERIC(classOf[KafkaUserInfo]) Types.GENERIC(classOf[UserInfo])
} }
object Main { object Main {
...@@ -46,21 +38,21 @@ object Main { ...@@ -46,21 +38,21 @@ object Main {
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092" "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
) )
val kafkaConsumer = new FlinkKafkaConsumer[KafkaUserInfo]( val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device", "gm-portrait-update-device",
new UserInfoDeserializationSchema, new UserInfoDeserializationSchema,
kafkaConsumerProperties kafkaConsumerProperties
) )
val stream: DataStream[KafkaUserInfo] = env.addSource(kafkaConsumer) val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
stream.map { user => stream.map { user =>
println(user.device_id) println(user.deviceId)
user.second_demands.foreach(println) user.second_demands.foreach(println)
println("########################") println("########################")
} }
stream.print // stream.print
env.execute("flink streaming user portrait") env.execute("flink streaming user portrait")
} }
} }
package com.gmei.up.utils package com.gmei.up.utils
case class UserInfo(
deviceId: String,
action: String,
log_time: Double,
event_cn: String,
second_demands: Array[String],
projects: Array[String]
)
object UserInfo
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment