ES.scala 13.8 KB
package com.gmei.up.utils

import scala.collection.mutable.ListBuffer
import scala.util.Try
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.global

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.JObject

import org.apache.http.HttpHost
import org.apache.http.auth.{ AuthScope, UsernamePasswordCredentials }
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder

import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback

import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ HitReader, Hit, ElasticClient, Response }
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.searches.{ SearchRequest, SearchResponse }
import com.sksamuel.elastic4s.requests.searches.sort.{ FieldSort, SortOrder, ScriptSortType }
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.requests.searches.queries.term.TermsQuery
import com.sksamuel.elastic4s.requests.script.{ Script, ScriptType }
import com.sksamuel.elastic4s.requests.searches.queries.funcscorer.{
  CombineFunction,
  FunctionScoreQueryScoreMode,
  WeightScore,
  ScoreFunction,
  ScriptScore
}

/** Factory for elastic4s clients that talk to a single Elasticsearch node over plain HTTP. */
object ESClient {

  /**
    * Async-HTTP-client hook that installs the given username/password as default credentials,
    * valid for any auth scope (`AuthScope.ANY`).
    */
  private final class ElasticCredentials(username: String, password: String) extends HttpClientConfigCallback {
    private[this] val provider: BasicCredentialsProvider = {
      val userPass = new UsernamePasswordCredentials(username, password)
      val p = new BasicCredentialsProvider()
      p.setCredentials(AuthScope.ANY, userPass)
      p
    }

    override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder =
      httpClientBuilder.setDefaultCredentialsProvider(provider)
  }

  /**
    * Builds an [[ElasticClient]] for `http://hostname:port` that authenticates with the given credentials.
    *
    * The returned client wraps a freshly built low-level `RestClient`, so the caller owns it and
    * is responsible for closing it.
    *
    * @param hostname Elasticsearch host name or IP
    * @param port     HTTP port
    * @param username user name for authentication
    * @param password password for authentication
    */
  def create(hostname: String, port: Int, username: String, password: String): ElasticClient = {
    val node = new HttpHost(hostname, port, "http")
    val authCallback = new ElasticCredentials(username, password)
    val lowLevelClient = RestClient
      .builder(node)
      .setHttpClientConfigCallback(authCallback)
      .build()

    ElasticClient(JavaClient.fromRestClient(lowLevelClient))
  }
}

/**
  * Builds and executes the content recall searches (diary, tractate, answer, service diary).
  *
  * Every search goes through one shared, lazily created [[ElasticClient]].
  */
object ES {
  // TODO read connection settings from config; credentials should not live in source code.
  private val EsHost = "172.16.52.33"
  private val EsPort = 9200
  private val EsUser = "elastic"
  private val EsPassword = "gengmei!@#"

  /** Source fields fetched for every content hit: exactly the fields ContentHitReader reads. */
  private val ContentFields: List[String] = List("id", "tags_v3")

  /**
    * Decodes a hit into [[Content]] from its `id` and `tags_v3` source fields plus the hit's index.
    * A missing field, a null value or a non-numeric `id` yields a `Failure` instead of throwing.
    */
  implicit object ContentHitReader extends HitReader[Content] {
    override def read(hit: Hit): Try[Content] =
      Try {
        val source = hit.sourceAsMap
        Content(source("id").toString.toLong, hit.index, source("tags_v3").toString)
      }
  }

  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

  /**
    * Client shared by every request.
    *
    * The Elasticsearch REST client is meant to be reused for the application's lifetime.
    * Building a new one per call, and never closing it, leaked a client on every request.
    */
  private lazy val client: ElasticClient = ESClient.create(EsHost, EsPort, EsUser, EsPassword)

  /**
    * Executes a single search and decodes its hits with [[ContentHitReader]].
    *
    * @param req a search, typically built by one of the `generate*Request` methods
    * @return the decoded hits, in response order
    */
  def request(req: SearchRequest): Future[IndexedSeq[Content]] =
    client.execute(req).map(resp => resp.result.to[Content])

  /**
    * Debug helper that runs the four searches as one multi-search and returns the raw response.
    *
    * It prints the request and the decoded hits to stdout. It blocks the calling thread until
    * the response arrives.
    */
  def test(
      diaryRequest: SearchRequest,
      tractateRequest: SearchRequest,
      answerRequest: SearchRequest,
      serviceDiaryReq: SearchRequest
  ) = {
    val multiReq = multi(diaryRequest, tractateRequest, answerRequest, serviceDiaryReq)
    println(multiReq.show)

    // TODO remove await
    val resp = client.execute(multiReq).await
    val contents = resp.result.to[Content]

    println("%%%%%%%%%%%")
    println(contents.size)
    contents.foreach(c => println((c.id, c.index, c.projects)))
    println("%%%%%%%%%%%")

    resp
  }

  /**
    * Builds the diary recall search against `gm-dbmw-diary-read`.
    *
    * Filters: a document must be online, not sunk, have a cover plus before/after covers, have
    * `content_level >= 3` and `content_simi_bol_show = 0`. It must not be `show_by_index = 2`
    * or already read. At least one tag clause must match.
    *
    * Scoring: the score is replaced by the highest matching tag weight (see `tagScoreFunctions`).
    *
    * Sorting: hits are sorted first by a city script (2 = same hospital city, 1 = nearby city,
    * 0 = otherwise), then by score and the remaining ranking fields.
    *
    * @param projects        values matched against `tags_v3`
    * @param secondDemands   values matched against `second_demands`
    * @param secondPositions values matched against `second_positions`
    * @param secondSolutions values matched against `second_solutions`
    * @param read            document ids to exclude
    * @param cityId          passed to the sort script as `user_city_tag_id`
    * @param size            maximum number of hits
    */
  def generateDiaryRequest(
      projects: List[String],
      secondDemands: List[String],
      secondPositions: List[String],
      secondSolutions: List[String],
      read: List[Long],
      cityId: Int = -1,
      size: Int = 300
  ): SearchRequest =
    search("gm-dbmw-diary-read")
      .query {
        functionScoreQuery()
          .functions(tagScoreFunctions(projects, secondDemands, secondSolutions, secondPositions))
          .scoreMode(FunctionScoreQueryScoreMode.Max)
          .boostMode(CombineFunction.Replace)
          .query {
            boolQuery()
              .must(
                termQuery("is_online", true),
                termQuery("has_cover", true),
                termQuery("is_sink", false),
                termQuery("has_after_cover", true),
                termQuery("has_before_cover", true),
                rangeQuery("content_level").gte(3),
                termQuery("content_simi_bol_show", 0)
              )
              .not(exclusions(read))
              .should(
                tagShouldClauses(
                  "tags_v3" -> projects,
                  "second_demands" -> secondDemands,
                  "second_solutions" -> secondSolutions,
                  "second_positions" -> secondPositions
                )
              )
              .minimumShouldMatch(1)
          }
      }
      .size(size)
      .sourceInclude(ContentFields)
      .sortBy(
        scriptSort(
          script(
            "if (doc['doctor.hospital.city_tag_id'].size()!=0){if (doc['doctor.hospital.city_tag_id'].value ==params.user_city_tag_id){return 2;} else if (doc['nearby_city_tags_v1.tag_id'].contains(params.user_city_tag_id) ) {return 1;} else {return 0;} }else{return 0}"
          ).params(Map("user_city_tag_id" -> cityId)).lang("painless")
        ).typed("number").order(SortOrder.Desc),
        FieldSort("_score").desc(),
        FieldSort("has_video_cover").asc(),
        FieldSort("offline_score").desc(),
        FieldSort("good_click").desc(),
        FieldSort("last_update_time").desc()
      )

