Commit 17140e3b authored by 赵威's avatar 赵威

try multi search

parent 41acdc4e
......@@ -29,6 +29,11 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
object Main {
def main(args: Array[String]): Unit = {
// println("###############")
// val query = ES.generateTractateQuery(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"))
// println(query)
// println("###############")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// TODO read from config
......@@ -47,17 +52,14 @@ object Main {
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
// println("###############")
// ES.generateDiaryQuery(-1, List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"))
// println("###############")
stream.map { user =>
println(user.deviceId)
val projects = user.projects.toList
val secondDemands = user.secondDemands.toList
if ((projects.size > 0) && (secondDemands.size > 0)) {
val query = ES.generateDiaryQuery(-1, projects, secondDemands)
ES.test(query)
if ((projects.size > 0) || (secondDemands.size > 0)) {
val diaryQuery = ES.generateDiaryQuery(projects, secondDemands, -1)
val tractateQuery = ES.generateTractateQuery(projects, secondDemands)
ES.test(diaryQuery, tractateQuery)
}
println("########################")
}
......
......@@ -40,39 +40,29 @@ object ESClient {
}
object ES {
def test(query: String) = {
def test(diaryQuery: String, tractateQuery: String) = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
val json = """
{
"query": {
"bool": {
"filter": [
{
"terms": { "tags_v3": ["光子嫩肤", "水光针"] }
}
]
}
}
}
""";
val resp = client.execute {
search("gm-dbmw-diary-read").source(query)
}.await
val resp = client
.execute(
multi(
search("gm-dbmw-diary-read").source(diaryQuery),
search("gm-dbmw-tractate-read").source(tractateQuery)
)
)
.await
println(resp)
resp
}
// SearchRequest
def generateDiaryQuery(
cityId: Int = -1,
projects: List[String] = List.empty[String],
secondDemands: List[String] = List.empty[String]
): String = {
val size = 100
def generateDiaryQuery(projects: List[String], secondDemands: List[String], cityId: Int = -1): String = {
val size = 10
val includes = List("id", "tags_v3")
// TODO read
val functions = new ListBuffer[JObject]()
val shoulds = new ListBuffer[JObject]()
if (projects.size > 0) {
......@@ -92,7 +82,7 @@ object ES {
shoulds += ("terms" -> ("second_demands" -> secondDemands))
}
val res = s"""
val query = s"""
{
"query": {
"function_score": {
......@@ -167,8 +157,71 @@ object ES {
}
"""
// println("diary query\n")
// println(res)
res
// println("diary query:")
// println(query)
query
}
def generateTractateQuery(projects: List[String], secondDemands: List[String]): String = {
val size = 10
val includes = List("id", "tags_v3")
val functions = new ListBuffer[JObject]()
val shoulds = new ListBuffer[JObject]()
if (projects.size > 0) {
val promotes: List[JObject] =
List(("terms" -> ("tags_v3" -> projects)), ("term" -> ("is_promote" -> true)))
functions += ("filter" -> ("bool" -> ("must" -> promotes))) ~ ("weight" -> 60)
functions += ("filter" -> ("terms" -> { "tags_v3" -> projects })) ~ ("weight" -> 50)
shoulds += ("terms" -> ("tags_v3" -> projects))
}
if (secondDemands.size > 0) {
functions += ("filter" -> ("terms" -> ("second_demands" -> secondDemands))) ~ ("weight" -> 40)
shoulds += ("terms" -> ("second_demands" -> secondDemands))
}
val query = s"""
{
"query": {
"function_score": {
"functions": ${pretty(render(functions.toList))},
"boost_mode": "replace",
"score_mode": "max",
"query": {
"bool": {
"should": ${pretty(render(shoulds.toList))},
"minimum_should_match": 1,
"must": [
{
"term": {
"is_online": "true"
}
},
{
"terms": {
"content_level": [6, 5, 4, 3.5, 3]
}
}
],
"must_not": [
{ "term": { "status": 4 } },
{ "term": { "show_by_index": 2 } }
]
}
}
}
},
"size": ${size},
"_source": ${pretty(render(includes))}
}
"""
// println("tractate query:")
// println(query)
query
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment