#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import traceback
import json

from alpha_types.venus import TOPIC_SEARCH_SORT
from libs.es import ESPerform
from .common import TopicDocumentField
# NOTE(review): TopicPageType is not imported explicitly anywhere in this
# file; presumably it is provided by this wildcard import -- confirm.
from search.utils.common import *


class TopicUtils(object):
    """Helpers that build Elasticsearch query bodies for topics and run them
    through ``ESPerform.get_search_results``.

    Every query method catches its own errors, logs the traceback and returns
    an empty result of the documented shape instead of raising.
    """

    @classmethod
    def get_related_user_info(cls, user_id, offset=0, size=10):
        """Fetch the relationship fields stored for a user in the "user" index.

        :param user_id: value matched against the ``user_id`` field.
        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :return: the result of ``ESPerform.get_search_results``; on any error
            ``{"total_count": 0, "hits": []}``.
        """
        try:
            q = {
                "query": {
                    "term": {
                        "user_id": user_id
                    }
                },
                # "includes" (not "include") to match every other query in
                # this file.
                "_source": {
                    "includes": ["tag_list", "attention_user_id_list", "pick_user_id_list",
                                 "same_pictorial_user_id_list"]
                },
            }
            return ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset, size)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {"total_count": 0, "hits": []}

    @classmethod
    def analyze_related_user_id_list(cls, related_user_id_list):
        """Split related-user entries into three user-id lists by ``country_id``.

        :param related_user_id_list: iterable of dicts carrying ``country_id``
            and ``user_id`` keys.
        :return: tuple ``(ids with country_id 0, ids with country_id 1,
            ids with country_id 2)``; entries with any other ``country_id``
            are skipped. On any error ``([], [], [])``.
        """
        try:
            chinese_user_id_list = []
            japan_user_id_list = []
            korea_user_id_list = []
            for item in related_user_id_list:
                if item["country_id"] == 0:
                    chinese_user_id_list.append(item["user_id"])
                elif item["country_id"] == 1:
                    japan_user_id_list.append(item["user_id"])
                elif item["country_id"] == 2:
                    korea_user_id_list.append(item["user_id"])
            return (chinese_user_id_list, japan_user_id_list, korea_user_id_list)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return ([], [], [])

    @classmethod
    def refresh_redis_hash_data(cls, redis_cli, redis_key, redis_data_dict):
        """Write ``redis_data_dict`` into the hash ``redis_key`` via ``hmset``.

        :return: True on success, False if the call raised.
        """
        try:
            redis_cli.hmset(redis_key, redis_data_dict)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def ___get_should_term_list(cls, ori_list, field_name="tag_list"):
        """Build one ``term`` clause on ``field_name`` per value in ``ori_list``.

        :return: list of ``{"term": {field_name: {"value": v}}}`` dicts; ``[]``
            on any error.
        """
        try:
            return [
                {"term": {field_name: {"value": term_id}}}
                for term_id in ori_list
            ]
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def get_topic_tag_info(cls, offset, size, topic_id_list, user_id):
        """Map topic ids to their ``edit_tag_list``.

        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :param topic_id_list: topic ids to look up.
        :param user_id: unused by this method.
        :return: dict ``{str(topic id): edit_tag_list}``; a topic without an
            ``edit_tag_list`` field maps to ``[]``. ``{}`` on any error.
        """
        try:
            q = {
                "query": {
                    "terms": {
                        "id": topic_id_list
                    }
                },
                "_source": {
                    "includes": ["id", "pictorial_id", "offline_score", "user_id", "edit_tag_list"]
                }
            }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)
            topic_id_dict = {}
            for item in result_dict["hits"]:
                source = item["_source"]
                topic_id_dict[str(source["id"])] = source.get("edit_tag_list", [])
            return topic_id_dict
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {}

    @classmethod
    def get_recommend_topic_ids(cls, user_id, tag_id, offset, size, single_size, query=None,
                                query_type=TopicPageType.FIND_PAGE, filter_topic_id_list=None,
                                test_score=False, must_topic_id_list=None, recommend_tag_list=None,
                                user_similar_score_list=None, index_type="topic", routing=None,
                                attention_tag_list=None, linucb_user_id_list=None):
        """Return the ids of recommended topics for the home page (or, when
        ``query`` is given, of topics matching a keyword search).

        :param user_id: user whose followed users boost the score.
        :param tag_id: tag matched as an alternative to the text match when
            ``query`` is given.
        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :param query: optional search text; None means plain recommendation,
            in which case topics flagged ``is_operation_home_recommend`` are
            excluded.
        :param filter_topic_id_list: topic ids to exclude.
        :param must_topic_id_list: when non-empty, restrict results to these ids.
        :param index_type: sub index name passed to ESPerform.
        :param routing: routing value passed to ESPerform.
        :param attention_tag_list: tags that boost the score.
        :param linucb_user_id_list: user ids whose topics are excluded.
        :param single_size: unused by this method.
        :param query_type: unused by this method.
        :param test_score: unused by this method.
        :param recommend_tag_list: unused by this method.
        :param user_similar_score_list: unused by this method.
        :return: list of topic ids, collapsed to one topic per ``user_id``;
            ``[]`` on any error.
        """
        try:
            # None replaces the former mutable ``[]`` defaults.
            filter_topic_id_list = [] if filter_topic_id_list is None else filter_topic_id_list
            must_topic_id_list = [] if must_topic_id_list is None else must_topic_id_list
            attention_tag_list = [] if attention_tag_list is None else attention_tag_list
            linucb_user_id_list = [] if linucb_user_id_list is None else linucb_user_id_list

            attention_user_id_list = []
            result_dict = TopicUtils.get_related_user_info(user_id, 0, 1)
            if len(result_dict["hits"]) == 0:
                logging.warning("not find user_id:%d in es!" % int(user_id))
            else:
                user_source = result_dict["hits"][0]["_source"]
                attention_user_id_list = [item["user_id"] for item in user_source["attention_user_id_list"]]

            functions_list = [
                {
                    "filter": {
                        "term": {
                            "language_type": 1
                        }
                    },
                    "weight": 60
                },
                {
                    "gauss": {
                        "create_time": {
                            "scale": "1d",
                            "decay": 0.99
                        }
                    },
                    "weight": 60
                },
                {
                    "filter": {
                        "constant_score": {
                            "filter": {
                                "term": {
                                    "content_level": 6
                                }
                            }
                        }
                    },
                    "weight": 600
                }
            ]

            if len(attention_user_id_list) > 0:
                # Boost topics written by users this user follows.
                functions_list.append(
                    {
                        "filter": {"bool": {
                            "should": {"terms": {"user_id": attention_user_id_list}}}},
                        "weight": 30,
                    }
                )
            if len(attention_tag_list) > 0:
                functions_list.append(
                    {
                        "filter": {"bool": {
                            "should": {"terms": {"tag_list": attention_tag_list}}}},
                        "weight": 100
                    }
                )

            bool_query = {
                "filter": [
                    {"range": {"content_level": {"gte": 4, "lte": 6}}},
                    {"term": {"is_online": True}},
                    {"term": {"is_deleted": False}}
                ],
                # Either an image-only topic or a video topic.
                "should": [
                    {
                        "bool": {
                            "must": [
                                {"term": {"has_image": True}},
                                {"term": {"has_video": False}}
                            ]
                        }
                    },
                    {
                        "bool": {
                            "must": {
                                "term": {"has_video": True}
                            }
                        }
                    }
                ],
                "minimum_should_match": 1
            }

            if len(must_topic_id_list) > 0:
                bool_query["must"] = {
                    "terms": {
                        "id": must_topic_id_list
                    }
                }

            # Exclusions are collected in one list. The user-id exclusion must
            # be a "terms" query because its value is a list; the previous
            # code emitted a "term" query with a list when no topic ids were
            # filtered.
            must_not = []
            if len(filter_topic_id_list) > 0:
                must_not.append({"terms": {"id": filter_topic_id_list}})
            if len(linucb_user_id_list) > 0:
                must_not.append({"terms": {"user_id": linucb_user_id_list}})

            if query is not None:
                # Keyword search over topics: replace the image/video "should"
                # with a text match OR a tag match.
                multi_fields = {
                    'description': 200,
                    'content': 300,
                    'name': 400,
                    'tag_name_list': 300,
                }
                query_fields = ['^'.join((k, str(v))) for (k, v) in multi_fields.items()]
                multi_match = {
                    'query': query,
                    'type': 'cross_fields',
                    'operator': 'and',
                    'fields': query_fields,
                }
                bool_query["should"] = [
                    {'multi_match': multi_match},
                    {"term": {"tag_list": tag_id}}
                ]
                bool_query["minimum_should_match"] = 1
            else:
                must_not.append({"term": {"is_operation_home_recommend": True}})

            if must_not:
                bool_query["must_not"] = must_not

            q = {
                "query": {
                    "function_score": {
                        "query": {"bool": bool_query},
                        "score_mode": "sum",
                        "boost_mode": "sum",
                        "functions": functions_list
                    }
                },
                # At most one topic per author.
                "collapse": {
                    "field": "user_id"
                },
                "_source": {
                    "includes": ["id"]
                },
                "sort": [
                    {
                        "_score": {
                            "order": "desc"
                        }
                    },
                    {
                        "offline_score": {
                            "order": "desc"
                        }
                    }
                ],
            }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name=index_type, query_body=q,
                                                       offset=offset, size=size, routing=routing)
            return [item["_source"]["id"] for item in result_dict["hits"]]
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()

    @classmethod
    def get_topic_detail_recommend_list(cls, user_id, topic_id, topic_tag_list, topic_pictorial_id, topic_user_id,
                                        filter_topic_user_id, have_read_topic_list, offset, size, es_cli_obj=None,
                                        index_type="topic", routing=None):
        """Return recommended topics for a topic detail page.

        The original remark noted time decay as still missing; a linear decay
        on ``create_time`` is part of the scoring functions below.

        :param user_id: unused by this method.
        :param topic_id: the topic being viewed; always excluded from results.
        :param topic_tag_list: when non-empty, added as a ``should`` clause on
            ``tag_list``.
        :param topic_pictorial_id: when a positive int, topics with the same
            ``pictorial_id`` get extra weight.
        :param topic_user_id: author of the viewed topic; the author's other
            topics get a large weight.
        :param filter_topic_user_id: when truthy, restrict results to
            ``topic_user_id``'s topics.
        :param have_read_topic_list: topic ids to exclude (not modified).
        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :param es_cli_obj: ES client; ``ESPerform.get_cli()`` when falsy.
        :param index_type: sub index name passed to ESPerform.
        :param routing: routing value passed to ESPerform.
        :return: the ``hits`` list of the search result; ``[]`` on any error.
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            functions_list = [
                {
                    "filter": {"term": {
                        "user_id": topic_user_id}},
                    "weight": 1000
                },
                {
                    "linear": {
                        "create_time": {
                            "scale": "1d",
                            "decay": 0.5
                        }
                    }
                }
            ]
            if isinstance(topic_pictorial_id, int) and topic_pictorial_id > 0:
                functions_list.append(
                    {
                        "filter": {"term": {
                            "pictorial_id": topic_pictorial_id}},
                        "weight": 1,
                    }
                )

            # Work on a copy so the caller's list is not mutated.
            excluded_topic_ids = list(have_read_topic_list)
            excluded_topic_ids.append(topic_id)

            bool_query = {
                "must": [
                    {"range": {"content_level": {"gte": 4, "lte": 5}}},
                    {"term": {"is_online": True}},
                    {"term": {"is_deleted": False}}
                ],
                "must_not": {
                    "terms": {
                        "id": excluded_topic_ids
                    }
                }
            }
            if filter_topic_user_id:
                bool_query["must"].append({"term": {"user_id": topic_user_id}})
            if len(topic_tag_list) > 0:
                bool_query["should"] = {
                    "terms": {
                        "tag_list": topic_tag_list
                    }
                }

            q = {
                "query": {
                    "function_score": {
                        "query": {"bool": bool_query},
                        "score_mode": "sum",
                        "boost_mode": "sum",
                        "functions": functions_list
                    }
                },
                "_source": {
                    "includes": ["id", "pictorial_id", "user_id", "_score"]
                },
            }
            result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name=index_type, query_body=q,
                                                       offset=offset, size=size, routing=routing)
            return result_dict["hits"]
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def top_get_topic_detail_recommend_list(cls, user_id, topic_id, have_read_topic_list, size, es_cli_obj=None,
                                            index_type="topic", routing=None, collection_topic_tag_list=None,
                                            topic_tag_list=None, topic_user_id=-1):
        """Return topic ids for the detail-page recommendation list, optionally
        restricted to a tag list or to one author.

        :param user_id: unused by this method.
        :param topic_id: unused by this method.
        :param have_read_topic_list: topic ids to exclude.
        :param size: page size passed through to ESPerform.
        :param es_cli_obj: ES client; ``ESPerform.get_cli()`` when falsy.
        :param index_type: sub index name passed to ESPerform.
        :param routing: routing value passed to ESPerform.
        :param collection_topic_tag_list: unused by this method.
        :param topic_tag_list: when non-empty, filter on ``tag_list``.
        :param topic_user_id: when not -1, filter on this author; results are
            collapsed by ``user_id`` only when it is -1.
        :return: list of topic ids; ``[]`` on any error.
        """
        try:
            # None replaces the former mutable ``[]`` default.
            topic_tag_list = [] if topic_tag_list is None else topic_tag_list

            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            functions_list = [
                {
                    "linear": {
                        "create_time": {
                            "scale": "1d",
                            "decay": 0.5
                        }
                    }
                }
            ]

            # A tag or author restriction lowers the content_level floor from
            # 4 to 3.
            if len(topic_tag_list) != 0 or topic_user_id != -1:
                min_content_level = 3
            else:
                min_content_level = 4

            bool_query = {
                "must": [
                    {"range": {"content_level": {"gte": min_content_level, "lte": 6}}},
                    {"term": {"is_online": True}},
                    {"term": {"is_deleted": False}}
                ],
                "must_not": {
                    "terms": {
                        "id": have_read_topic_list
                    }
                }
            }
            if len(topic_tag_list) > 0:
                bool_query["filter"] = {
                    "terms": {
                        "tag_list": topic_tag_list
                    }
                }
            if topic_user_id != -1:
                # NOTE(review): this replaces the tag filter set just above
                # when both are supplied -- confirm that is intended.
                bool_query["filter"] = {
                    "term": {
                        "user_id": topic_user_id
                    }
                }

            q = {
                "query": {
                    "function_score": {
                        "query": {"bool": bool_query},
                        "score_mode": "sum",
                        "boost_mode": "sum",
                        "functions": functions_list
                    }
                }
            }
            if topic_user_id == -1:
                q["collapse"] = {
                    "field": "user_id"
                }
            q["_source"] = {
                "includes": ["id", "pictorial_id", "user_id", "_score"]
            }
            result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name=index_type, query_body=q,
                                                       size=size, routing=routing)
            return [item["_source"]["id"] for item in result_dict["hits"]]
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def get_topic_tag_id_list(cls, topic_id, es_cli_obj=None):
        """Return the tag list stored on a topic.

        :param topic_id: value matched against the ``id`` field.
        :param es_cli_obj: ES client; ``ESPerform.get_cli()`` when falsy.
        :return: the ``TopicDocumentField.TAG_LIST`` value of the first hit,
            ``[]`` when nothing matches, ``[]`` on any error.
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            q = {
                "query": {
                    "term": {
                        "id": topic_id
                    }
                },
                "_source": {
                    "includes": [TopicDocumentField.TAG_LIST]
                },
            }
            result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="topic", query_body=q, size=1)

            tag_id_list = []
            if len(result_dict["hits"]) > 0:
                tag_id_list = result_dict["hits"][0]["_source"][TopicDocumentField.TAG_LIST]
            return tag_id_list
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()

    @classmethod
    def get_tag_aggregation_topic_id_list(cls, user_id, tag_id, offset, size):
        """Return the topics carrying ``tag_id``, boosted for authors the user
        follows or has picked.

        :param user_id: user whose ``attention_user_id_list`` and
            ``pick_user_id_list`` drive the boosts.
        :param tag_id: value that must appear in ``tag_list``.
        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :return: the ``hits`` list of the search result; ``[]`` on any error.
        """
        try:
            attention_user_id_list = []
            pick_user_id_list = []
            result_dict = TopicUtils.get_related_user_info(user_id, 0, 1)
            if len(result_dict["hits"]) == 0:
                logging.warning("not find user_id:%d in es!" % int(user_id))
            else:
                user_source = result_dict["hits"][0]["_source"]
                attention_user_id_list = [item["user_id"] for item in user_source["attention_user_id_list"]]
                pick_user_id_list = [item["user_id"] for item in user_source["pick_user_id_list"]]

            functions_list = [
                {
                    "linear": {
                        "create_time": {
                            "scale": "1d",
                            "decay": 0.5
                        }
                    }
                }
            ]
            if len(attention_user_id_list) > 0:
                functions_list.append(
                    {
                        "filter": {"bool": {
                            "should": {"terms": {"user_id": attention_user_id_list}}}},
                        "weight": 3,
                    }
                )
            if len(pick_user_id_list) > 0:
                functions_list.append(
                    {
                        "filter": {"bool": {
                            "should": {"terms": {"user_id": pick_user_id_list}}}},
                        "weight": 2
                    }
                )

            query_function_score = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"is_online": True}},
                            {"term": {"is_deleted": False}},
                            {"term": {"tag_list": tag_id}}
                        ],
                        "must_not": [
                            {"terms": {"content_level": [1, 2]}}
                        ]
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }
            q = {
                "query": {
                    "function_score": query_function_score
                },
                "_source": {
                    "includes": ["id", "pictorial_id", "user_id", "_score", "offline_score", "manual_score"]
                },
                # Order by relevance plus the two stored scores.
                "sort": [
                    {
                        "_script": {
                            "type": "number",
                            "script": {
                                "lang": "expression",
                                "source": "_score+doc['offline_score']+doc['manual_score']"
                            },
                            "order": "desc"
                        }
                    }
                ],
            }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)
            return result_dict["hits"]
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()

    @classmethod
    def process_filters(cls, filters, filter_online=True):
        """Translate a ``filters`` mapping into a list of ES query clauses.

        Key conventions:

        * ``is_complaint``: always added as a ``term`` clause, whatever the value.
        * any other key whose value is ``None``, ``''`` or ``[]`` is skipped.
        * ``create_time_gte`` / ``create_time_lte``: range on ``create_time_val``.
        * ``id_gte`` / ``id_lte``: range on ``id``.
        * ``<field>__exclude``: ``<field>`` strictly below OR strictly above
          the value.
        * ``<field>__gte|__lte|__gt|__lt``: range on ``<field>``.
        * anything else: ``terms`` for list values, ``term`` otherwise.

        :param filters: mapping of filter key to value; may be empty or None.
        :param filter_online: when True, append an ``is_online`` term clause.
        :return: list of clauses, always starting with ``is_deleted == False``.
        """
        f = [
            {"term": {"is_deleted": False}},
        ]
        logging.info("get filters:%s" % filters)

        if not filters:
            # NOTE(review): returning here means filter_online has no effect
            # when no filters are supplied -- confirm that is intended.
            return f

        range_suffixes = (("__gte", "gte"), ("__lte", "lte"), ("__gt", "gt"), ("__lt", "lt"))

        for k, v in filters.items():
            if k == "is_complaint":
                # Added exactly once; the previous code appended this same
                # clause from two duplicated checks and again below.
                f.append({
                    "term": {k: v},
                })
                continue

            if v in (None, '', []):
                continue

            if k in ("create_time_gte", "create_time_lte"):
                op = "gte" if k == "create_time_gte" else "lte"
                f.append({
                    "range": {
                        "create_time_val": {
                            op: v,
                        }
                    }
                })
            elif k in ("id_gte", "id_lte"):
                op = "gte" if k == "id_gte" else "lte"
                f.append({
                    "range": {
                        "id": {
                            op: v,
                        }
                    }
                })
            elif k.endswith("__exclude"):
                # Strip the whole suffix and OR the two ranges; requiring
                # both "lt v" and "gt v" at once can never match.
                field = k[:-len("__exclude")]
                f.append({
                    "bool": {
                        "should": [
                            {"range": {field: {"lt": v}}},
                            {"range": {field: {"gt": v}}},
                        ],
                        "minimum_should_match": 1,
                    }
                })
            else:
                for suffix, op in range_suffixes:
                    if k.endswith(suffix):
                        f.append({
                            "range": {
                                k[:-len(suffix)]: {
                                    op: v,
                                }
                            }
                        })
                        break
                else:
                    if isinstance(v, list):
                        f.append({
                            "terms": {k: v},
                        })
                    else:
                        f.append({
                            "term": {k: v},
                        })

        if filter_online:
            f.append({"term": {"is_online": True}})

        return f

    @classmethod
    def process_nfilters(cls, nfilters):
        """Translate exclusion filters into ES clauses.

        No exclusion key is handled yet, so the result is always ``[]``.

        :param nfilters: mapping of exclusion key to value; currently ignored.
        :return: an empty list.
        """
        nf = []
        if not nfilters:
            return nf
        # Placeholder: no entries of nfilters are translated.
        return nf

    @classmethod
    def process_sort(cls, sorts_by):
        """Translate ``TOPIC_SEARCH_SORT`` values into ES sort rules.

        :param sorts_by: a single int sort constant (only
            ``TOPIC_SEARCH_SORT.VOTE_NUM`` is handled) or a list of sort
            constants.
        :return: list of sort rule dicts; unrecognised values contribute
            nothing.
        """
        sort_rule = []
        if isinstance(sorts_by, int):
            if sorts_by == TOPIC_SEARCH_SORT.VOTE_NUM:
                sort_rule.append({
                    "vote_num": {
                        "order": "desc"
                    },
                    "update_time": {
                        "order": "desc"
                    },
                })
        elif isinstance(sorts_by, list):
            for sort_by in sorts_by:
                if sort_by == TOPIC_SEARCH_SORT.ID_AEC:
                    sort_rule.append({
                        "id": {
                            "order": "asc"
                        },
                    })
                elif sort_by == TOPIC_SEARCH_SORT.ID_DESC:
                    sort_rule.append({
                        "id": {
                            "order": "desc"
                        },
                    })
                elif sort_by == TOPIC_SEARCH_SORT.SCORE_AEC:
                    sort_rule.append({
                        "sort_score": {
                            "order": "asc"
                        },
                    })
                elif sort_by == TOPIC_SEARCH_SORT.SCORE_DESC:
                    sort_rule.append({
                        "sort_score": {
                            "order": "desc"
                        },
                    })
                elif sort_by == TOPIC_SEARCH_SORT.VOTE_NUM_AEC:
                    sort_rule.append({
                        "total_vote_num": {
                            "order": "asc"
                        },
                    })
                elif sort_by == TOPIC_SEARCH_SORT.VOTE_NUM_DESC:
                    sort_rule.append({
                        "total_vote_num": {
                            "order": "desc"
                        },
                    })

        return sort_rule

    @classmethod
    def list_topic_ids(cls, filters, nfilters, sorts_by, offset=0, size=10, index_name="topic", filter_online=True):
        """Search topics by filters and return hits with the total count.

        :param filters: see ``process_filters``.
        :param nfilters: see ``process_nfilters``.
        :param sorts_by: see ``process_sort``; falsy means no explicit sort.
        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :param index_name: sub index name passed to ESPerform.
        :param filter_online: forwarded to ``process_filters``.
        :return: ``{"hits": [...], "total_count": n}``; empty hits and a count
            of 0 when the search raises.
        """
        q = {
            "query": {
                "bool": {
                    "must": cls.process_filters(filters, filter_online=filter_online),
                    "must_not": cls.process_nfilters(nfilters),
                }
            }
        }

        if sorts_by:
            sorts = cls.process_sort(sorts_by)
            if sorts:
                q["sort"] = sorts

        try:
            result_dict = ESPerform.get_search_results(
                ESPerform.get_cli(), sub_index_name=index_name,
                query_body=q, offset=offset, size=size
            )
            return {
                "hits": result_dict["hits"],
                "total_count": result_dict["total_count"]
            }
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {
                "hits": [],
                "total_count": 0
            }

    @classmethod
    def business_topic_ids(cls, filters, nfilters, sorts_by, offset=0, size=10, index_name="topic",
                           filter_online=True):
        """Search topics by filters, with an optional text match on ``content``.

        When ``filters`` has a non-empty ``"content"`` value, that value is
        matched against the ``content`` field and the filter clauses are used
        as a ``function_score`` weight filter; otherwise the filter clauses
        form the query directly.

        :param filters: see ``process_filters``; may be empty or None.
        :param nfilters: see ``process_nfilters``.
        :param sorts_by: see ``process_sort``; falsy means no explicit sort.
        :param offset: paging offset passed through to ESPerform.
        :param size: page size passed through to ESPerform.
        :param index_name: sub index name passed to ESPerform.
        :param filter_online: forwarded to ``process_filters``.
        :return: ``{"hits": [...], "total_count": n}``; empty hits and a count
            of 0 when the search raises.
        """
        filter_query = {
            "bool": {
                "must": cls.process_filters(filters, filter_online=filter_online),
                "must_not": cls.process_nfilters(nfilters),
            }
        }

        # process_filters accepts empty/None filters, so do the same here
        # instead of failing on ``None.items()``.
        query = filters.get("content", '') if filters else ''

        if query == '':
            q = {"query": filter_query}
        else:
            # NOTE(review): here the filter clauses only contribute a score
            # weight; they do not restrict which documents match -- confirm
            # that is intended.
            q = {
                "query": {
                    "function_score": {
                        "functions": [{
                            "filter": filter_query,
                            "weight": 1
                        }],
                        "query": {
                            "multi_match": {
                                "fields": ["content"],
                                "type": "cross_fields",
                                "operator": "and",
                                "query": query
                            }
                        }
                    }
                }
            }

        if sorts_by:
            sorts = cls.process_sort(sorts_by)
            if sorts:
                q["sort"] = sorts

        try:
            result_dict = ESPerform.get_search_results(
                ESPerform.get_cli(), sub_index_name=index_name,
                query_body=q, offset=offset, size=size
            )
            return {
                "hits": result_dict["hits"],
                "total_count": result_dict["total_count"]
            }
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {
                "hits": [],
                "total_count": 0
            }