Commit 704855bf authored by 赵威

parse json

parent b599f53d
@@ -7,6 +7,10 @@ ThisBuild / organization := "com.gmei.up"
// val es = "org.elasticsearch" % "elasticsearch" % esVersion
// val esCore= "org.elasticsearch" % "elasticsearch-core" % esVersion
val jsonVersion = "3.6.10"
val jsonNative = "org.json4s" %% "json4s-native" % jsonVersion
val jsonJackson = "org.json4s" %% "json4s-jackson" % jsonVersion
val elastic4sVersion = "7.5.0"
val esCore = "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion
val esJava = "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion
@@ -28,14 +32,13 @@ lazy val root = (project in file("."))
.settings(
name := "streamingUserPortrait",
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75",
libraryDependencies += jsonNative,
libraryDependencies += jsonJackson,
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.7.8",
libraryDependencies += esCore,
libraryDependencies += esJava,
// libraryDependencies += es,
// libraryDependencies += esCore,
libraryDependencies += flinkCore,
libraryDependencies += flinkScala,
libraryDependencies += flinkStreamingScala,
......
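
Because the hunks above do not mark added versus removed lines, here is a condensed sketch of the post-commit dependency section: json4s 3.6.10 (native and jackson backends) comes in alongside the existing fastjson dependency, and the single elastic4s-http 6.7.8 line appears to be replaced by the elastic4s 7.5.0 core and Java-client modules. The ++= Seq(...) grouping is only a stylistic condensation of the += lines above, not the file's literal layout.

// build.sbt: condensed sketch of the dependency section after this commit
val jsonVersion      = "3.6.10"
val jsonNative       = "org.json4s" %% "json4s-native"  % jsonVersion
val jsonJackson      = "org.json4s" %% "json4s-jackson" % jsonVersion
val elastic4sVersion = "7.5.0"
val esCore           = "com.sksamuel.elastic4s" %% "elastic4s-core"          % elastic4sVersion
val esJava           = "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion

lazy val root = (project in file("."))
  .settings(
    name := "streamingUserPortrait",
    libraryDependencies += "com.alibaba" % "fastjson" % "1.2.75",
    libraryDependencies ++= Seq(jsonNative, jsonJackson, esCore, esJava)
  )
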
@@ -10,15 +10,22 @@ import org.apache.flink.streaming.connectors.kafka.{
}
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON
// TODO
// import com.alibaba.fastjson.JSON
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import com.gmei.up.utils.{ UserInfo, ES }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
implicit val formats = DefaultFormats // json4s needs an implicit Formats in scope for extract[T]
override def isEndOfStream(t: UserInfo): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): UserInfo = {
val info = new String(consumerRecord.value(), "UTF-8")
val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
// TODO
// val userInfo: UserInfo = JSON.parseObject(info, classOf[UserInfo])
val userInfo: UserInfo = parse(info).extract[UserInfo]
userInfo
}
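
parse(info).extract[UserInfo] relies on the implicit DefaultFormats declared above and on UserInfo being a case class whose field names match the JSON keys. The actual fields of com.gmei.up.utils.UserInfo are not visible in this diff, so the sketch below uses a hypothetical shape purely to illustrate the json4s extraction that replaces the fastjson call:

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Hypothetical shape: the real com.gmei.up.utils.UserInfo is not shown in this commit.
case class UserInfoExample(deviceId: String, cityId: Int, projects: List[String])

object ParseSketch {
  // json4s needs an implicit Formats in scope for extract[T]
  implicit val formats: DefaultFormats.type = DefaultFormats

  def main(args: Array[String]): Unit = {
    val info = """{"deviceId":"dev-1","cityId":-1,"projects":["光子嫩肤","水光针"]}"""
    // extract[T] maps JSON fields onto the case-class constructor by name
    val user: UserInfoExample = parse(info).extract[UserInfoExample]
    println(user)
  }
}
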
@@ -28,6 +35,10 @@ class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo]
object Main {
def main(args: Array[String]): Unit = {
// println("###############")
// ES.generateDiaryQuery(-1, List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"))
// println("###############")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// TODO read from config
@@ -44,10 +55,6 @@ object Main {
kafkaConsumerProperties
)
println("###############")
ES.test()
println("###############")
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
stream.map { user =>
......
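
The construction of the FlinkKafkaConsumer itself is collapsed in the hunk above. A minimal wiring sketch, assuming a placeholder topic name and Kafka properties (none of these values come from this commit), shows how the schema defined above types the stream as DataStream[UserInfo] from the source onward:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import com.gmei.up.utils.UserInfo

object WiringSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val kafkaConsumerProperties = new Properties()
    kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    kafkaConsumerProperties.setProperty("group.id", "streaming-user-portrait")  // placeholder

    val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
      "user-info",                         // placeholder topic name
      new UserInfoDeserializationSchema(), // the schema from this commit
      kafkaConsumerProperties
    )

    val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
    stream.map(user => user.toString).print()

    env.execute("streamingUserPortrait")
  }
}
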
package com.gmei.up.utils
import scala.collection.mutable.ListBuffer
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.http.JavaClient
import org.apache.http.HttpHost
@@ -8,52 +9,15 @@ import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import com.sksamuel.elastic4s.requests.searches.{ SearchRequest, SearchResponse }
import com.sksamuel.elastic4s.ElasticDsl._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.JObject
// import org.json4s.jackson.Serialization.write
// import scala.concurrent._
// import scala.concurrent.duration._
// import ExecutionContext.Implicits.global
// import com.sksamuel.elastic4s.ElasticsearchClientUri
// import com.sksamuel.elastic4s.http.{ ElasticClient, ElasticProperties, Response }
// import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.ElasticDsl._
// import org.apache.http.client.config.RequestConfig
// import org.apache.http.impl.client.BasicCredentialsProvider
// import org.apache.http.auth.{ AuthScope, UsernamePasswordCredentials }
// import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
// import org.elasticsearch.client.RestClientBuilder.{ HttpClientConfigCallback, RequestConfigCallback }
// // object ES {
// // // TODO read config from file
// // val host = "172.16.52.33"
// // val port = 9200
// // val username = "elastic"
// // val password = "gengmei!@#"
// // val diaryIndex = "gm-dbmw-diary-read"
// // // val nodes = ElasticProperties(
// // // "http://172.16.52.33:9200,http://172.16.52.19:9200,http://172.16.52.27:9200,http://172.16.52.34:9200,http://172.16.52.48:9200"
// // // )
// // // val client = ElasticClient(nodes)
// // def test() {
// // val json =
// // """
// // |{
// // | "query" : {
// // | "match" : {"tags_v3" : ["光子嫩肤", "水光针"]}
// // | }
// // |}
// // |""".stripMargin
// // val resp = client.execute {
// // search("gm-dbmw-diary-read").source(json)
// // }.await
// // println(resp)
// // resp
// // }
// // }
object ESClient {
private class ElasticCredentials(username: String, password: String) extends HttpClientConfigCallback {
@@ -61,18 +25,11 @@ object ESClient {
val credentialsProvider = new BasicCredentialsProvider()
credentialsProvider.setCredentials(AuthScope.ANY, credentials)
override def customizeHttpClient(
httpClientBuilder: HttpAsyncClientBuilder
): HttpAsyncClientBuilder =
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder =
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
}
def create(
hostname: String,
port: Int,
username: String,
password: String
): ElasticClient = {
def create(hostname: String, port: Int, username: String, password: String): ElasticClient = {
val restClient = RestClient
.builder(new HttpHost(hostname, port, "http"))
.setHttpClientConfigCallback(new ElasticCredentials(username, password))
@@ -83,8 +40,8 @@
}
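
The tail of ESClient.create is collapsed in the hunk above. With elastic4s 7.x the low-level RestClient is normally finished with build() and then wrapped via JavaClient so the elastic4s DSL can drive it; the version below follows that usual pattern and is an assumption about the elided lines, not the commit's exact code:

import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.http.JavaClient

object ESClientSketch {
  // Sketch of create with the collapsed tail filled in (the credentials
  // callback is omitted here for brevity; see ElasticCredentials above).
  def create(hostname: String, port: Int): ElasticClient = {
    val restClient = RestClient
      .builder(new HttpHost(hostname, port, "http"))
      .build()
    // Wrap the Java REST client so elastic4s requests can be executed against it.
    ElasticClient(JavaClient.fromRestClient(restClient))
  }
}
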
object ES {
def test() {
val client = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
def test() = {
val client: ElasticClient = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
val json = """
{
"query": {
@@ -106,4 +63,112 @@ object ES {
println(resp)
resp
}
// Builds the raw JSON body of a diary SearchRequest: promoted and tag-matching
// diaries are weighted higher via function_score, and results are filtered to
// online, covered, non-sunk content.
def generateDiaryQuery(
cityId: Int = -1,
projects: List[String] = List.empty[String],
secondDemands: List[String] = List.empty[String]
): String = {
val size = 100
val includes = List("id", "tags_v3")
val functions = new ListBuffer[JObject]()
val shoulds = new ListBuffer[JObject]()
if (projects.size > 0) {
val promotes: List[JObject] =
List(("terms" -> ("tags_v3" -> projects)), ("term" -> ("is_promote" -> true)))
functions += ("filter" -> ("bool" -> ("must" -> promotes))) ~ ("weight" -> 60)
functions += ("filter" -> ("terms" -> { "tags_v3" -> projects })) ~ ("weight" -> 50)
shoulds += ("terms" -> ("tags_v3" -> projects))
}
if (secondDemands.size > 0) {
functions += ("filter" -> ("terms" -> ("second_demands" -> secondDemands))) ~ ("weight" -> 40)
shoulds += ("terms" -> ("second_demands" -> secondDemands))
}
val res = s"""
{
"query": {
"function_score": {
"functions": ${pretty(render(functions.toList))},
"boost_mode": "replace",
"score_mode": "max",
"query": {
"bool": {
"should": ${pretty(render(shoulds.toList))},
"minimum_should_match": 1,
"must": [
{
"term": {
"is_online": "true"
}
},
{
"term": {
"has_cover": "true"
}
},
{
"term": {
"is_sink": "false"
}
},
{
"term": {
"has_after_cover": "true"
}
},
{
"term": {
"has_before_cover": "true"
}
},
{
"terms": {
"content_level": [6, 5, 4, 3.5, 3]
}
},
{
"term": {
"content_simi_bol_show": 0
}
}
],
"must_not": [{ "term": { "show_by_index": 2 } }]
}
}
}
},
"size": ${size},
"_source": ${pretty(render(includes))},
"sort": [
{
"_script": {
"type": "number",
"script": {
"id": "diary-recommend",
"params": { "user_city_tag_id": ${cityId} }
},
"order": "desc"
}
},
{ "_score": { "order": "desc" } },
{ "has_video_cover": { "order": "asc" } },
{ "offline_score": { "order": "desc" } },
{ "good_click": { "order": "desc" } },
{ "last_update_time": { "order": "desc" } }
]
}
"""
// println("diary query\n")
// println(res)
res
}
}
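
generateDiaryQuery only builds the JSON body; nothing in this diff executes it. A minimal usage sketch, reusing the cluster coordinates from test() above, the index name from the old commented-out code (gm-dbmw-diary-read), and the arguments from the commented-out call in Main. All three are assumptions about how the method is meant to be driven:

import com.sksamuel.elastic4s.ElasticDsl._

object DiaryQuerySketch {
  def main(args: Array[String]): Unit = {
    val client = ESClient.create("172.16.52.33", 9200, "elastic", "gengmei!@#")
    // Build the raw query body from the user's city and tag lists.
    val body = ES.generateDiaryQuery(
      cityId = -1,
      projects = List("光子嫩肤", "水光针"),
      secondDemands = List("丰胸", "胸部塑身")
    )
    // Execute it the same way test() executes its hand-written JSON.
    val resp = client.execute {
      search("gm-dbmw-diary-read").source(body)
    }.await
    println(resp)
    client.close()
  }
}
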