Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
S
streamingUserPortrait
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
赵威
streamingUserPortrait
Commits
d9e557d1
Commit
d9e557d1
authored
4 years ago
by
赵威
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update package
parent
3d56c888
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
79 additions
and
78 deletions
+79
-78
Main.scala
src/main/scala/com.gmei.up/Main.scala
+79
-78
No files found.
src/main/scala/com.gmei.up/Main.scala
View file @
d9e557d1
...
...
@@ -35,87 +35,88 @@ object Main extends LazyLogging {
// println("###############")
try
{
val
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
logger
.
info
(
"UserPortrait Flink Started"
)
// // TODO read from config
// val kafkaConsumerProperties = new Properties
// kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
// kafkaConsumerProperties.setProperty(
// "bootstrap.servers",
// "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
// )
// val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
// "gm-portrait-update-device",
// new UserInfoDeserializationSchema,
// kafkaConsumerProperties
// )
// val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
// stream.map { user =>
// try {
// val deviceId = user.deviceId
// val projects = user.projects.toList
// val secondDemands = user.secondDemands.toList
// val secondPositions = user.secondPositions.toList
// val secondSolutions = user.secondSolutions.toList
// val cityId = user.cityId
// println(deviceId)
// // println(projects.mkString(" "))
// val diaryRead = Redis.getRead(deviceId, "diary")
// val tractateRead = Redis.getRead(deviceId, "tractate")
// val answerRead = Redis.getRead(deviceId, "answer")
// val diaryReq =
// ES.generateDiaryRequest(
// projects,
// secondDemands,
// secondPositions,
// secondSolutions,
// diaryRead,
// cityId,
// 300
// )
// val tractateReq =
// ES.generateTractateRequest(
// projects,
// secondDemands,
// secondPositions,
// secondSolutions,
// tractateRead,
// 300
// )
// val answerReq =
// ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
// val serviceDiaryReq =
// ES.generateServiceDiaryRequest(
// projects,
// secondDemands,
// secondPositions,
// secondSolutions,
// diaryRead,
// cityId,
// 100
// )
// Redis.save(ES.request(diaryReq), deviceId, "diary")
// Redis.save(ES.request(tractateReq), deviceId, "tractate")
// Redis.save(ES.request(answerReq), deviceId, "answer")
// Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
// deviceId
// } catch {
// case e: Throwable =>
// e.printStackTrace()
// ""
// }
// }
// // stream.print
// env.execute("flink streaming user portrait")
val
kafkaConsumerProperties
=
new
Properties
kafkaConsumerProperties
.
setProperty
(
"group.id"
,
"user_portrait_flink_streaming"
)
kafkaConsumerProperties
.
setProperty
(
"bootstrap.servers"
,
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
val
kafkaConsumer
=
new
FlinkKafkaConsumer
[
UserInfo
](
"gm-portrait-update-device"
,
new
UserInfoDeserializationSchema
,
kafkaConsumerProperties
)
val
stream
:
DataStream
[
UserInfo
]
=
env
.
addSource
(
kafkaConsumer
)
stream
.
map
{
user
=>
try
{
val
deviceId
=
user
.
deviceId
val
projects
=
user
.
projects
.
toList
val
secondDemands
=
user
.
secondDemands
.
toList
val
secondPositions
=
user
.
secondPositions
.
toList
val
secondSolutions
=
user
.
secondSolutions
.
toList
val
cityId
=
user
.
cityId
println
(
deviceId
)
// println(projects.mkString(" "))
val
diaryRead
=
Redis
.
getRead
(
deviceId
,
"diary"
)
val
tractateRead
=
Redis
.
getRead
(
deviceId
,
"tractate"
)
val
answerRead
=
Redis
.
getRead
(
deviceId
,
"answer"
)
val
diaryReq
=
ES
.
generateDiaryRequest
(
projects
,
secondDemands
,
secondPositions
,
secondSolutions
,
diaryRead
,
cityId
,
300
)
val
tractateReq
=
ES
.
generateTractateRequest
(
projects
,
secondDemands
,
secondPositions
,
secondSolutions
,
tractateRead
,
300
)
val
answerReq
=
ES
.
generateAnswerRequest
(
projects
,
secondDemands
,
secondPositions
,
secondSolutions
,
answerRead
,
100
)
val
serviceDiaryReq
=
ES
.
generateServiceDiaryRequest
(
projects
,
secondDemands
,
secondPositions
,
secondSolutions
,
diaryRead
,
cityId
,
100
)
Redis
.
save
(
ES
.
request
(
diaryReq
),
deviceId
,
"diary"
)
Redis
.
save
(
ES
.
request
(
tractateReq
),
deviceId
,
"tractate"
)
Redis
.
save
(
ES
.
request
(
answerReq
),
deviceId
,
"answer"
)
Redis
.
save
(
ES
.
request
(
serviceDiaryReq
),
deviceId
,
"service_diary"
)
logger
.
info
(
deviceId
)
deviceId
}
catch
{
case
e
:
Throwable
=>
e
.
printStackTrace
()
""
}
}
// stream.print
env
.
execute
(
"flink streaming user portrait"
)
}
catch
{
case
e
:
Throwable
=>
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment