package com.gmei.up

import java.nio.charset.StandardCharsets
import java.util.Properties

import scala.util.control.NonFatal

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer,
  KafkaDeserializationSchema
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES, Redis }

/** Kafka deserialization schema that decodes a record value as UTF-8 JSON into a [[UserInfo]].
  *
  * Records that cannot be decoded (null value or unparseable payload) are mapped to `null`,
  * which the Flink Kafka consumer treats as "skip this record", so a single bad message does
  * not fail the whole job.
  */
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {

  /** The stream is unbounded: no element ever marks its end. */
  override def isEndOfStream(t: UserInfo): Boolean = false

  /** Decodes one Kafka record.
    *
    * @param consumerRecord raw record; its value is expected to be a UTF-8 encoded JSON object
    * @return the parsed [[UserInfo]], or `null` when the record has no value or the payload
    *         cannot be parsed (a `null` result makes the consumer skip the record)
    */
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo =
    Option(consumerRecord.value()) match {
      // A record may carry no value at all; there is nothing to parse.
      case None => null
      case Some(bytes) =>
        try JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), classOf[UserInfo])
        catch {
          // Only non-fatal errors are swallowed; fatal JVM errors still propagate.
          case NonFatal(e) =>
            System.err.println(
              s"Skipping undecodable record ${consumerRecord.topic()}-" +
                s"${consumerRecord.partition()}@${consumerRecord.offset()}: ${e.getMessage}"
            )
            null
        }
    }

  /** Type information handed to Flink for the produced elements (generic type info). */
  override def getProducedType: TypeInformation[UserInfo] = Types.GENERIC(classOf[UserInfo])
}

/** Entry point of the "flink streaming user portrait" job.
  *
  * Consumes [[UserInfo]] updates from Kafka and, for every update, issues four ES requests
  * (diary, tractate, answer, service diary) and hands each response to `Redis.save` under the
  * user's device id.
  */
object Main {

  /** Builds the streaming topology and runs it.
    *
    * @param args command-line arguments (currently unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // TODO read from config
    val kafkaConsumerProperties = new Properties()
    kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
    kafkaConsumerProperties.setProperty(
      "bootstrap.servers",
      "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
    )

    val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
      "gm-portrait-update-device",
      new UserInfoDeserializationSchema,
      kafkaConsumerProperties
    )

    val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)

    // The map is used for its side effects (ES queries + Redis writes); the emitted device id
    // is not consumed by any downstream operator.
    stream.map { user =>
      val projects = user.projects.toList
      val secondDemands = user.secondDemands.toList
      val cityId = user.cityId
      val deviceId = user.deviceId

      println(deviceId)
      println(projects.mkString(" "))

      // Build all requests first, then execute them one by one.
      // NOTE(review): the trailing numeric argument (300 / 100) is presumably a result-size
      // limit -- confirm against the ES helper.
      val diaryReq = ES.generateDiaryRequest(projects, secondDemands, cityId, 300)
      val tractateReq = ES.generateTractateRequest(projects, secondDemands, 300)
      val answerReq = ES.generateAnswerRequest(projects, secondDemands, 100)
      val serviceDiaryReq = ES.generateServiceDiaryRequest(projects, secondDemands, cityId, 100)

      Redis.save(ES.request(diaryReq), deviceId, "diary")
      Redis.save(ES.request(tractateReq), deviceId, "tractate")
      Redis.save(ES.request(answerReq), deviceId, "answer")
      Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")

      deviceId
    }

    env.execute("flink streaming user portrait")
  }
}