  /**
    * Builds the tractate recall search against `gm-dbmw-tractate-read`.
    *
    * Filters: a document must be online and have `content_level >= 3`. It must not be
    * `show_by_index = 2`, `status = 4`, or already read. At least one tag clause must match.
    *
    * Scoring and sorting: the score is the highest matching tag weight, and hits are sorted by
    * `_score` only.
    *
    * @param projects        values matched against `tags_v3`
    * @param secondDemands   values matched against `second_demands`
    * @param secondPositions values matched against `second_positions`
    * @param secondSolutions values matched against `second_solutions`
    * @param read            document ids to exclude
    * @param size            maximum number of hits
    */
  def generateTractateRequest(
      projects: List[String],
      secondDemands: List[String],
      secondPositions: List[String],
      secondSolutions: List[String],
      read: List[Long],
      size: Int = 300
  ): SearchRequest =
    search("gm-dbmw-tractate-read")
      .query {
        functionScoreQuery()
          .functions(tagScoreFunctions(projects, secondDemands, secondSolutions, secondPositions))
          .scoreMode(FunctionScoreQueryScoreMode.Max)
          .boostMode(CombineFunction.Replace)
          .query {
            boolQuery()
              .must(
                termQuery("is_online", true),
                rangeQuery("content_level").gte(3)
              )
              .not(exclusions(read, termQuery("status", 4)))
              .should(
                tagShouldClauses(
                  "tags_v3" -> projects,
                  "second_demands" -> secondDemands,
                  "second_solutions" -> secondSolutions,
                  "second_positions" -> secondPositions
                )
              )
              .minimumShouldMatch(1)
          }
      }
      .size(size)
      .sourceInclude(ContentFields)
      .sortBy(
        FieldSort("_score").desc()
      )

  /**
    * Builds the answer recall search against `gm-dbmw-answer-read`.
    *
    * Filters: a document must be online, have `content_level >= 3` and `content_length >= 30`.
    * It must not be `show_by_index = 2` or already read. At least one tag clause must match.
    *
    * Scoring and sorting: the score is the highest matching tag weight. Hits are sorted by score,
    * then `has_picture`, `smart_rank_v2` and `good_click`.
    *
    * @param projects        values matched against `tags_v3`
    * @param secondDemands   values matched against `second_demands`
    * @param secondPositions values matched against `second_positions`
    * @param secondSolutions values matched against `second_solutions`
    * @param read            document ids to exclude
    * @param size            maximum number of hits
    */
  def generateAnswerRequest(
      projects: List[String],
      secondDemands: List[String],
      secondPositions: List[String],
      secondSolutions: List[String],
      read: List[Long],
      size: Int = 100
  ): SearchRequest =
    search("gm-dbmw-answer-read")
      .query {
        functionScoreQuery()
          .functions(tagScoreFunctions(projects, secondDemands, secondSolutions, secondPositions))
          .scoreMode(FunctionScoreQueryScoreMode.Max)
          .boostMode(CombineFunction.Replace)
          .query {
            boolQuery()
              .must(
                termQuery("is_online", true),
                rangeQuery("content_level").gte(3),
                rangeQuery("content_length").gte(30)
              )
              .not(exclusions(read))
              .should(
                tagShouldClauses(
                  "tags_v3" -> projects,
                  "second_demands" -> secondDemands,
                  "second_solutions" -> secondSolutions,
                  "second_positions" -> secondPositions
                )
              )
              .minimumShouldMatch(1)
          }
      }
      .size(size)
      .sourceInclude(ContentFields)
      .sortBy(
        FieldSort("_score").desc(),
        FieldSort("has_picture").desc(),
        FieldSort("smart_rank_v2").desc(),
        FieldSort("good_click").desc()
      )

