Commit bcf083fc authored by 赵威's avatar 赵威

add try except

parent 0c2fba9a
...@@ -28,77 +28,94 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] ...@@ -28,77 +28,94 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
} }
object Main { object Main {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit =
// println("###############") // println("###############")
// val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1) // val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// println("###############") // println("###############")
try {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment // TODO read from config
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty(
"bootstrap.servers",
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
// TODO read from config val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
val kafkaConsumerProperties = new Properties "gm-portrait-update-device",
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming") new UserInfoDeserializationSchema,
kafkaConsumerProperties.setProperty( kafkaConsumerProperties
"bootstrap.servers", )
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo]( val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
"gm-portrait-update-device",
new UserInfoDeserializationSchema,
kafkaConsumerProperties
)
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer) stream.map { user =>
try {
val deviceId = user.deviceId
val projects = user.projects.toList
val secondDemands = user.secondDemands.toList
val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId
println(deviceId)
// println(projects.mkString(" "))
stream.map { user => val diaryRead = Redis.getRead(deviceId, "diary")
val deviceId = user.deviceId val tractateRead = Redis.getRead(deviceId, "tractate")
val projects = user.projects.toList val answerRead = Redis.getRead(deviceId, "answer")
val secondDemands = user.secondDemands.toList
val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId
println(deviceId)
// println(projects.mkString(" "))
val diaryRead = Redis.getRead(deviceId, "diary") val diaryReq =
val tractateRead = Redis.getRead(deviceId, "tractate") ES.generateDiaryRequest(
val answerRead = Redis.getRead(deviceId, "answer") projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
300
)
val tractateReq =
ES.generateTractateRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
tractateRead,
300
)
val answerReq =
ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
val serviceDiaryReq =
ES.generateServiceDiaryRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
100
)
val diaryReq = Redis.save(ES.request(diaryReq), deviceId, "diary")
ES.generateDiaryRequest( Redis.save(ES.request(tractateReq), deviceId, "tractate")
projects, Redis.save(ES.request(answerReq), deviceId, "answer")
secondDemands, Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
secondPositions,
secondSolutions,
diaryRead,
cityId,
300
)
val tractateReq =
ES.generateTractateRequest(projects, secondDemands, secondPositions, secondSolutions, tractateRead, 300)
val answerReq =
ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
val serviceDiaryReq =
ES.generateServiceDiaryRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
100
)
Redis.save(ES.request(diaryReq), deviceId, "diary") deviceId
Redis.save(ES.request(tractateReq), deviceId, "tractate") } catch {
Redis.save(ES.request(answerReq), deviceId, "answer") case e: Throwable =>
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary") e.printStackTrace()
""
}
}
deviceId // stream.print
} env.execute("flink streaming user portrait")
// stream.print } catch {
env.execute("flink streaming user portrait") case e: Throwable =>
} e.printStackTrace()
}
} }
...@@ -65,7 +65,7 @@ object ES { ...@@ -65,7 +65,7 @@ object ES {
def request(req: SearchRequest): Future[IndexedSeq[Content]] = { def request(req: SearchRequest): Future[IndexedSeq[Content]] = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#") val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
println(req.show) // println(req.show)
client.execute(req).map(resp => resp.result.to[Content]) client.execute(req).map(resp => resp.result.to[Content])
// client.close() // client.close()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment