Commit aef54307 authored by 赵威's avatar 赵威

get read

parent 474d2adc
...@@ -33,51 +33,53 @@ object Main { ...@@ -33,51 +33,53 @@ object Main {
// val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1) // val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// println("###############") // println("###############")
val env = StreamExecutionEnvironment.getExecutionEnvironment Redis.getRead("abc", "diary")
// TODO read from config // val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties.setProperty(
"bootstrap.servers",
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
)
val kafkaConsumer = new FlinkKafkaConsumer[UserInfo]( // // TODO read from config
"gm-portrait-update-device", // val kafkaConsumerProperties = new Properties
new UserInfoDeserializationSchema, // kafkaConsumerProperties.setProperty("group.id", "user_portrait_flink_streaming")
kafkaConsumerProperties // kafkaConsumerProperties.setProperty(
) // "bootstrap.servers",
// "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
// )
val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer) // val kafkaConsumer = new FlinkKafkaConsumer[UserInfo](
// "gm-portrait-update-device",
// new UserInfoDeserializationSchema,
// kafkaConsumerProperties
// )
stream.map { user => // val stream: DataStream[UserInfo] = env.addSource(kafkaConsumer)
val projects = user.projects.toList
val secondDemands = user.secondDemands.toList
val secondPositions = user.secondPositions.toList
val secondSolutions = user.secondSolutions.toList
val cityId = user.cityId
val deviceId = user.deviceId
println(deviceId)
// println(projects.mkString(" "))
val diaryReq = // stream.map { user =>
ES.generateDiaryRequest(projects, secondDemands, secondPositions, secondSolutions, cityId, 300) // val projects = user.projects.toList
val tractateReq = // val secondDemands = user.secondDemands.toList
ES.generateTractateRequest(projects, secondDemands, secondPositions, secondSolutions, 300) // val secondPositions = user.secondPositions.toList
val answerReq = ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, 100) // val secondSolutions = user.secondSolutions.toList
val serviceDiaryReq = // val cityId = user.cityId
ES.generateServiceDiaryRequest(projects, secondDemands, secondPositions, secondSolutions, cityId, 100) // val deviceId = user.deviceId
// println(deviceId)
// // println(projects.mkString(" "))
Redis.save(ES.request(diaryReq), deviceId, "diary") // val diaryReq =
Redis.save(ES.request(tractateReq), deviceId, "tractate") // ES.generateDiaryRequest(projects, secondDemands, secondPositions, secondSolutions, cityId, 300)
Redis.save(ES.request(answerReq), deviceId, "answer") // val tractateReq =
Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary") // ES.generateTractateRequest(projects, secondDemands, secondPositions, secondSolutions, 300)
// val answerReq = ES.generateAnswerRequest(projects, secondDemands, secondPositions, secondSolutions, 100)
// val serviceDiaryReq =
// ES.generateServiceDiaryRequest(projects, secondDemands, secondPositions, secondSolutions, cityId, 100)
deviceId // Redis.save(ES.request(diaryReq), deviceId, "diary")
} // Redis.save(ES.request(tractateReq), deviceId, "tractate")
// Redis.save(ES.request(answerReq), deviceId, "answer")
// Redis.save(ES.request(serviceDiaryReq), deviceId, "service_diary")
// deviceId
// }
// stream.print // // stream.print
env.execute("flink streaming user portrait") // env.execute("flink streaming user portrait")
} }
} }
...@@ -8,9 +8,36 @@ object Redis { ...@@ -8,9 +8,36 @@ object Redis {
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
val pRc2 = new RedisClient("172.16.40.173", 6379, 0, Some("ReDis!GmTx*0aN9"), 5000)
// def get_user_portrait_tag3_read_v2(device_id: str, content_type: str) -> Tuple[List[Any], List[Any]]:
// key = get_user_portrait_tag3_read_key_v2(device_id, content_type)
// date_list_dict = await redis_client2.hgetall(key)
// read_list_v2 = []
// today_read = []
// if date_list_dict:
// before_5_days_date = get_before_5_days_date()
// today_read = json.loads(date_list_dict.get(before_5_days_date[0].encode(), '[]'))
// # 删除5天以外的已读记录
// for day in date_list_dict:
// day_decode = day.decode()
// if day_decode in before_5_days_date:
// read_list_v2.extend(json.loads(date_list_dict[day]))
// else:
// await redis_client2.hdel(key, day)
// res = (read_list_v2, today_read)
// return res
val pRc4 = val pRc4 =
new RedisClientPool("172.16.50.145", 6379, database = 0, secret = Some("XfkMCCdWDIU%ls$h"), timeout = 500) new RedisClientPool("172.16.50.145", 6379, database = 0, secret = Some("XfkMCCdWDIU%ls$h"), timeout = 500)
def getRead(deviceId: String, contentType: String): Unit = {
val key = s"doris:user_portrait:tag3:read_v2:device_id:${deviceId}:content_type:${contentType}"
val today = java.time.LocalDate.now.toString()
val res = pRc2.hget(key, today)
println(res)
}
def save(contentF: Future[IndexedSeq[Content]], deviceId: String, contentType: String): Unit = { def save(contentF: Future[IndexedSeq[Content]], deviceId: String, contentType: String): Unit = {
val key = s"streaming:candidate:${contentType}:device_id:${deviceId}" val key = s"streaming:candidate:${contentType}:device_id:${deviceId}"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment