package com.gmei.up

import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer,
  KafkaDeserializationSchema
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES, Redis }

class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
  override def isEndOfStream(t: UserInfo): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
    val info = new String(consumerRecord.value(), "UTF-8")
    // println(info)
    val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
    userInfo
  }

  override def getProducedType: TypeInformation[UserInfo] =
    Types.GENERIC(classOf[UserInfo])
}

object Main {
  def main(args: Array[String]): Unit = {
    // println("###############")
    // val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
    // println("###############")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // TODO read from config
    val kafkaConsumerProperties = new Properties
    kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
    kafkaConsumerProperties.setProperty(
      "bootstrap.servers",
      "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
    )

    val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
      "gm-portrait-update-device",
      new UserInfoDeserializationSchema,
      kafkaConsumerProperties
    )

    val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)

    stream.map { user =>
      val projects = user.projects.toList
      val secondDemands = user.secondDemands.toList
      val cityId = user.cityId
      val deviceId = user.deviceId
      println(deviceId)
      println(projects.mkString(" "))

      val diaryReq = ES.generateDiaryRequest(projects, secondDemands, cityId, 300)
      val tractateReq = ES.generateTractateRequest(projects, secondDemands, 300)
      val answerReq = ES.generateAnswerRequest(projects, secondDemands, 100)
      val serviceDiaryReq = ES.generateServiceDiaryRequest(projects, secondDemands, cityId, 100)

      Redis.save(ES.request(diaryReq), deviceId, "diary")
      Redis.save(ES.request(tractateReq), deviceId, "tractate")
      Redis.save(ES.request(answerReq), deviceId, "answer")
      Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")

      deviceId
    }

    // stream.print
    env.execute("flink streaming user portrait")
  }
}