1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.gmei.up
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer,
KafkaDeserializationSchema
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
override def isEndOfStream(t: UserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
val info = new String(consumerRecord.value(), "UTF-8")
val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
userInfo
}
override def getProducedType: TypeInformation[UserInfo] =
Types.GENERIC(classOf[UserInfo])
}
object Main {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// TODO read from config
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty(
"bootstrap.servers",
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device",
new UserInfoDeserializationSchema,
kafkaConsumerProperties
)
println("###############")
ES.test()
println("###############")
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
stream.map { user =>
println(user.deviceId)
user.secondDemands.foreach(println)
println("########################")
}
// stream.print
env.execute("flink streaming user portrait")
}
}