  /**
    * Builds the promoted-service diary search against `gm-dbmw-diary-read`.
    *
    * Filters: a document must be online, have a promoted service and `content_level >= 3`.
    * It must not be `show_by_index = 2`, from hospital `shyxylmryy2203`, or already read.
    * At least one tag clause must match.
    *
    * Scoring: the query score is summed with:
    *   - a painless score derived from `new_smr` and cover/promotion flags;
    *   - a weight of 1,000,000 when the doctor's hospital city equals `cityId`;
    *   - a weight of 1,000,000 when the service lists `cityId` among its nearby cities.
    *
    * @param projects        values matched against `tags_v3`
    * @param secondDemands   values matched against `second_demands`
    * @param secondPositions values matched against `second_positions`
    * @param secondSolutions values matched against `second_solutions`
    * @param read            document ids to exclude
    * @param cityId          city tag id used by the two city weight functions
    * @param size            maximum number of hits
    */
  def generateServiceDiaryRequest(
      projects: List[String],
      secondDemands: List[String],
      secondPositions: List[String],
      secondSolutions: List[String],
      read: List[Long],
      cityId: Int = -1,
      size: Int = 100
  ): SearchRequest =
    search("gm-dbmw-diary-read")
      .query {
        functionScoreQuery()
          .functions(
            // NOTE(review): `has_after_cover` contributes twice below (2.0 and 8.0); one of the
            // two terms may have been meant for a different field. Confirm before changing.
            ScriptScore(
              "return (doc.containsKey('new_smr')&&doc['new_smr'].value>0?doc['new_smr'].value:0.0) * ((doc.containsKey('is_promote')&&doc['is_promote'].value?5.0:0.0)+(doc.containsKey('has_before_cover')&&doc['has_before_cover'].value?2.0:0.0) + (doc.containsKey('has_after_cover')&&doc['has_after_cover'].value?2.0:0.0)+(doc.containsKey('has_after_cover')&&doc['has_after_cover'].value?8.0:0.0))"
            ),
            WeightScore(1000000).filter(termQuery("doctor.hospital.city_tag_id", cityId)),
            WeightScore(1000000).filter(termQuery("service_nearby_city_tag_ids", cityId))
          )
          .scoreMode(FunctionScoreQueryScoreMode.Sum)
          .boostMode(CombineFunction.Sum)
          .query {
            boolQuery()
              .must(
                termQuery("is_online", true),
                termQuery("has_service", true),
                termQuery("service.is_promote", true),
                rangeQuery("content_level").gte(3)
              )
              .not(exclusions(read, termQuery("doctor.hospital.id", "shyxylmryy2203")))
              .should(
                // Clause order intentionally differs from the other requests
                // (positions before solutions).
                tagShouldClauses(
                  "tags_v3" -> projects,
                  "second_demands" -> secondDemands,
                  "second_positions" -> secondPositions,
                  "second_solutions" -> secondSolutions
                )
              )
              .minimumShouldMatch(1)
          }
      }
      .size(size)
      .sourceInclude(ContentFields)

  /**
    * Returns the weighted tag filters used by the diary, tractate and answer searches.
    *
    * Weights, in emission order:
    *   - 60: promoted docs matching a project
    *   - 50: any doc matching a project
    *   - 40: demand match
    *   - 30: solution match
    *   - 20: position match
    *
    * Empty value lists contribute no function.
    */
  private def tagScoreFunctions(
      projects: List[String],
      secondDemands: List[String],
      secondSolutions: List[String],
      secondPositions: List[String]
  ): List[ScoreFunction] = {
    val projectScores: List[ScoreFunction] =
      if (projects.nonEmpty)
        List(
          WeightScore(60).filter(must(termsQuery("tags_v3", projects), termQuery("is_promote", true))),
          WeightScore(50).filter(termsQuery("tags_v3", projects))
        )
      else Nil

    val secondaryScores: List[ScoreFunction] = List(
      ("second_demands", secondDemands, 40),
      ("second_solutions", secondSolutions, 30),
      ("second_positions", secondPositions, 20)
    ).collect {
      case (field, values, weight) if values.nonEmpty =>
        WeightScore(weight).filter(termsQuery(field, values))
    }

    projectScores ++ secondaryScores
  }

  /**
    * Returns one terms clause per `(field, values)` pair with non-empty values, keeping the
    * argument order.
    *
    * NOTE(review): callers pair this with `minimumShouldMatch(1)`. If every list is empty there
    * is no clause to satisfy (likely zero hits); confirm callers never pass all-empty tags.
    */
  private def tagShouldClauses(fieldValues: (String, List[String])*): List[Query] =
    fieldValues.toList.collect {
      case (field, values) if values.nonEmpty => TermsQuery(field, values)
    }

  /**
    * Returns the must-not clauses for a search, in this order:
    *   1. `show_by_index = 2`;
    *   2. any request-specific `extra` clauses;
    *   3. a terms clause on `id` for already-read documents, when `read` is non-empty.
    */
  private def exclusions(read: List[Long], extra: Query*): List[Query] = {
    val readFilter: List[Query] = if (read.nonEmpty) List(termsQuery("id", read)) else Nil
    (termQuery("show_by_index", 2) +: extra.toList) ++ readFilter
  }
}