// Main.scala
package com.gmei.up

import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer,
  KafkaDeserializationSchema
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES }

/**
 * Deserializes raw Kafka records into [[UserInfo]] values.
 *
 * Each record value is decoded as a UTF-8 string and parsed as a JSON
 * document with fastjson. The source is an unbounded stream, so no
 * end-of-stream condition is ever reported.
 */
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
  // Unbounded Kafka topic: never terminate the stream.
  override def isEndOfStream(t: UserInfo): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
    val rawJson = new String(consumerRecord.value(), "UTF-8")
    JSON.parseObject(rawJson, classOf[UserInfo])
  }

  // NOTE(review): Types.GENERIC falls back to Flink's generic (Kryo-based)
  // serialization for UserInfo rather than a dedicated TypeInformation.
  override def getProducedType: TypeInformation[UserInfo] =
    Types.GENERIC(classOf[UserInfo])
}

object Main {
  /**
   * Job entry point: consumes user-portrait update events from the
   * "gm-portrait-update-device" Kafka topic and prints each record's
   * device id and second-level demands to stdout.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // TODO read from config
    val consumerProps = {
      val props = new Properties
      props.setProperty("group.id", "user_portrait_flink_streaming")
      props.setProperty(
        "bootstrap.servers",
        "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
      )
      props
    }

    val source = new FlinkKafkaConsumer[UserInfo](
      "gm-portrait-update-device",
      new UserInfoDeserializationSchema,
      consumerProps
    )

    // Smoke-test the Elasticsearch connection before launching the job.
    println("###############")
    ES.test()
    println("###############")

    val stream: DataStream[UserInfo] = env.addSource(source)

    // NOTE(review): map is used here purely for its println side effects and
    // the resulting DataStream[Unit] is discarded — consider a dedicated sink.
    stream.map { user =>
      println(user.deviceId)
      user.secondDemands.foreach(println)
      println("########################")
    }

    // stream.print
    env.execute("flink streaming user portrait")
  }
}