Commit d48972a4 authored by 段英荣's avatar 段英荣

Merge branch 'duanyr' into 'master'

modify es connection mem leak

See merge request alpha/physical!73
parents 07223a5b ac2a1e01
...@@ -21,7 +21,11 @@ class ESPerform(object): ...@@ -21,7 +21,11 @@ class ESPerform(object):
@classmethod @classmethod
def get_cli(cls): def get_cli(cls):
try: try:
cls.cli_obj = Elasticsearch(cls.cli_info_list) init_args = {
'sniff_on_start': False,
'sniff_on_connection_fail': False,
}
cls.cli_obj = Elasticsearch(hosts=cls.cli_info_list, **init_args)
return cls.cli_obj return cls.cli_obj
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
...@@ -47,8 +47,11 @@ class GroupUtils(object): ...@@ -47,8 +47,11 @@ class GroupUtils(object):
return {"total_count":0, "hits":[]} return {"total_count":0, "hits":[]}
@classmethod @classmethod
def get_hot_group_recommend_result_list(cls,offset,size): def get_hot_group_recommend_result_list(cls,offset,size,es_cli_obj=None):
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["query"] = { q["query"] = {
"bool":{ "bool":{
...@@ -65,7 +68,7 @@ class GroupUtils(object): ...@@ -65,7 +68,7 @@ class GroupUtils(object):
"include":["id"] "include":["id"]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),"group",q,offset,size) result_dict = ESPerform.get_search_results(es_cli_obj,"group",q,offset,size)
group_ids_list = [] group_ids_list = []
if len(result_dict["hits"]) > 0: if len(result_dict["hits"]) > 0:
...@@ -77,12 +80,15 @@ class GroupUtils(object): ...@@ -77,12 +80,15 @@ class GroupUtils(object):
return [] return []
@classmethod @classmethod
def get_user_attention_group_list(cls,user_id,offset=0,size=10): def get_user_attention_group_list(cls,user_id,offset=0,size=10,es_cli_obj=None):
""" """
:remark: 获取用户关注小组列表 :remark: 获取用户关注小组列表
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["query"] = { q["query"] = {
"bool":{ "bool":{
...@@ -97,7 +103,7 @@ class GroupUtils(object): ...@@ -97,7 +103,7 @@ class GroupUtils(object):
"include":["attention_group_id_list"] "include":["attention_group_id_list"]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),"user",q,offset,size) result_dict = ESPerform.get_search_results(es_cli_obj,"user",q,offset,size)
if len(result_dict["hits"])>0: if len(result_dict["hits"])>0:
return result_dict["hits"][0]["_source"]["attention_group_id_list"] return result_dict["hits"][0]["_source"]["attention_group_id_list"]
else: else:
...@@ -107,13 +113,16 @@ class GroupUtils(object): ...@@ -107,13 +113,16 @@ class GroupUtils(object):
return [] return []
@classmethod @classmethod
def get_group_ids_by_aggs(cls,group_id_list): def get_group_ids_by_aggs(cls,group_id_list,es_cli_obj=None):
""" """
:remark:聚合查询获取小组列表 :remark:聚合查询获取小组列表
:param group_id_list: :param group_id_list:
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["size"]=0 q["size"]=0
q["query"] = { q["query"] = {
...@@ -136,7 +145,7 @@ class GroupUtils(object): ...@@ -136,7 +145,7 @@ class GroupUtils(object):
} }
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),"topic",q,aggregations_query=True) result_dict = ESPerform.get_search_results(es_cli_obj,"topic",q,aggregations_query=True)
buckets_list = result_dict["aggregations"]["group_ids"]["buckets"] buckets_list = result_dict["aggregations"]["group_ids"]["buckets"]
sorted_buckets_list = sorted(buckets_list,key=lambda item:item["max_date"]["value"],reverse=True) sorted_buckets_list = sorted(buckets_list,key=lambda item:item["max_date"]["value"],reverse=True)
......
...@@ -247,7 +247,7 @@ class TopicUtils(object): ...@@ -247,7 +247,7 @@ class TopicUtils(object):
return [] return []
@classmethod @classmethod
def get_topic_detail_recommend_list(cls,user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size): def get_topic_detail_recommend_list(cls,user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size,es_cli_obj=None):
""" """
:remark 帖子详情页推荐列表,缺少按时间衰减 :remark 帖子详情页推荐列表,缺少按时间衰减
:param user_id: :param user_id:
...@@ -259,6 +259,9 @@ class TopicUtils(object): ...@@ -259,6 +259,9 @@ class TopicUtils(object):
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["query"] = dict() q["query"] = dict()
...@@ -318,7 +321,7 @@ class TopicUtils(object): ...@@ -318,7 +321,7 @@ class TopicUtils(object):
"include":["id","group_id","user_id","_score"] "include":["id","group_id","user_id","_score"]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q, result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="topic", query_body=q,
offset=offset, size=size) offset=offset, size=size)
return result_dict["hits"] return result_dict["hits"]
...@@ -327,13 +330,16 @@ class TopicUtils(object): ...@@ -327,13 +330,16 @@ class TopicUtils(object):
return [] return []
@classmethod @classmethod
def get_topic_tag_id_list(cls,topic_id): def get_topic_tag_id_list(cls,topic_id,es_cli_obj=None):
""" """
:remark 获取帖子标签列表 :remark 获取帖子标签列表
:param topic_id: :param topic_id:
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["query"] = { q["query"] = {
"term":{ "term":{
...@@ -344,7 +350,7 @@ class TopicUtils(object): ...@@ -344,7 +350,7 @@ class TopicUtils(object):
"include":[TopicDocumentField.TAG_LIST] "include":[TopicDocumentField.TAG_LIST]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),sub_index_name="topic",query_body=q,size=1) result_dict = ESPerform.get_search_results(es_cli_obj,sub_index_name="topic",query_body=q,size=1)
tag_id_list = [] tag_id_list = []
if len(result_dict["hits"])>0: if len(result_dict["hits"])>0:
......
...@@ -11,7 +11,7 @@ from libs.es import ESPerform ...@@ -11,7 +11,7 @@ from libs.es import ESPerform
class UserUtils(object): class UserUtils(object):
@classmethod @classmethod
def get_batch_attention_user_list(cls,user_id_list,self_user_id): def get_batch_attention_user_list(cls,user_id_list,self_user_id,es_cli_obj=None):
""" """
:remark 批量用户 关注的 用户列表 :remark 批量用户 关注的 用户列表
:param user_id_list: :param user_id_list:
...@@ -19,6 +19,9 @@ class UserUtils(object): ...@@ -19,6 +19,9 @@ class UserUtils(object):
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
user_id_list.append(self_user_id) user_id_list.append(self_user_id)
q = dict() q = dict()
...@@ -31,7 +34,7 @@ class UserUtils(object): ...@@ -31,7 +34,7 @@ class UserUtils(object):
"include":["attention_user_id_list","user_id"] "include":["attention_user_id_list","user_id"]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset=0, size=len(user_id_list)) result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, offset=0, size=len(user_id_list))
self_attention_user_id_list = [] self_attention_user_id_list = []
attention_user_dict_list = list() attention_user_dict_list = list()
...@@ -57,13 +60,17 @@ class UserUtils(object): ...@@ -57,13 +60,17 @@ class UserUtils(object):
@classmethod @classmethod
def get_attention_user_list(cls,user_id_list,self_user_id): def get_attention_user_list(cls,user_id_list,self_user_id,es_cli_obj=None):
""" """
:remark 获取指定用户列表 关注的 用户列表 :remark 获取指定用户列表 关注的 用户列表
:param user_id: :param user_id:
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["query"] = { q["query"] = {
"terms":{ "terms":{
...@@ -74,7 +81,7 @@ class UserUtils(object): ...@@ -74,7 +81,7 @@ class UserUtils(object):
"include":["attention_user_id_list","user_id"] "include":["attention_user_id_list","user_id"]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset=0, size=len(user_id_list)) result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, offset=0, size=len(user_id_list))
self_attention_user_id_list = [] self_attention_user_id_list = []
recursion_attention_user_id_list = [] recursion_attention_user_id_list = []
...@@ -106,7 +113,7 @@ class UserUtils(object): ...@@ -106,7 +113,7 @@ class UserUtils(object):
return [] return []
@classmethod @classmethod
def get_recommend_user_list(cls,self_attention_user_id_list,recursion_attention_user_id_list,offset,size): def get_recommend_user_list(cls,self_attention_user_id_list,recursion_attention_user_id_list,offset,size,es_cli_obj=None):
""" """
:remark 获取推荐用户列表 :remark 获取推荐用户列表
:param attention_user_id_list: :param attention_user_id_list:
...@@ -114,6 +121,9 @@ class UserUtils(object): ...@@ -114,6 +121,9 @@ class UserUtils(object):
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict() q = dict()
q["query"] = dict() q["query"] = dict()
...@@ -167,7 +177,7 @@ class UserUtils(object): ...@@ -167,7 +177,7 @@ class UserUtils(object):
q["_source"] = { q["_source"] = {
"include":["user_id"] "include":["user_id"]
} }
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="user", query_body=q, result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="user", query_body=q,
offset=offset, size=size) offset=offset, size=size)
recommend_user_list = list() recommend_user_list = list()
...@@ -181,7 +191,7 @@ class UserUtils(object): ...@@ -181,7 +191,7 @@ class UserUtils(object):
@classmethod @classmethod
def get_batch_recommend_user_dict(cls,need_filter_attention_user_id_list,attention_user_id_list,attention_user_dict_list,self_user_id,offset,size): def get_batch_recommend_user_dict(cls,need_filter_attention_user_id_list,attention_user_id_list,attention_user_dict_list,self_user_id,offset,size,es_cli_obj=None):
""" """
:remark 获取批量推荐用户 :remark 获取批量推荐用户
:param need_filter_attention_user_id_list: :param need_filter_attention_user_id_list:
...@@ -192,6 +202,9 @@ class UserUtils(object): ...@@ -192,6 +202,9 @@ class UserUtils(object):
:return: :return:
""" """
try: try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
batch_query_list = list() batch_query_list = list()
for interesting_user_item_dict in attention_user_dict_list: for interesting_user_item_dict in attention_user_dict_list:
for interesting_user_id in interesting_user_item_dict: for interesting_user_id in interesting_user_item_dict:
...@@ -262,7 +275,7 @@ class UserUtils(object): ...@@ -262,7 +275,7 @@ class UserUtils(object):
for query_item in batch_query_list: for query_item in batch_query_list:
query_body += "{}\n{}\n".format(json.dumps(search_header_dict),json.dumps(query_item)) query_body += "{}\n{}\n".format(json.dumps(search_header_dict),json.dumps(query_item))
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="user", query_body=query_body, result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="user", query_body=query_body,
batch_search=True) batch_search=True)
ret_dict = dict() ret_dict = dict()
user_index = 0 user_index = 0
......
...@@ -9,6 +9,7 @@ from libs.cache import redis_client ...@@ -9,6 +9,7 @@ from libs.cache import redis_client
from libs.es import ESPerform from libs.es import ESPerform
from search.utils.group import GroupUtils from search.utils.group import GroupUtils
from search.utils.common import GroupSortTypes from search.utils.common import GroupSortTypes
from libs.es import ESPerform
@bind("physical/search/query_group") @bind("physical/search/query_group")
...@@ -46,18 +47,21 @@ def group_sort(user_id=-1,sort_type=GroupSortTypes.HOT_RECOMMEND,offset=0,size=1 ...@@ -46,18 +47,21 @@ def group_sort(user_id=-1,sort_type=GroupSortTypes.HOT_RECOMMEND,offset=0,size=1
if not isinstance(user_id,int): if not isinstance(user_id,int):
user_id = -1 user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
if sort_type==GroupSortTypes.HOT_RECOMMEND: if sort_type==GroupSortTypes.HOT_RECOMMEND:
group_ids_list = GroupUtils.get_hot_group_recommend_result_list(offset,size) group_ids_list = GroupUtils.get_hot_group_recommend_result_list(offset,size,es_cli_obj)
return {"group_recommend_ids":group_ids_list} return {"group_recommend_ids":group_ids_list}
elif sort_type==GroupSortTypes.ATTENTION_RECOMMEND: elif sort_type==GroupSortTypes.ATTENTION_RECOMMEND:
attention_group_list = GroupUtils.get_user_attention_group_list(user_id,offset=0,size=1) attention_group_list = GroupUtils.get_user_attention_group_list(user_id,offset=0,size=1,es_cli_obj=es_cli_obj)
if len(attention_group_list)==0: if len(attention_group_list)==0:
return {"group_recommend_ids": []} return {"group_recommend_ids": []}
else: else:
attention_group_id_list = [item["group_id"] for item in attention_group_list] attention_group_id_list = [item["group_id"] for item in attention_group_list]
sorted_group_ids_list = GroupUtils.get_group_ids_by_aggs(attention_group_id_list) sorted_group_ids_list = GroupUtils.get_group_ids_by_aggs(attention_group_id_list,es_cli_obj)
group_recommend_ids_list = sorted_group_ids_list group_recommend_ids_list = sorted_group_ids_list
#if len(group_recommend_ids_list) < size and len(group_recommend_ids_list)<len(attention_group_list): #if len(group_recommend_ids_list) < size and len(group_recommend_ids_list)<len(attention_group_list):
......
...@@ -10,6 +10,7 @@ from search.utils.topic import TopicUtils ...@@ -10,6 +10,7 @@ from search.utils.topic import TopicUtils
from libs.es import ESPerform from libs.es import ESPerform
from libs.cache import redis_client from libs.cache import redis_client
from search.utils.common import * from search.utils.common import *
from libs.es import ESPerform
def get_discover_page_topic_ids(user_id,device_id,size,query_type=TopicPageType.FIND_PAGE): def get_discover_page_topic_ids(user_id,device_id,size,query_type=TopicPageType.FIND_PAGE):
...@@ -211,10 +212,13 @@ def topic_detail_page_recommend(user_id=-1,topic_id=-1,topic_group_id=-1,topic_u ...@@ -211,10 +212,13 @@ def topic_detail_page_recommend(user_id=-1,topic_id=-1,topic_group_id=-1,topic_u
if not isinstance(user_id,int): if not isinstance(user_id,int):
user_id = -1 user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
# 获取帖子标签列表 # 获取帖子标签列表
topic_tag_list = TopicUtils.get_topic_tag_id_list(topic_id) topic_tag_list = TopicUtils.get_topic_tag_id_list(topic_id,es_cli_obj)
result_list = TopicUtils.get_topic_detail_recommend_list(user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size) result_list = TopicUtils.get_topic_detail_recommend_list(user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size,es_cli_obj)
recommend_topic_ids_list = list() recommend_topic_ids_list = list()
if len(result_list)>0: if len(result_list)>0:
recommend_topic_ids_list = [item["_source"]["id"] for item in result_list] recommend_topic_ids_list = [item["_source"]["id"] for item in result_list]
......
...@@ -10,6 +10,7 @@ from libs.es import ESPerform ...@@ -10,6 +10,7 @@ from libs.es import ESPerform
from libs.cache import redis_client from libs.cache import redis_client
from search.utils.user import UserUtils from search.utils.user import UserUtils
from search.utils.common import * from search.utils.common import *
from libs.es import ESPerform
@bind("physical/search/recommend_user") @bind("physical/search/recommend_user")
...@@ -26,14 +27,17 @@ def recommend_user(self_user_id,interesting_user_id,offset=0,size=10): ...@@ -26,14 +27,17 @@ def recommend_user(self_user_id,interesting_user_id,offset=0,size=10):
if not isinstance(self_user_id,int): if not isinstance(self_user_id,int):
self_user_id = -1 self_user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
#获取关注用户列表 #获取关注用户列表
(self_attention_user_id_list,recursion_attention_user_id_list) = UserUtils.get_attention_user_list([self_user_id,interesting_user_id],self_user_id) (self_attention_user_id_list,recursion_attention_user_id_list) = UserUtils.get_attention_user_list([self_user_id,interesting_user_id],self_user_id,es_cli_obj)
#去除自身及感兴趣的用户ID #去除自身及感兴趣的用户ID
self_attention_user_id_list.append(self_user_id) self_attention_user_id_list.append(self_user_id)
self_attention_user_id_list.append(interesting_user_id) self_attention_user_id_list.append(interesting_user_id)
recommend_user_list = UserUtils.get_recommend_user_list(self_attention_user_id_list,recursion_attention_user_id_list,offset,size) recommend_user_list = UserUtils.get_recommend_user_list(self_attention_user_id_list,recursion_attention_user_id_list,offset,size,es_cli_obj)
return recommend_user_list return recommend_user_list
except: except:
...@@ -54,14 +58,17 @@ def batch_recommend_user(self_user_id,interesting_user_id_list,offset=0,size=10) ...@@ -54,14 +58,17 @@ def batch_recommend_user(self_user_id,interesting_user_id_list,offset=0,size=10)
if not isinstance(self_user_id,int): if not isinstance(self_user_id,int):
self_user_id = -1 self_user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
#获取关注用户列表 #获取关注用户列表
(need_filter_attention_user_id_list, attention_user_dict_list,attention_user_id_list) = UserUtils.get_batch_attention_user_list(interesting_user_id_list,self_user_id) (need_filter_attention_user_id_list, attention_user_dict_list,attention_user_id_list) = UserUtils.get_batch_attention_user_list(interesting_user_id_list,self_user_id,es_cli_obj)
#去除自身及感兴趣的用户ID #去除自身及感兴趣的用户ID
need_filter_attention_user_id_list.append(self_user_id) need_filter_attention_user_id_list.append(self_user_id)
recommend_user_dict = UserUtils.get_batch_recommend_user_dict(need_filter_attention_user_id_list=need_filter_attention_user_id_list,attention_user_id_list=attention_user_id_list,attention_user_dict_list=attention_user_dict_list,self_user_id=self_user_id,offset=offset,size=size) recommend_user_dict = UserUtils.get_batch_recommend_user_dict(need_filter_attention_user_id_list=need_filter_attention_user_id_list,attention_user_id_list=attention_user_id_list,attention_user_dict_list=attention_user_dict_list,self_user_id=self_user_id,offset=offset,size=size,es_cli_obj=es_cli_obj)
logging.info("duan add,recommend_user_dict:%s" % str(recommend_user_dict)) logging.info("duan add,recommend_user_dict:%s" % str(recommend_user_dict))
return recommend_user_dict return recommend_user_dict
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment