Commit da90b6b6 authored by 赵威's avatar 赵威

decouple kafka

parent dc7a678b
......@@ -3,21 +3,59 @@ package com.gmei.up
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{ FlinkKafkaConsumer, FlinkKafkaProducer }
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer,
KafkaDeserializationSchema
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
case class KafkaUserInfo(
deviceId: String,
action: String,
logTime: Double,
eventCn: String,
secondDemands: Array[String],
projects: Array[String]
)
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[KafkaUserInfo] {
override def isEndOfStream(t: KafkaUserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaUserInfo = {
// KafkaUserInfo(consumerRecord.topic(), new String(consumerRecord.value(), "UTF-8"))
println(consumerRecord)
KafkaUserInfo(
"abc",
"abc",
123123,
"abc",
Array("abc", "abc"),
Array("abc", "abc")
)
}
override def getProducedType: TypeInformation[KafkaUserInfo] =
Types.GENERIC(classOf[KafkaUserInfo])
}
object Main {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties
properties.setProperty("group.id", "user_portrait_flink_streaming")
properties.setProperty("bootstrap.servers", "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092")
// TODO read from config
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty(
"bootstrap.servers",
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
val kafkaConsumer = new FlinkKafkaConsumer[String](
val kafkaConsumer = new FlinkKafkaConsumer[KafkaUserInfo](
"gm-portrait-update-device",
new SimpleStringSchema,
properties
new UserInfoDeserializationSchema,
kafkaConsumerProperties
)
val stream = env.addSource(kafkaConsumer)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment