Commit dc7a678b authored by 赵威's avatar 赵威

kafka connection

parent a3eb0138
package com.gmei.up
import org.apache.flink.streaming.api.scala._
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{ FlinkKafkaConsumer, FlinkKafkaProducer }
object Main {
def main(args: Array[String]): Unit = {
println("hello")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val a = env.fromElements(1, 2, -3, 0, 5, -9, 8)
val b = a.map(_ + 1)
val properties = new Properties
properties.setProperty("group.id", "user_portrait_flink_streaming")
properties.setProperty("bootstrap.servers", "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092")
b.print()
val kafkaConsumer = new FlinkKafkaConsumer[String](
"gm-portrait-update-device",
new SimpleStringSchema,
properties
)
env.execute
val stream = env.addSource(kafkaConsumer)
// val a = env.fromElements(1, 2, -3, 0, 5, -9, 8)
// val b = a.map(_ + 1)
// b.print()
stream.print
env.execute("flink streaming user portrait")
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment