Commit 2365c498 authored by 赵威's avatar 赵威

add read

parent 2b454c03
......@@ -33,52 +33,70 @@ object Main {
// val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// println("###############")
val res = Redis.getRead("64695DE0-B926-4188-9C62-D987DC20BEDF", "diary")
res.foreach(println)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// val env = StreamExecutionEnvironment.getExecutionEnvironment
// TODO read from config
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty(
"bootstrap.servers",
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
// // TODO read from config
// val kafkaConsumerProperties = new Properties
// kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
// kafkaConsumerProperties.setProperty(
// "bootstrap.servers",
// "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
// )
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
"gm-portrait-update-device",
new UserInfoDeserializationSchema,
kafkaConsumerProperties
)
// val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
// "gm-portrait-update-device",
// new UserInfoDeserializationSchema,
// kafkaConsumerProperties
// )
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
// val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
stream.map { user =>
val deviceId = user.deviceId
val projects = user.projects.toList
val secondDemands = user.secondDemands.toList
val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId
println(deviceId)
// println(projects.mkString(" "))
// stream.map { user =>
// val projects = user.projects.toList
// val secondDemands = user.secondDemands.toList
// val secondPositions = user.secondPositions.toList
// val secondSolutions = user.secondSolutions.toList
// val cityId = user.cityId
// val deviceId = user.deviceId
// println(deviceId)
// // println(projects.mkString(" "))
val diaryRead = Redis.getRead(deviceId, "diary")
val tractateRead = Redis.getRead(deviceId, "tractate")
val answerRead = Redis.getRead(deviceId, "answer")
// val diaryReq =
// ES.generateDiaryRequest(projects, secondDemands, secondPositions, secondSolutions, cityId, 300)
// val tractateReq =
// ES.generateTractateRequest(projects, secondDemands, secondPositions, secondSolutions, 300)
// val answerReq = ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, 100)
// val serviceDiaryReq =
// ES.generateServiceDiaryRequest(projects, secondDemands, secondPositions, secondSolutions, cityId, 100)
val diaryReq =
ES.generateDiaryRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
300
)
val tractateReq =
ES.generateTractateRequest(projects, secondDemands, secondPositions, secondSolutions, tractateRead, 300)
val answerReq =
ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, answerRead, 100)
val serviceDiaryReq =
ES.generateServiceDiaryRequest(
projects,
secondDemands,
secondPositions,
secondSolutions,
diaryRead,
cityId,
100
)
// Redis.save(ES.request(diaryReq), deviceId, "diary")
// Redis.save(ES.request(tractateReq), deviceId, "tractate")
// Redis.save(ES.request(answerReq), deviceId, "answer")
// Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
Redis.save(ES.request(diaryReq), deviceId, "diary")
Redis.save(ES.request(tractateReq), deviceId, "tractate")
Redis.save(ES.request(answerReq), deviceId, "answer")
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
// deviceId
// }
deviceId
}
// // stream.print
// env.execute("flink streaming user portrait")
......
......@@ -65,6 +65,7 @@ object ES {
def request(req: SearchRequest): Future[IndexedSeq[Content]] = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
println(req.show)
client.execute(req).map(resp => resp.result.to[Content])
// client.close()
}
......@@ -109,12 +110,15 @@ object ES {
secondDemands: List[String],
secondPositions: List[String],
secondSolutions: List[String],
read: List[Long],
cityId: Int = -1,
size: Int = 300
): SearchRequest = {
val includes = List("id", "tags_v3")
var scoreListBuffer: ListBuffer[ScoreFunction] = ListBuffer.empty[ScoreFunction]
var shouldListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
val mustNotListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
mustNotListBuffer += termQuery("show_by_index", 2)
if (projects.size > 0) {
scoreListBuffer += WeightScore(60).filter(
......@@ -138,6 +142,10 @@ object ES {
shouldListBuffer += TermsQuery("second_positions", secondPositions)
}
if (read.size > 0) {
mustNotListBuffer += termsQuery("id", read)
}
val res = search("gm-dbmw-diary-read")
.query {
functionScoreQuery()
......@@ -155,7 +163,7 @@ object ES {
rangeQuery("content_level").gte(3),
termQuery("content_simi_bol_show", 0)
)
.not(termQuery("show_by_index", 2))
.not(mustNotListBuffer.toList)
.should(shouldListBuffer.toList)
.minimumShouldMatch(1)
}
......@@ -184,11 +192,15 @@ object ES {
secondDemands: List[String],
secondPositions: List[String],
secondSolutions: List[String],
read: List[Long],
size: Int = 300
): SearchRequest = {
val includes = List("id", "tags_v3")
var scoreListBuffer: ListBuffer[ScoreFunction] = ListBuffer.empty[ScoreFunction]
var shouldListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
val mustNotListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
mustNotListBuffer += termQuery("show_by_index", 2)
mustNotListBuffer += termQuery("status", 4)
if (projects.size > 0) {
scoreListBuffer += WeightScore(60).filter(
......@@ -212,6 +224,10 @@ object ES {
shouldListBuffer += TermsQuery("second_positions", secondPositions)
}
if (read.size > 0) {
mustNotListBuffer += termsQuery("id", read)
}
val res = search("gm-dbmw-tractate-read")
.query {
functionScoreQuery()
......@@ -224,7 +240,7 @@ object ES {
termQuery("is_online", true),
rangeQuery("content_level").gte(3)
)
.not(termQuery("show_by_index", 2), termQuery("status", 4))
.not(mustNotListBuffer.toList)
.should(shouldListBuffer.toList)
.minimumShouldMatch(1)
}
......@@ -244,11 +260,14 @@ object ES {
secondDemands: List[String],
secondPositions: List[String],
secondSolutions: List[String],
read: List[Long],
size: Int = 100
): SearchRequest = {
val includes = List("id", "tags_v3")
var scoreListBuffer: ListBuffer[ScoreFunction] = ListBuffer.empty[ScoreFunction]
var shouldListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
val mustNotListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
mustNotListBuffer += termQuery("show_by_index", 2)
if (projects.size > 0) {
scoreListBuffer += WeightScore(60).filter(
......@@ -272,6 +291,10 @@ object ES {
shouldListBuffer += TermsQuery("second_positions", secondPositions)
}
if (read.size > 0) {
mustNotListBuffer += termsQuery("id", read)
}
val res = search("gm-dbmw-answer-read")
.query {
functionScoreQuery()
......@@ -285,7 +308,7 @@ object ES {
rangeQuery("content_level").gte(3),
rangeQuery("content_length").gte(30)
)
.not(termQuery("show_by_index", 2))
.not(mustNotListBuffer.toList)
.should(shouldListBuffer.toList)
.minimumShouldMatch(1)
}
......@@ -308,11 +331,15 @@ object ES {
secondDemands: List[String],
secondPositions: List[String],
secondSolutions: List[String],
read: List[Long],
cityId: Int = -1,
size: Int = 100
): SearchRequest = {
val includes = List("id", "tags_v3")
var shouldListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
val mustNotListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
mustNotListBuffer += termQuery("show_by_index", 2)
mustNotListBuffer += termQuery("doctor.hospital.id", "shyxylmryy2203")
if (projects.size > 0) {
shouldListBuffer += TermsQuery("tags_v3", projects)
......@@ -329,6 +356,10 @@ object ES {
shouldListBuffer += TermsQuery("second_solutions", secondSolutions)
}
if (read.size > 0) {
mustNotListBuffer += termsQuery("id", read)
}
val res = search("gm-dbmw-diary-read")
.query {
functionScoreQuery(
......@@ -350,7 +381,7 @@ object ES {
termQuery("service.is_promote", true),
rangeQuery("content_level").gte(3)
)
.not(termQuery("show_by_index", 2), termQuery("doctor.hospital.id", "shyxylmryy2203"))
.not(mustNotListBuffer.toList)
.should(shouldListBuffer.toList)
.minimumShouldMatch(1)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment