Commit dd6c4b88 authored by 赵威's avatar 赵威

try logger

parent 98e3bbc2
...@@ -3,10 +3,6 @@ ThisBuild / scalaVersion := "2.11.12" ...@@ -3,10 +3,6 @@ ThisBuild / scalaVersion := "2.11.12"
ThisBuild / version := "0.1.0" ThisBuild / version := "0.1.0"
ThisBuild / organization := "com.gmei.up" ThisBuild / organization := "com.gmei.up"
// val esVersion = "7.5.0"
// val es = "org.elasticsearch" % "elasticsearch" % esVersion
// val esCore= "org.elasticsearch" % "elasticsearch-core" % esVersion
val jsonVersion = "3.6.10" val jsonVersion = "3.6.10"
val jsonNative = "org.json4s" %% "json4s-native" % jsonVersion val jsonNative = "org.json4s" %% "json4s-native" % jsonVersion
val jsonJackson = "org.json4s" %% "json4s-jackson" % jsonVersion val jsonJackson = "org.json4s" %% "json4s-jackson" % jsonVersion
...@@ -32,6 +28,8 @@ lazy val root = (project in file(".")) ...@@ -32,6 +28,8 @@ lazy val root = (project in file("."))
.settings( .settings(
name := "streamingUserPortrait", name := "streamingUserPortrait",
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75", libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75",
libraryDependencies += jsonNative, libraryDependencies += jsonNative,
libraryDependencies += jsonJackson, libraryDependencies += jsonJackson,
......
package com.gmei.up package com.gmei.up
import java.util.Properties import java.util.Properties
import com.typesafe.scalalogging.Logger
import org.apache.flink.api.scala._ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream } import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
import org.apache.flink.streaming.connectors.kafka.{ import org.apache.flink.streaming.connectors.kafka.{
...@@ -33,86 +34,88 @@ object Main { ...@@ -33,86 +34,88 @@ object Main {
// val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1) // val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// println("###############") // println("###############")
try { try {
val logger = Logger("Root")
val env = StreamExecutionEnvironment.getExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment
logger.info("hello")
// TODO read from config // // TODO read from config
val kafkaConsumerProperties = new Properties // val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming") // kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty( // kafkaConsumerProperties.setProperty(
"bootstrap.servers", // "bootstrap.servers",
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092" // "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
) // )
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo]( // val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device", // "gm-portrait-update-device",
new UserInfoDeserializationSchema, // new UserInfoDeserializationSchema,
kafkaConsumerProperties // kafkaConsumerProperties
) // )
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer) // val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
stream.map { user => // stream.map { user =>
try { // try {
val deviceId = user.deviceId // val deviceId = user.deviceId
val projects = user.projects.toList // val projects = user.projects.toList
val secondDemands = user.secondDemands.toList // val secondDemands = user.secondDemands.toList
val secondPositions = user.secondPositions.toList // val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList // val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId // val cityId = user.cityId
println(deviceId) // println(deviceId)
// println(projects.mkString(" ")) // // println(projects.mkString(" "))
val diaryRead = Redis.getRead(deviceId, "diary") // val diaryRead = Redis.getRead(deviceId, "diary")
val tractateRead = Redis.getRead(deviceId, "tractate") // val tractateRead = Redis.getRead(deviceId, "tractate")
val answerRead = Redis.getRead(deviceId, "answer") // val answerRead = Redis.getRead(deviceId, "answer")
val diaryReq = // val diaryReq =
ES.generateDiaryRequest( // ES.generateDiaryRequest(
projects, // projects,
secondDemands, // secondDemands,
secondPositions, // secondPositions,
secondSolutions, // secondSolutions,
diaryRead, // diaryRead,
cityId, // cityId,
300 // 300
) // )
val tractateReq = // val tractateReq =
ES.generateTractateRequest( // ES.generateTractateRequest(
projects, // projects,
secondDemands, // secondDemands,
secondPositions, // secondPositions,
secondSolutions, // secondSolutions,
tractateRead, // tractateRead,
300 // 300
) // )
val answerReq = // val answerReq =
ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100) // ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
val serviceDiaryReq = // val serviceDiaryReq =
ES.generateServiceDiaryRequest( // ES.generateServiceDiaryRequest(
projects, // projects,
secondDemands, // secondDemands,
secondPositions, // secondPositions,
secondSolutions, // secondSolutions,
diaryRead, // diaryRead,
cityId, // cityId,
100 // 100
) // )
Redis.save(ES.request(diaryReq), deviceId, "diary") // Redis.save(ES.request(diaryReq), deviceId, "diary")
Redis.save(ES.request(tractateReq), deviceId, "tractate") // Redis.save(ES.request(tractateReq), deviceId, "tractate")
Redis.save(ES.request(answerReq), deviceId, "answer") // Redis.save(ES.request(answerReq), deviceId, "answer")
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary") // Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
deviceId // deviceId
} catch { // } catch {
case e: Throwable => // case e: Throwable =>
e.printStackTrace() // e.printStackTrace()
"" // ""
} // }
} // }
// stream.print // // stream.print
env.execute("flink streaming user portrait") // env.execute("flink streaming user portrait")
} catch { } catch {
case e: Throwable => case e: Throwable =>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment