user.py 11.2 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import traceback
import json
import time
import copy
from libs.es import ESPerform


class UserUtils(object):
    @classmethod
    def get_batch_attention_user_list(cls,user_id_list,self_user_id,es_cli_obj=None):
        """
        :remark 批量用户 关注的 用户列表
        :param user_id_list:
        :param self_user_id:
        :return:
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            user_id_list.append(self_user_id)

            q = dict()
            q["query"] = {
                "terms":{
                    "user_id":user_id_list
                }
            }
            q["_source"] = {
                "includes":["attention_user_id_list","user_id"]
            }

            result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, offset=0, size=len(user_id_list))

            self_attention_user_id_list = []
            attention_user_dict_list = list()
            ret_attention_user_id_list = list()
            for hit_item in result_dict["hits"]:

                attention_user_dict = dict()
                user_id = hit_item["_source"]["user_id"]

                if user_id == self_user_id:
                    self_attention_user_id_list = [item["user_id"] for item in hit_item["_source"]["attention_user_id_list"]]
                else:
                    attention_user_id_list = [item["user_id"] for item in hit_item["_source"]["attention_user_id_list"]]

                    attention_user_dict[user_id] = attention_user_id_list
                    attention_user_dict_list.append(attention_user_dict)
                    ret_attention_user_id_list.append(user_id)

            return [self_attention_user_id_list,attention_user_dict_list,ret_attention_user_id_list]
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return ([],[],[])


    @classmethod
    def get_attention_user_list(cls,user_id_list,self_user_id,es_cli_obj=None):
        """
        :remark 获取指定用户列表 关注的 用户列表
        :param user_id:
        :return:
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()


            q = dict()
            q["query"] = {
                "terms":{
                    "user_id":user_id_list
                }
            }
            q["_source"] = {
                "includes":["attention_user_id_list","user_id"]
            }

            result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, offset=0, size=len(user_id_list))

            self_attention_user_id_list = []
            recursion_attention_user_id_list = []
            for hit_item in result_dict["hits"]:
                if hit_item["_source"]["user_id"] == self_user_id:
                    self_attention_user_id_list = [item["user_id"] for item in hit_item["_source"]["attention_user_id_list"]]
                else:
                    recursion_attention_user_id_list = [item["user_id"] for item in hit_item["_source"]["attention_user_id_list"]]

            return (self_attention_user_id_list,recursion_attention_user_id_list)
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return ([],[])

    @classmethod
    def ___get_should_term_list(cls,ori_list,field_name="tag_list"):
        try:
            should_term_list = list()
            for term_id in ori_list:
                term_dict = {
                    "term":{
                        field_name:{"value":term_id}
                    }
                }
                should_term_list.append(term_dict)
            return should_term_list
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []

    @classmethod
    def get_recommend_user_list(cls,self_attention_user_id_list,recursion_attention_user_id_list,offset,size,es_cli_obj=None):
        """
        :remark 获取推荐用户列表
        :param attention_user_id_list:
        :param recursion_attention_user_id_list:
        :return:
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            q = dict()
            q["query"] = dict()

            # recursion_attention_user_list = cls.___get_should_term_list(recursion_attention_user_id_list,field_name="user_id")

            functions_list = [
                {
                    "gauss": {
                        "latest_topic_time_val": {
                            "origin": int(time.time()),
                            "scale": "600",
                            "decay": 0.1
                        }
                    }
                }
            ]

            if len(recursion_attention_user_id_list) > 0:
                functions_list.append(
                    {
                        "filter":{
                            "bool":{
                                "should":{"terms":{"user_id":recursion_attention_user_id_list}}
                            }
                        },
                        "weight":10
                    }
                )

            query_function_score = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"is_recommend": True}},
                            {"term": {"is_online": True}},
                            {"term": {"is_deleted": False}},
                            {"term": {"is_shadow": False}}
                        ],
                        "must_not":{
                            "terms":{
                                "user_id":self_attention_user_id_list
                            }
                        }
                    }
                },
                "score_mode": "sum",
                "boost_mode": "sum",
                "functions": functions_list
            }
            q["query"]["function_score"] = query_function_score
            q["_source"] = {
                "includes":["user_id"]
            }
            result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="user", query_body=q,
                                                       offset=offset, size=size)

            recommend_user_list = list()
            for item in result_dict["hits"]:
                recommend_user_list.append(item["_source"]["user_id"])

            return recommend_user_list
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return []


    @classmethod
    def get_batch_recommend_user_dict(cls,need_filter_attention_user_id_list,attention_user_id_list,attention_user_dict_list,self_user_id,offset,size,es_cli_obj=None):
        """
        :remark 获取批量推荐用户
        :param need_filter_attention_user_id_list:
        :param attention_user_dict:
        :param self_user_id:
        :param offset:
        :param size:
        :return:
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            batch_query_list = list()
            for interesting_user_item_dict in attention_user_dict_list:
                for interesting_user_id in interesting_user_item_dict:
                    if interesting_user_id != self_user_id:
                        logging.info("duan add,interesting_user_id:%d" % interesting_user_id)
                        filter_user_id_list = copy.deepcopy(need_filter_attention_user_id_list)
                        filter_user_id_list.append(interesting_user_id)

                        q = dict()
                        q["from"] = offset
                        q["size"] = size
                        q["query"] = dict()

                        functions_list = [
                            {
                                "gauss": {
                                    "latest_topic_time_val": {
                                        "origin": int(time.time()),
                                        "scale": "600",
                                        "decay": 0.1
                                    }
                                }
                            }
                        ]

                        if len(interesting_user_item_dict[interesting_user_id]) > 0:
                            functions_list.append(
                                {
                                    "filter":{
                                        "bool":{
                                            "should":{"terms":{"user_id":interesting_user_item_dict[interesting_user_id]}}
                                        }
                                    },
                                    "weight":10
                                }
                            )

                        query_function_score = {
                            "query": {
                                "bool": {
                                    "must": [
                                        {"term": {"is_recommend": True}},
                                        {"term": {"is_online": True}},
                                        {"term": {"is_deleted": False}},
                                        {"term": {"is_shadow": False}}
                                    ],
                                    "must_not":{
                                        "terms":{
                                            "user_id":filter_user_id_list
                                        }
                                    }
                                }
                            },
                            "score_mode": "sum",
                            "boost_mode": "sum",
                            "functions": functions_list
                        }
                        q["query"]["function_score"] = query_function_score
                        q["_source"] = {
                            "includes":["user_id"]
                        }

                        batch_query_list.append(q)

            index_name = ESPerform.get_official_index_name("user","read")
            search_header_dict = {'index': index_name, 'type': "_doc"}
            query_body = ""
            for query_item in batch_query_list:
                query_body += "{}\n{}\n".format(json.dumps(search_header_dict),json.dumps(query_item))

            result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="user", query_body=query_body,
                                                       batch_search=True)
            ret_dict = dict()
            user_index = 0
            for res_item in result_dict["responses"]:
                recommend_user_list = list()
                for item in res_item["hits"]["hits"]:
                    recommend_user_list.append(item["_source"]["user_id"])

                logging.info("duan add,attention_user_id_list:%s,user_index:%d" % (str(attention_user_id_list), user_index))
                ret_dict[str(attention_user_id_list[user_index])] = recommend_user_list
                user_index += 1


            logging.info("duan add,ret_dict:%s" % str(ret_dict))
            return ret_dict
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return dict()