# topic.py 10.6 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import traceback
import json
from libs.es import ESPerform
from .common import TopicDocumentField


class TopicUtils(object):

    @classmethod
    def get_related_user_info(cls, user_id, offset=0, size=10):
        """
        :remark:获取指定用户相关用户列表
        :param user_id:
        :param offset:
        :param size:
        :return:
        """
        try:
            q = dict()
            q["query"] = {
                "term":{
                    "user_id": user_id
                }
            }

            q["_source"] = ["tag_list","attention_user_id_list", "pick_user_id_list", "same_group_user_id_list"]

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset, size)

            return result_dict
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {"total_count":0,"hits":[]}


    @classmethod
    def analyze_related_user_id_list(cls,related_user_id_list):
        """
        :remark:获取指定用户关联的 用户列表
        :param related_user_id_list:
        :return:
        """
        try:
            chinese_user_id_list = list()
            japan_user_id_list = list()
            korea_user_id_list = list()

            for item in related_user_id_list:
                if item["country_id"] == 0:
                    chinese_user_id_list.append(item["user_id"])
                elif item["country_id"] == 1:
                    japan_user_id_list.append(item["user_id"])
                elif item["country_id"] == 2:
                    korea_user_id_list.append(item["user_id"])

            return (chinese_user_id_list,japan_user_id_list,korea_user_id_list)
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return ([],[],[])

    @classmethod
    def refresh_redis_hash_data(cls, redis_cli,redis_key,redis_data_dict):
        try:
            redis_cli.hmset(redis_key, redis_data_dict)
            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def ___get_should_term_list(cls,ori_list):
        try:
            should_term_list = list()
            for term_id in ori_list:
                term_dict = {
                    "term":{
                        "tag_list":{"value":term_id}
                    }
                }
                should_term_list.append(term_dict)
            return should_term_list
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def get_recommend_topic_ids(cls,user_id,offset,size,query=None):
        """
        :需增加打散逻辑
        :remark:获取首页推荐帖子列表
        :param user_id:
        :param offset:
        :param size:
        :param is_first_time:
        :return:
        """
        try:
            attention_user_id_term_list = list()
            pick_user_id_term_list = list()
            same_group_user_id_term_list = list()
            user_tag_list = list()

            result_dict = TopicUtils.get_related_user_info(user_id, 0, 1)
            if len(result_dict["hits"]) == 0:
                logging.warning("not find user_id:%d in es!" % int(user_id))
            else:
                attention_user_info_list = result_dict["hits"][0]["_source"]["attention_user_id_list"]
                (attention_chinese_user_id_list, attention_japan_user_id_list,
                 attention_korea_user_id_list) = TopicUtils.analyze_related_user_id_list(
                    related_user_id_list=attention_user_info_list)

                pick_user_info_list = result_dict["hits"][0]["_source"]["pick_user_id_list"]
                (pick_chinese_user_id_list, pick_japan_user_id_list,
                 pick_korea_user_id_list) = TopicUtils.analyze_related_user_id_list(pick_user_info_list)

                same_group_user_info_list = result_dict["hits"][0]["_source"]["same_group_user_id_list"]
                (same_group_chinese_user_id_list, same_group_japan_user_id_list,
                 same_group_korea_user_id_list) = TopicUtils.analyze_related_user_id_list(same_group_user_info_list)

                user_tag_list = result_dict["hits"][0]["_source"]["tag_list"]

                attention_user_id_term_list = cls.___get_should_term_list(attention_chinese_user_id_list + attention_japan_user_id_list + attention_korea_user_id_list)
                pick_user_id_term_list = cls.___get_should_term_list(pick_chinese_user_id_list + pick_japan_user_id_list + pick_korea_user_id_list)
                same_group_user_id_term_list = cls.___get_should_term_list(same_group_chinese_user_id_list + same_group_japan_user_id_list + same_group_korea_user_id_list)

            q = dict()
            q["query"] = dict()

            functions_list = [
                {
                    "filter": {"bool": {
                        "should": attention_user_id_term_list}},
                    "weight": 5,
                },
                {
                    "filter": {"bool": {
                        "should": pick_user_id_term_list}},
                    "weight": 3
                },
                {
                    "filter": {"bool": {
                        "should": same_group_user_id_term_list}},
                    "weight": 2
                },
                {
                    "gauss": {
                        "update_time": {
                            "scale": "1d",
                            "decay": 0.5
                        }
                    }
                }
            ]

            query_tag_term_list = cls.___get_should_term_list(user_tag_list)
            query_function_score = {
                "query": {
                    "bool": {
                        "should": query_tag_term_list,
                        "must": {
                            "range": {"content_level": {"gte": 3, "lte": 5}}
                        }
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }

            if query is not None:#搜索帖子
                multi_fields = {
                    'description': 2,
                    'name': 4,
                }
                query_fields = ['^'.join((k, str(v))) for (k, v) in multi_fields.items()]
                multi_match = {
                    'query': query,
                    'type': 'cross_fields',
                    'operator': 'and',
                    'fields': query_fields,
                }
                query_function_score["query"]["bool"]["should"].append({'multi_match': multi_match})

            q["query"]["function_score"] = query_function_score
            q["_source"] = {
                "include":["id","group_id"]
            }

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)

            if len(result_dict["hits"])>0:
                return [item["_source"] for item in result_dict["hits"]]
            else:
                return []
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def get_topic_detail_recommend_list(cls,user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,offset,size):
        """
        :remark 帖子详情页推荐列表,缺少按时间衰减
        :param user_id:
        :param topic_tag_list:
        :param topic_group_id:
        :param topic_user_id:
        :param offset:
        :param size:
        :return:
        """
        try:
            q = dict()
            q["query"] = dict()

            functions_list = [
                {
                    "filter": {"term": {
                        "user_id": topic_user_id}},
                    "weight": 1000
                },
                {
                     "gauss": {
                         "update_time": {
                             "scale": "1d",
                             "decay": 0.5
                         }
                     }
                 }
            ]
            if isinstance(topic_group_id,int) and topic_group_id > 0:
                functions_list.append(
                    {
                        "filter": {"term": {
                            "group_id": topic_group_id}},
                        "weight": 1,
                    }
                )

            query_tag_term_list = cls.___get_should_term_list(topic_tag_list)
            query_function_score = {
                "query":{
                    "bool":{
                        "should": query_tag_term_list,
                        "must": {
                            "range": {"content_level": {"gte": 3, "lte": 5}}
                        },
                        "must_not":{
                            "term":{
                                "id":topic_id
                            }
                        }
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }
            q["query"]["function_score"] = query_function_score
            q["_source"] = {
                "include":["id","group_id","user_id","_score"]
            }

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
                                                       offset=offset, size=size)

            return result_dict["hits"]
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def get_topic_tag_id_list(cls,topic_id):
        """
        :remark 获取帖子标签列表
        :param topic_id:
        :return:
        """
        try:
            q = dict()
            q["query"] = {
                "term":{
                    "id": topic_id
                }
            }
            q["_source"] = {
                "include":[TopicDocumentField.TAG_LIST]
            }

            result_dict = ESPerform.get_search_results(ESPerform.get_cli(),sub_index_name="topic",query_body=q,size=1)

            tag_id_list = []
            if len(result_dict["hits"])>0:
                tag_id_list = result_dict["hits"][0]["_source"][TopicDocumentField.TAG_LIST]

            return tag_id_list
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()