Commit efc8e8f8 authored by 赵威's avatar 赵威

add timestamp

parent abe587cc
...@@ -23,7 +23,6 @@ val flinkML = "org.apache.flink" % "flink-ml-api" % flinkVersion ...@@ -23,7 +23,6 @@ val flinkML = "org.apache.flink" % "flink-ml-api" % flinkVersion
val flinkKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion val flinkKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
val flinkHadoop = "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion % Test val flinkHadoop = "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion % Test
lazy val root = (project in file(".")) lazy val root = (project in file("."))
.settings( .settings(
name := "streamingUserPortrait", name := "streamingUserPortrait",
...@@ -51,7 +50,8 @@ lazy val root = (project in file(".")) ...@@ -51,7 +50,8 @@ lazy val root = (project in file("."))
// libraryDependencies += flinkJSON, // libraryDependencies += flinkJSON,
libraryDependencies += flinkML, libraryDependencies += flinkML,
libraryDependencies += flinkKafka, libraryDependencies += flinkKafka,
libraryDependencies += flinkHadoop libraryDependencies += flinkHadoop,
libraryDependencies += "com.alibaba.alink" %% "alink_core_flink-1.11" % "1.3.1" % "provided"
) )
scalacOptions in ThisBuild ++= Seq( scalacOptions in ThisBuild ++= Seq(
......
package com.gmei.up package com.gmei.up
import java.util.Properties import java.util.Properties
import java.time.Instant
import com.typesafe.scalalogging.LazyLogging import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.scala._ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream } import org.apache.flink.streaming.api.scala.{ StreamExecutionEnvironment, DataStream }
...@@ -37,7 +38,7 @@ object Main extends LazyLogging { ...@@ -37,7 +38,7 @@ object Main extends LazyLogging {
logger.info("UserPortrait Flink Started") logger.info("UserPortrait Flink Started")
val env = StreamExecutionEnvironment.getExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment
// // TODO read from config // TODO read from config
val kafkaConsumerProperties = new Properties val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming") kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty( kafkaConsumerProperties.setProperty(
...@@ -45,8 +46,9 @@ object Main extends LazyLogging { ...@@ -45,8 +46,9 @@ object Main extends LazyLogging {
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092" "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
) )
// TODO remove test
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo]( val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device", "gm-portrait-update-device-test",
new UserInfoDeserializationSchema, new UserInfoDeserializationSchema,
kafkaConsumerProperties kafkaConsumerProperties
) )
...@@ -61,6 +63,7 @@ object Main extends LazyLogging { ...@@ -61,6 +63,7 @@ object Main extends LazyLogging {
val secondPositions = user.secondPositions.toList val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId val cityId = user.cityId
val streamBeginTimestamp: Long = Instant.now.getEpochSecond
println(deviceId) println(deviceId)
// println(projects.mkString(" ")) // println(projects.mkString(" "))
...@@ -100,11 +103,12 @@ object Main extends LazyLogging { ...@@ -100,11 +103,12 @@ object Main extends LazyLogging {
100 100
) )
Redis.save(ES.request(diaryReq), deviceId, "diary") Redis.save(ES.request(diaryReq), deviceId, "diary", streamBeginTimestamp)
Redis.save(ES.request(tractateReq), deviceId, "tractate") Redis.save(ES.request(tractateReq), deviceId, "tractate", streamBeginTimestamp)
Redis.save(ES.request(answerReq), deviceId, "answer") Redis.save(ES.request(answerReq), deviceId, "answer", streamBeginTimestamp)
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary") Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary", streamBeginTimestamp)
// current time
logger.info(s"${user.eventCn} ${deviceId}") logger.info(s"${user.eventCn} ${deviceId}")
if (deviceId == "64695DE0-B926-4188-9C62-D987DC20BEDF") { if (deviceId == "64695DE0-B926-4188-9C62-D987DC20BEDF") {
...@@ -115,7 +119,10 @@ object Main extends LazyLogging { ...@@ -115,7 +119,10 @@ object Main extends LazyLogging {
"action" -> user.action, "action" -> user.action,
"cityId" -> user.cityId.toString, "cityId" -> user.cityId.toString,
"logTime" -> user.logTime.toString, "logTime" -> user.logTime.toString,
"projects" -> user.projects.mkString(" ") "projects" -> user.projects.mkString(" "),
"diaryRead" -> diaryRead.size.toString,
"tractateRead" -> tractateRead.size.toString,
"answerRead" -> answerRead.size.toString
) )
) )
} }
...@@ -127,12 +134,13 @@ object Main extends LazyLogging { ...@@ -127,12 +134,13 @@ object Main extends LazyLogging {
Map("Main.main" -> "stream.map", "error" -> e.getStackTrace.mkString("\n")), Map("Main.main" -> "stream.map", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception" contentType = "exception"
) )
logger.error(e.getStackTrace.mkString("\n"))
e.printStackTrace() e.printStackTrace()
"" ""
} }
} }
stream.print // stream.print
env.execute("flink streaming user portrait") env.execute("flink streaming user portrait")
} catch { } catch {
...@@ -141,6 +149,7 @@ object Main extends LazyLogging { ...@@ -141,6 +149,7 @@ object Main extends LazyLogging {
Map("method" -> "main", "error" -> e.getStackTrace.mkString("\n")), Map("method" -> "main", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception" contentType = "exception"
) )
logger.error(e.getStackTrace.mkString("\n"))
e.printStackTrace() e.printStackTrace()
} }
} }
...@@ -2,6 +2,7 @@ package com.gmei.up.utils ...@@ -2,6 +2,7 @@ package com.gmei.up.utils
import scala.concurrent.Future import scala.concurrent.Future
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import java.time.Instant
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import com.redis.{ RedisClient, RedisClientPool } import com.redis.{ RedisClient, RedisClientPool }
...@@ -22,7 +23,12 @@ object Redis { ...@@ -22,7 +23,12 @@ object Redis {
redisRes.map(s => JSON.parseArray(s, classOf[Long]).asScala.toList).getOrElse(List.empty[Long]) redisRes.map(s => JSON.parseArray(s, classOf[Long]).asScala.toList).getOrElse(List.empty[Long])
} }
def save(contentF: Future[IndexedSeq[Content]], deviceId: String, contentType: String): Unit = { def save(
contentF: Future[IndexedSeq[Content]],
deviceId: String,
contentType: String,
timestampBegin: Long
): Unit = {
val key = s"streaming:candidate:${contentType}:device_id:${deviceId}" val key = s"streaming:candidate:${contentType}:device_id:${deviceId}"
pRc4.withClient { client => pRc4.withClient { client =>
...@@ -35,6 +41,16 @@ object Redis { ...@@ -35,6 +41,16 @@ object Redis {
client.rpush(key, id) client.rpush(key, id)
} }
client.expire(key, 60 * 60 * 24 * 15) client.expire(key, 60 * 60 * 24 * 15)
if (deviceId == "64695DE0-B926-4188-9C62-D987DC20BEDF") {
DingTalk.send(
Map(
"method" -> "Redis.save",
"deviceId" -> deviceId,
"contentType" -> contentType,
"streamTotalSeconds" -> s"${(Instant.now.getEpochSecond - timestampBegin)}s"
)
)
}
} }
} }
} }
......
...@@ -10,5 +10,6 @@ case class UserInfo( ...@@ -10,5 +10,6 @@ case class UserInfo(
secondPositions: Array[String], secondPositions: Array[String],
secondSolutions: Array[String], secondSolutions: Array[String],
projects: Array[String], projects: Array[String],
businessTags: Array[String] businessTags: Array[String],
sendTimestamp: Long
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment