#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import traceback
import json

from libs.es import ESPerform
from .common import TopicDocumentField
from search.utils.common import *


class TopicUtils(object):
    """Helpers that build Elasticsearch queries for topic recommendation.

    Every public method catches ``Exception``, logs the traceback and returns
    an empty fallback value instead of propagating the error to the caller.
    """

    @classmethod
    def get_related_user_info(cls, user_id, offset=0, size=10):
        """Fetch the relation fields of a user from the "user" index.

        :param user_id: value matched against the ``user_id`` term.
        :param offset: paging offset forwarded to ``ESPerform.get_search_results``.
        :param size: page size forwarded to ``ESPerform.get_search_results``.
        :return: whatever ``ESPerform.get_search_results`` returns; on any
            error ``{"total_count": 0, "hits": []}``.
        """
        try:
            q = {
                "query": {
                    "term": {
                        "user_id": user_id
                    }
                },
                "_source": [
                    "tag_list",
                    "attention_user_id_list",
                    "pick_user_id_list",
                    "same_group_user_id_list",
                ],
            }
            return ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset, size)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return {"total_count": 0, "hits": []}

    @classmethod
    def analyze_related_user_id_list(cls, related_user_id_list):
        """Split related-user records into three lists keyed by ``country_id``.

        Records with ``country_id`` 0, 1 and 2 go to the first, second and
        third list respectively (named chinese/japan/korea in the code);
        records with any other ``country_id`` are ignored.

        :param related_user_id_list: iterable of dicts carrying ``country_id``
            and ``user_id`` keys.
        :return: tuple ``(chinese_user_id_list, japan_user_id_list,
            korea_user_id_list)``; ``([], [], [])`` on any error.
        """
        try:
            chinese_user_id_list = []
            japan_user_id_list = []
            korea_user_id_list = []
            for item in related_user_id_list:
                country_id = item["country_id"]
                if country_id == 0:
                    chinese_user_id_list.append(item["user_id"])
                elif country_id == 1:
                    japan_user_id_list.append(item["user_id"])
                elif country_id == 2:
                    korea_user_id_list.append(item["user_id"])
            return (chinese_user_id_list, japan_user_id_list, korea_user_id_list)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return ([], [], [])

    @classmethod
    def refresh_redis_hash_data(cls, redis_cli, redis_key, redis_data_dict):
        """Write ``redis_data_dict`` into the hash ``redis_key`` via ``hmset``.

        :param redis_cli: client object exposing ``hmset(key, mapping)``.
        :param redis_key: name of the hash to write.
        :param redis_data_dict: field/value mapping to store.
        :return: ``True`` on success, ``False`` if ``hmset`` raised.
        """
        try:
            redis_cli.hmset(redis_key, redis_data_dict)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def ___get_should_term_list(cls, ori_list, field_name="tag_list"):
        """Build one ``term`` clause per element of ``ori_list``.

        :param ori_list: iterable of values to match.
        :param field_name: document field each term clause targets.
        :return: list of ``{"term": {field_name: {"value": v}}}`` dicts;
            ``[]`` on any error.
        """
        try:
            return [
                {"term": {field_name: {"value": term_id}}}
                for term_id in ori_list
            ]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return []

    @classmethod
    def _get_user_relation_source(cls, user_id):
        """Return the ``_source`` of the user's first hit, or ``{}`` if none."""
        result_dict = TopicUtils.get_related_user_info(user_id, 0, 1)
        hits = result_dict["hits"]
        if not hits:
            # %s rather than %d: a None / non-numeric user_id must not raise
            # here and abort the whole recommendation just because of a log line.
            logging.warning("not find user_id:%s in es!", user_id)
            return {}
        return hits[0]["_source"]

    @staticmethod
    def _extract_user_ids(user_info_list):
        """Collect ``user_id`` from each record; a missing/None list yields ``[]``."""
        return [item["user_id"] for item in (user_info_list or [])]

    @staticmethod
    def _time_decay_function():
        """Return the gauss decay function on ``update_time`` shared by the queries."""
        return {
            "gauss": {
                "update_time": {
                    "scale": "1d",
                    "decay": 0.5
                }
            }
        }

    @staticmethod
    def _terms_weight_function(field_name, values, weight):
        """Return a function_score entry adding ``weight`` when ``field_name`` is in ``values``."""
        return {
            "filter": {"bool": {"should": {"terms": {field_name: values}}}},
            "weight": weight,
        }

    @classmethod
    def get_recommend_topic_ids(cls, user_id, offset, size, query=None, query_type=TopicPageType.HOME_RECOMMEND):
        """Get the recommended topic list for the home page (or a search).

        TODO (from the original remark): result scattering logic still needs
        to be added.

        Topics must have images, be online, not deleted, and have a
        ``content_level`` between a lower bound and 5; the lower bound is 4
        when ``query_type`` is ``TopicPageType.FIND_PAGE`` and 3 otherwise.
        Scoring sums a time decay on ``update_time`` with weights for topics
        authored by followed users (3), picked users (2), same-group users (1)
        and topics sharing a tag with the user (1). Results are sorted by
        ``_score + offline_score``.

        :param user_id: user whose relations personalise the scoring.
        :param offset: paging offset.
        :param size: page size.
        :param query: optional search text; when not ``None`` a
            ``multi_match`` clause over description/content/name/tag_name_list
            must match.
        :param query_type: a ``TopicPageType`` value selecting the minimum
            content level.
        :return: list of ``_source`` dicts (id, group_id, offline_score);
            ``[]`` when nothing matches or on any error.
        """
        try:
            # Missing relation fields are treated as empty so that an
            # incomplete user document degrades to un-personalised scoring
            # instead of raising KeyError and returning nothing.
            user_source = cls._get_user_relation_source(user_id)
            attention_user_id_list = cls._extract_user_ids(user_source.get("attention_user_id_list"))
            pick_user_id_list = cls._extract_user_ids(user_source.get("pick_user_id_list"))
            same_group_id_list = cls._extract_user_ids(user_source.get("same_group_user_id_list"))
            user_tag_list = user_source.get("tag_list") or []

            functions_list = [cls._time_decay_function()]
            if len(attention_user_id_list) > 0:
                functions_list.append(cls._terms_weight_function("user_id", attention_user_id_list, 3))
            if len(pick_user_id_list) > 0:
                functions_list.append(cls._terms_weight_function("user_id", pick_user_id_list, 2))
            if len(same_group_id_list) > 0:
                functions_list.append(cls._terms_weight_function("user_id", same_group_id_list, 1))
            if len(user_tag_list) > 0:
                functions_list.append(cls._terms_weight_function("tag_list", user_tag_list, 1))

            low_content_level = 4 if query_type == TopicPageType.FIND_PAGE else 3
            query_function_score = {
                "query": {
                    "bool": {
                        "must": [
                            {"range": {"content_level": {"gte": low_content_level, "lte": 5}}},
                            {"term": {"has_image": True}},
                            {"term": {"is_online": True}},
                            {"term": {"is_deleted": False}}
                        ]
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }

            if query is not None:
                # Topic search: per-field boosts are encoded as "field^boost".
                multi_fields = {
                    'description': 200,
                    'content': 300,
                    'name': 400,
                    'tag_name_list': 300,
                }
                query_fields = ['^'.join((k, str(v))) for (k, v) in multi_fields.items()]
                multi_match = {
                    'query': query,
                    'type': 'cross_fields',
                    'operator': 'and',
                    'fields': query_fields,
                }
                bool_query = query_function_score["query"]["bool"]
                bool_query["should"] = [
                    {'multi_match': multi_match}
                ]
                # With a single should clause this makes the text match mandatory.
                bool_query["minimum_should_match"] = 1

            q = {
                "query": {
                    "function_score": query_function_score
                },
                "_source": {
                    "include": ["id", "group_id", "offline_score"]
                },
                "sort": [
                    {
                        "_script": {
                            "type": "number",
                            "script": {
                                "lang": "painless",
                                "source": "_score+params._source.offline_score"
                            },
                            "order": "desc"
                        }
                    }
                ],
            }

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)
            return [item["_source"] for item in result_dict["hits"]]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return []

    @classmethod
    def get_topic_detail_recommend_list(cls, user_id, topic_id, topic_tag_list, topic_group_id, topic_user_id,
                                        filter_topic_user_id, offset, size):
        """Get the recommendation list shown on a topic detail page.

        NOTE(review): the original remark said time decay was missing, but a
        gauss decay on ``update_time`` is part of the scoring functions below
        - the remark looks stale; confirm.

        Candidates have ``content_level`` between 3 and 5 and exclude the
        topic itself. Scoring sums the time decay, a weight of 1000 for topics
        by ``topic_user_id`` and, when ``topic_group_id`` is a positive int, a
        weight of 1 for topics in that group.

        :param user_id: not used by this method.
        :param topic_id: id of the topic being viewed; excluded from results.
        :param topic_tag_list: tags of the viewed topic; when non-empty a
            ``should`` terms clause on ``tag_list`` is added.
        :param topic_group_id: group of the viewed topic.
        :param topic_user_id: author of the viewed topic.
        :param filter_topic_user_id: when truthy, restrict results to topics
            by ``topic_user_id``.
        :param offset: paging offset.
        :param size: page size.
        :return: the ``hits`` list of the search result; ``[]`` on any error.
        """
        try:
            functions_list = [
                {
                    "filter": {"term": {"user_id": topic_user_id}},
                    "weight": 1000
                },
                cls._time_decay_function(),
            ]
            if isinstance(topic_group_id, int) and topic_group_id > 0:
                functions_list.append(
                    {
                        "filter": {"term": {"group_id": topic_group_id}},
                        "weight": 1,
                    }
                )

            query_function_score = {
                "query": {
                    "bool": {
                        "must": [
                            {"range": {"content_level": {"gte": 3, "lte": 5}}}
                        ],
                        "must_not": {
                            "term": {
                                "id": topic_id
                            }
                        }
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }
            bool_query = query_function_score["query"]["bool"]
            if filter_topic_user_id:
                bool_query["must"].append({"term": {"user_id": topic_user_id}})
            # Truthiness instead of len(): a None tag list simply adds no clause
            # rather than raising TypeError and returning no recommendations.
            if topic_tag_list:
                bool_query["should"] = {
                    "terms": {
                        "tag_list": topic_tag_list
                    }
                }

            q = {
                "query": {
                    "function_score": query_function_score
                },
                "_source": {
                    "include": ["id", "group_id", "user_id", "_score"]
                },
            }

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)
            return result_dict["hits"]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return []

    @classmethod
    def get_topic_tag_id_list(cls, topic_id):
        """Get the tag list stored on a topic document.

        :param topic_id: value matched against the ``id`` term.
        :return: the ``TopicDocumentField.TAG_LIST`` field of the first hit,
            ``[]`` when there is no hit, and an empty list on any error.
        """
        try:
            q = {
                "query": {
                    "term": {
                        "id": topic_id
                    }
                },
                "_source": {
                    "include": [TopicDocumentField.TAG_LIST]
                },
            }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       size=1)

            tag_id_list = []
            if len(result_dict["hits"]) > 0:
                tag_id_list = result_dict["hits"][0]["_source"][TopicDocumentField.TAG_LIST]
            return tag_id_list
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return list()

    @classmethod
    def get_tag_aggregation_topic_id_list(cls, user_id, tag_id, offset, size):
        """Get topics carrying ``tag_id``, scored for the given user.

        Candidates must be online, not deleted, contain ``tag_id`` in
        ``tag_list`` and must not have ``content_level`` 1 or 2. Scoring sums
        a time decay on ``update_time`` with weights for topics authored by
        followed users (3) and picked users (2). Results are sorted by
        ``_score + offline_score + manual_score``.

        :param user_id: user whose relations personalise the scoring.
        :param tag_id: tag every returned topic must carry.
        :param offset: paging offset.
        :param size: page size.
        :return: the ``hits`` list of the search result; an empty list on
            any error.
        """
        try:
            # Missing relation fields are treated as empty (see
            # _extract_user_ids) instead of aborting with KeyError.
            user_source = cls._get_user_relation_source(user_id)
            attention_user_id_list = cls._extract_user_ids(user_source.get("attention_user_id_list"))
            pick_user_id_list = cls._extract_user_ids(user_source.get("pick_user_id_list"))

            functions_list = [cls._time_decay_function()]
            if len(attention_user_id_list) > 0:
                functions_list.append(cls._terms_weight_function("user_id", attention_user_id_list, 3))
            if len(pick_user_id_list) > 0:
                functions_list.append(cls._terms_weight_function("user_id", pick_user_id_list, 2))

            query_function_score = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"is_online": True}},
                            {"term": {"is_deleted": False}},
                            {"term": {"tag_list": tag_id}}
                        ],
                        "must_not": [
                            {"terms": {"content_level": [1, 2]}}
                        ]
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }

            q = {
                "query": {
                    "function_score": query_function_score
                },
                "_source": {
                    "include": ["id", "group_id", "user_id", "_score", "offline_score", "manual_score"]
                },
                "sort": [
                    {
                        "_script": {
                            "type": "number",
                            "script": {
                                "lang": "painless",
                                "source": "_score+params._source.offline_score+params._source.manual_score"
                            },
                            "order": "desc"
                        }
                    }
                ],
            }

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)
            return result_dict["hits"]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return list()