Commit f7e12461 authored by 赵威

search diary

parent 99ed8914
......@@ -30,8 +30,8 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
object Main {
def main(args: Array[String]): Unit = {
// println("###############")
// val query = ES.generateTractateQuery(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"))
// println(query)
// val query = ES.generateDiaryQuery(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// query
// println("###############")
val env = StreamExecutionEnvironment.getExecutionEnvironment
......
......@@ -2,24 +2,32 @@ package com.gmei.up.utils
import scala.collection.mutable.ListBuffer
import scala.util.Try
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.http.JavaClient
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.JObject
import org.apache.http.HttpHost
import org.apache.http.auth.{ AuthScope, UsernamePasswordCredentials }
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import com.sksamuel.elastic4s.requests.searches.{ SearchRequest, SearchResponse }
import com.sksamuel.elastic4s.requests.searches.sort.{ FieldSort, SortOrder, ScriptSortType }
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ HitReader, Hit }
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.JObject
// import org.json4s.jackson.Serialization.write
// import scala.concurrent._
// import scala.concurrent.duration._
// import ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.{ HitReader, Hit, ElasticClient }
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.requests.searches.queries.term.TermsQuery
import com.sksamuel.elastic4s.requests.script.{ Script, ScriptType }
import com.sksamuel.elastic4s.requests.searches.queries.funcscorer.{
CombineFunction,
FunctionScoreQueryScoreMode,
WeightScore,
ScoreFunction
}
object ESClient {
private class ElasticCredentials(username: String, password: String) extends HttpClientConfigCallback {
......@@ -51,12 +59,12 @@ object ES {
}
}
def test(diaryQuery: String, tractateQuery: String) = {
def test(diaryQuery: SearchRequest, tractateQuery: String) = {
// TODO read from config
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
val bb = multi(
search("gm-dbmw-diary-read").source(s"""${diaryQuery}"""),
diaryQuery,
search("gm-dbmw-tractate-read").source(tractateQuery)
)
......@@ -81,110 +89,69 @@ object ES {
resp
}
// SearchRequest
def generateDiaryQuery(projects: List[String], secondDemands: List[String], cityId: Int = -1): String = {
val size = 30
def generateDiaryQuery(
projects: List[String],
secondDemands: List[String],
cityId: Int = -1
): SearchRequest = {
val size = 15
val includes = List("id", "tags_v3")
var scoreListBuffer: ListBuffer[ScoreFunction] = ListBuffer.empty[ScoreFunction]
var shouldListBuffer: ListBuffer[Query] = ListBuffer.empty[Query]
// TODO read
val functions = new ListBuffer[JObject]()
val shoulds = new ListBuffer[JObject]()
if (projects.size > 0) {
val promotes: List[JObject] =
List(("terms" -> ("tags_v3" -> projects)), ("term" -> ("is_promote" -> true)))
functions += ("filter" -> ("bool" -> ("must" -> promotes))) ~ ("weight" -> 60)
functions += ("filter" -> ("terms" -> { "tags_v3" -> projects })) ~ ("weight" -> 50)
shoulds += ("terms" -> ("tags_v3" -> projects))
scoreListBuffer += WeightScore(60).filter(
must(termsQuery("tags_v3", projects), termQuery("is_promote", true))
)
scoreListBuffer += WeightScore(50).filter(termsQuery("tags_v3", projects))
shouldListBuffer += TermsQuery("tags_v3", projects)
}
if (secondDemands.size > 0) {
functions += ("filter" -> ("terms" -> ("second_demands" -> secondDemands))) ~ ("weight" -> 40)
shoulds += ("terms" -> ("second_demands" -> secondDemands))
scoreListBuffer += WeightScore(40).filter(termsQuery("second_demands", secondDemands))
shouldListBuffer += TermsQuery("second_demands", secondDemands)
}
val query = s"""
{
"query": {
"function_score": {
"functions": ${pretty(render(functions.toList))},
"boost_mode": "replace",
"score_mode": "max",
"query": {
"bool": {
"should": ${pretty(render(shoulds.toList))},
"minimum_should_match": 1,
"must": [
{
"term": {
"is_online": "true"
}
},
{
"term": {
"has_cover": "true"
}
},
{
"term": {
"is_sink": "false"
}
},
{
"term": {
"has_after_cover": "true"
}
},
{
"term": {
"has_before_cover": "true"
}
},
{
"terms": {
"content_level": [6, 5, 4, 3.5, 3]
}
},
{
"term": {
"content_simi_bol_show": 0
}
}
],
"must_not": [{ "term": { "show_by_index": 2 } }]
}
}
}
},
"size": ${size},
"_source": {"include": ${pretty(render(includes))}},
"sort": [
{
"_script": {
"type": "number",
"script": {
"id": "diary-recommend",
"params": { "user_city_tag_id": ${cityId} }
},
"order": "desc"
val res = search("gm-dbmw-diary-read")
.query {
functionScoreQuery()
.functions(scoreListBuffer.toList)
.scoreMode(FunctionScoreQueryScoreMode.Max)
.boostMode(CombineFunction.Replace)
.query {
boolQuery()
.must(
termQuery("is_online", true),
termQuery("has_cover", true),
termQuery("is_sink", false),
termQuery("has_after_cover", true),
termQuery("has_before_cover", true),
rangeQuery("content_level").gte(3),
termQuery("content_simi_bol_show", 0)
)
.not(termQuery("show_by_index", 2))
.should(shouldListBuffer.toList)
.minimumShouldMatch(1)
}
}
},
{ "_score": { "order": "desc" } },
{ "has_video_cover": { "order": "asc" } },
{ "offline_score": { "order": "desc" } },
{ "good_click": { "order": "desc" } },
{ "last_update_time": { "order": "desc" } }
]
}
""".stripMargin
// println("diary query:")
// println(query)
query
.size(size)
.sourceInclude(includes)
.sortBy(
scriptSort(
script(
"if (doc['doctor.hospital.city_tag_id'].size()!=0){if (doc['doctor.hospital.city_tag_id'].value ==params.user_city_tag_id){return 2;} else if (doc['nearby_city_tags_v1.tag_id'].contains(params.user_city_tag_id) ) {return 1;} else {return 0;} }else{return 0}"
).params(Map("user_city_tag_id" -> cityId)).lang("painless")
).typed("number").order(SortOrder.Desc),
FieldSort("_score").desc(),
FieldSort("has_video_cover").asc(),
FieldSort("offline_score").desc(),
FieldSort("good_click").desc(),
FieldSort("last_update_time").desc()
)
println("diary query")
println(res.show)
res
}
def generateTractateQuery(projects: List[String], secondDemands: List[String]): String = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment