#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import traceback
import json
import time
import copy

from libs.es import ESPerform


class UserUtils(object):
    """Elasticsearch helpers for follow ("attention") lists and user recommendation.

    Every public method is best-effort: any exception is logged with its
    traceback and an empty result of the documented shape is returned.
    """

    @classmethod
    def _extract_attention_ids(cls, hit_item):
        """Return the user ids stored in a hit's ``attention_user_id_list``.

        A hit whose ``_source`` has no (or an empty) ``attention_user_id_list``
        yields ``[]`` so that one incomplete document does not abort the
        processing of the whole result set.
        """
        attention_items = hit_item["_source"].get("attention_user_id_list") or []
        return [item["user_id"] for item in attention_items]

    @classmethod
    def _build_recommend_query(cls, exclude_user_id_list, boost_user_id_list):
        """Build the ``function_score`` search body shared by the recommend methods.

        :param exclude_user_id_list: user ids placed in the ``must_not`` terms
            clause, i.e. never returned.
        :param boost_user_id_list: user ids that receive an extra weight of 10;
            when empty, no weight function is added.
        :return: dict with the ``query`` and ``_source`` keys of the search body.
        """
        # Gauss decay on ``latest_topic_time_val`` centred on the current time.
        functions_list = [
            {
                "gauss": {
                    "latest_topic_time_val": {
                        "origin": int(time.time()),
                        "scale": "600",
                        "decay": 0.1
                    }
                }
            }
        ]
        if boost_user_id_list:
            functions_list.append(
                {
                    "filter": {
                        "bool": {
                            "should": {"terms": {"user_id": boost_user_id_list}}
                        }
                    },
                    "weight": 10
                }
            )

        return {
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "must": [
                                {"term": {"is_recommend": True}},
                                {"term": {"is_online": True}},
                                {"term": {"is_deleted": False}},
                                {"term": {"is_shadow": False}}
                            ],
                            "must_not": {
                                "terms": {
                                    "user_id": exclude_user_id_list
                                }
                            }
                        }
                    },
                    "score_mode": "sum",
                    "boost_mode": "sum",
                    "functions": functions_list
                }
            },
            "_source": {
                "includes": ["user_id"]
            }
        }

    @classmethod
    def get_batch_attention_user_list(cls, user_id_list, self_user_id, es_cli_obj=None):
        """Fetch the follow lists of several users plus the current user in one search.

        :param user_id_list: ids of the users whose follow lists are wanted.
            The list is NOT modified (the original implementation appended
            ``self_user_id`` to the caller's list).
        :param self_user_id: id of the current user.
        :param es_cli_obj: optional ES client; obtained from
            ``ESPerform.get_cli()`` when falsy.
        :return: on success a 3-item list
            ``[self_attention_user_id_list, attention_user_dict_list,
            ret_attention_user_id_list]`` where ``attention_user_dict_list``
            holds one ``{user_id: [followed ids]}`` dict per non-self hit and
            ``ret_attention_user_id_list`` holds the matching user ids in the
            same order. On any error ``([], [], [])`` is returned.
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            # Work on a copy so the caller's list is left untouched.
            query_user_id_list = list(user_id_list)
            query_user_id_list.append(self_user_id)

            q = {
                "query": {
                    "terms": {
                        "user_id": query_user_id_list
                    }
                },
                "_source": {
                    "includes": ["attention_user_id_list", "user_id"]
                }
            }
            result_dict = ESPerform.get_search_results(
                es_cli_obj, "user", q, offset=0, size=len(query_user_id_list))

            self_attention_user_id_list = []
            attention_user_dict_list = list()
            ret_attention_user_id_list = list()
            for hit_item in result_dict["hits"]:
                user_id = hit_item["_source"]["user_id"]
                attention_ids = cls._extract_attention_ids(hit_item)
                if user_id == self_user_id:
                    self_attention_user_id_list = attention_ids
                else:
                    # Both lists are appended together so they stay index-aligned.
                    attention_user_dict_list.append({user_id: attention_ids})
                    ret_attention_user_id_list.append(user_id)

            return [self_attention_user_id_list, attention_user_dict_list, ret_attention_user_id_list]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return ([], [], [])

    @classmethod
    def get_attention_user_list(cls, user_id_list, self_user_id, es_cli_obj=None):
        """Fetch the follow list of the current user and of the other given users.

        :param user_id_list: user ids to look up (expected to contain
            ``self_user_id`` if the current user's own list is wanted).
        :param self_user_id: id of the current user.
        :param es_cli_obj: optional ES client; obtained from
            ``ESPerform.get_cli()`` when falsy.
        :return: ``(self_attention_user_id_list, recursion_attention_user_id_list)``.
            The second item accumulates the followed ids of every non-self hit
            (the original kept only the last hit's list, which depended on the
            arbitrary hit order). ``([], [])`` on any error.
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            q = {
                "query": {
                    "terms": {
                        "user_id": user_id_list
                    }
                },
                "_source": {
                    "includes": ["attention_user_id_list", "user_id"]
                }
            }
            result_dict = ESPerform.get_search_results(
                es_cli_obj, "user", q, offset=0, size=len(user_id_list))

            self_attention_user_id_list = []
            recursion_attention_user_id_list = []
            for hit_item in result_dict["hits"]:
                attention_ids = cls._extract_attention_ids(hit_item)
                if hit_item["_source"]["user_id"] == self_user_id:
                    self_attention_user_id_list = attention_ids
                else:
                    recursion_attention_user_id_list.extend(attention_ids)

            return (self_attention_user_id_list, recursion_attention_user_id_list)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return ([], [])

    @classmethod
    def ___get_should_term_list(cls, ori_list, field_name="tag_list"):
        """Turn a list of values into a list of ES ``term`` clauses on ``field_name``.

        :param ori_list: values to match.
        :param field_name: document field each ``term`` clause targets.
        :return: list of ``{"term": {field_name: {"value": v}}}`` dicts, or
            ``[]`` on any error.
        """
        try:
            return [
                {"term": {field_name: {"value": term_id}}}
                for term_id in ori_list
            ]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return []

    @classmethod
    def get_recommend_user_list(cls, self_attention_user_id_list, recursion_attention_user_id_list, offset, size, es_cli_obj=None):
        """Return recommended user ids for the current user.

        :param self_attention_user_id_list: ids the current user already
            follows; excluded from the result.
        :param recursion_attention_user_id_list: ids that get an extra score
            weight.
        :param offset: paging offset passed to the search.
        :param size: page size passed to the search.
        :param es_cli_obj: optional ES client; obtained from
            ``ESPerform.get_cli()`` when falsy.
        :return: list of recommended ``user_id`` values, ``[]`` on any error.
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            q = cls._build_recommend_query(
                exclude_user_id_list=self_attention_user_id_list,
                boost_user_id_list=recursion_attention_user_id_list)

            result_dict = ESPerform.get_search_results(
                es_cli_obj, sub_index_name="user", query_body=q, offset=offset, size=size)

            return [item["_source"]["user_id"] for item in result_dict["hits"]]
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return []

    @classmethod
    def get_batch_recommend_user_dict(cls, need_filter_attention_user_id_list, attention_user_id_list, attention_user_dict_list, self_user_id, offset, size, es_cli_obj=None):
        """Run one recommendation query per followed user in a single batch search.

        :param need_filter_attention_user_id_list: ids excluded from every
            query (each query additionally excludes the user it is built for).
        :param attention_user_id_list: kept for backward compatibility; it is
            only logged. Results are keyed by the user id each query was built
            for, so a list that is shorter than, or ordered differently from,
            the generated queries can no longer mis-key the result or raise
            ``IndexError``.
        :param attention_user_dict_list: list of ``{user_id: [followed ids]}``
            dicts; one query is built per ``user_id`` other than
            ``self_user_id``.
        :param self_user_id: id of the current user; never queried for.
        :param offset: ``from`` value of each query.
        :param size: ``size`` value of each query.
        :param es_cli_obj: optional ES client; obtained from
            ``ESPerform.get_cli()`` when falsy.
        :return: dict mapping ``str(user_id)`` to the list of recommended user
            ids for that user; ``{}`` when nothing was queried or on any error.
        """
        try:
            if not es_cli_obj:
                es_cli_obj = ESPerform.get_cli()

            batch_query_list = list()
            # User id of each query, in the same order as batch_query_list.
            queried_user_id_list = list()
            for interesting_user_item_dict in attention_user_dict_list:
                for interesting_user_id, boost_user_id_list in interesting_user_item_dict.items():
                    if interesting_user_id == self_user_id:
                        continue

                    logging.info("duan add,interesting_user_id:%d", interesting_user_id)

                    filter_user_id_list = list(need_filter_attention_user_id_list)
                    filter_user_id_list.append(interesting_user_id)

                    q = dict()
                    q["from"] = offset
                    q["size"] = size
                    q.update(cls._build_recommend_query(
                        exclude_user_id_list=filter_user_id_list,
                        boost_user_id_list=boost_user_id_list))

                    batch_query_list.append(q)
                    queried_user_id_list.append(interesting_user_id)

            index_name = ESPerform.get_official_index_name("user", "read")
            search_header_dict = {'index': index_name, 'type': "_doc"}
            # The header is identical for every query: serialise it once.
            search_header_json = json.dumps(search_header_dict)
            query_body = "".join(
                "{}\n{}\n".format(search_header_json, json.dumps(query_item))
                for query_item in batch_query_list
            )

            ret_dict = dict()
            if query_body:
                result_dict = ESPerform.get_search_results(
                    es_cli_obj, sub_index_name="user", query_body=query_body, batch_search=True)

                # Pair each response with the user its query was built for;
                # this assumes responses come back in request order.
                paired_results = zip(queried_user_id_list, result_dict["responses"])
                for user_index, (queried_user_id, res_item) in enumerate(paired_results):
                    recommend_user_list = [
                        item["_source"]["user_id"] for item in res_item["hits"]["hits"]
                    ]
                    logging.info("duan add,attention_user_id_list:%s,user_index:%d",
                                 str(attention_user_id_list), user_index)
                    ret_dict[str(queried_user_id)] = recommend_user_list

            return ret_dict
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return dict()