Commit ac2a1e01 authored by 段英荣's avatar 段英荣

modify es connection mem leak

parent 57791ec8
......@@ -21,7 +21,11 @@ class ESPerform(object):
@classmethod
def get_cli(cls):
try:
cls.cli_obj = Elasticsearch(cls.cli_info_list)
init_args = {
'sniff_on_start': False,
'sniff_on_connection_fail': False,
}
cls.cli_obj = Elasticsearch(hosts=cls.cli_info_list, **init_args)
return cls.cli_obj
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -47,8 +47,11 @@ class GroupUtils(object):
return {"total_count":0, "hits":[]}
@classmethod
def get_hot_group_recommend_result_list(cls,offset,size):
def get_hot_group_recommend_result_list(cls,offset,size,es_cli_obj=None):
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["query"] = {
"bool":{
......@@ -65,7 +68,7 @@ class GroupUtils(object):
"include":["id"]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),"group",q,offset,size)
result_dict = ESPerform.get_search_results(es_cli_obj,"group",q,offset,size)
group_ids_list = []
if len(result_dict["hits"]) > 0:
......@@ -77,12 +80,15 @@ class GroupUtils(object):
return []
@classmethod
def get_user_attention_group_list(cls,user_id,offset=0,size=10):
def get_user_attention_group_list(cls,user_id,offset=0,size=10,es_cli_obj=None):
"""
:remark: 获取用户关注小组列表
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["query"] = {
"bool":{
......@@ -97,7 +103,7 @@ class GroupUtils(object):
"include":["attention_group_id_list"]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),"user",q,offset,size)
result_dict = ESPerform.get_search_results(es_cli_obj,"user",q,offset,size)
if len(result_dict["hits"])>0:
return result_dict["hits"][0]["_source"]["attention_group_id_list"]
else:
......@@ -107,13 +113,16 @@ class GroupUtils(object):
return []
@classmethod
def get_group_ids_by_aggs(cls,group_id_list):
def get_group_ids_by_aggs(cls,group_id_list,es_cli_obj=None):
"""
:remark:聚合查询获取小组列表
:param group_id_list:
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["size"]=0
q["query"] = {
......@@ -136,7 +145,7 @@ class GroupUtils(object):
}
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),"topic",q,aggregations_query=True)
result_dict = ESPerform.get_search_results(es_cli_obj,"topic",q,aggregations_query=True)
buckets_list = result_dict["aggregations"]["group_ids"]["buckets"]
sorted_buckets_list = sorted(buckets_list,key=lambda item:item["max_date"]["value"],reverse=True)
......
......@@ -247,7 +247,7 @@ class TopicUtils(object):
return []
@classmethod
def get_topic_detail_recommend_list(cls,user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size):
def get_topic_detail_recommend_list(cls,user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size,es_cli_obj=None):
"""
:remark 帖子详情页推荐列表,缺少按时间衰减
:param user_id:
......@@ -259,6 +259,9 @@ class TopicUtils(object):
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["query"] = dict()
......@@ -318,7 +321,7 @@ class TopicUtils(object):
"include":["id","group_id","user_id","_score"]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic", query_body=q,
result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="topic", query_body=q,
offset=offset, size=size)
return result_dict["hits"]
......@@ -327,13 +330,16 @@ class TopicUtils(object):
return []
@classmethod
def get_topic_tag_id_list(cls,topic_id):
def get_topic_tag_id_list(cls,topic_id,es_cli_obj=None):
"""
:remark 获取帖子标签列表
:param topic_id:
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["query"] = {
"term":{
......@@ -344,7 +350,7 @@ class TopicUtils(object):
"include":[TopicDocumentField.TAG_LIST]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(),sub_index_name="topic",query_body=q,size=1)
result_dict = ESPerform.get_search_results(es_cli_obj,sub_index_name="topic",query_body=q,size=1)
tag_id_list = []
if len(result_dict["hits"])>0:
......
......@@ -11,7 +11,7 @@ from libs.es import ESPerform
class UserUtils(object):
@classmethod
def get_batch_attention_user_list(cls,user_id_list,self_user_id):
def get_batch_attention_user_list(cls,user_id_list,self_user_id,es_cli_obj=None):
"""
:remark 批量用户 关注的 用户列表
:param user_id_list:
......@@ -19,6 +19,9 @@ class UserUtils(object):
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
user_id_list.append(self_user_id)
q = dict()
......@@ -31,7 +34,7 @@ class UserUtils(object):
"include":["attention_user_id_list","user_id"]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset=0, size=len(user_id_list))
result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, offset=0, size=len(user_id_list))
self_attention_user_id_list = []
attention_user_dict_list = list()
......@@ -57,13 +60,17 @@ class UserUtils(object):
@classmethod
def get_attention_user_list(cls,user_id_list,self_user_id):
def get_attention_user_list(cls,user_id_list,self_user_id,es_cli_obj=None):
"""
:remark 获取指定用户列表 关注的 用户列表
:param user_id:
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["query"] = {
"terms":{
......@@ -74,7 +81,7 @@ class UserUtils(object):
"include":["attention_user_id_list","user_id"]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), "user", q, offset=0, size=len(user_id_list))
result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, offset=0, size=len(user_id_list))
self_attention_user_id_list = []
recursion_attention_user_id_list = []
......@@ -106,7 +113,7 @@ class UserUtils(object):
return []
@classmethod
def get_recommend_user_list(cls,self_attention_user_id_list,recursion_attention_user_id_list,offset,size):
def get_recommend_user_list(cls,self_attention_user_id_list,recursion_attention_user_id_list,offset,size,es_cli_obj=None):
"""
:remark 获取推荐用户列表
:param attention_user_id_list:
......@@ -114,6 +121,9 @@ class UserUtils(object):
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
q = dict()
q["query"] = dict()
......@@ -167,7 +177,7 @@ class UserUtils(object):
q["_source"] = {
"include":["user_id"]
}
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="user", query_body=q,
result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="user", query_body=q,
offset=offset, size=size)
recommend_user_list = list()
......@@ -181,7 +191,7 @@ class UserUtils(object):
@classmethod
def get_batch_recommend_user_dict(cls,need_filter_attention_user_id_list,attention_user_id_list,attention_user_dict_list,self_user_id,offset,size):
def get_batch_recommend_user_dict(cls,need_filter_attention_user_id_list,attention_user_id_list,attention_user_dict_list,self_user_id,offset,size,es_cli_obj=None):
"""
:remark 获取批量推荐用户
:param need_filter_attention_user_id_list:
......@@ -192,6 +202,9 @@ class UserUtils(object):
:return:
"""
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
batch_query_list = list()
for interesting_user_item_dict in attention_user_dict_list:
for interesting_user_id in interesting_user_item_dict:
......@@ -262,7 +275,7 @@ class UserUtils(object):
for query_item in batch_query_list:
query_body += "{}\n{}\n".format(json.dumps(search_header_dict),json.dumps(query_item))
result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="user", query_body=query_body,
result_dict = ESPerform.get_search_results(es_cli_obj, sub_index_name="user", query_body=query_body,
batch_search=True)
ret_dict = dict()
user_index = 0
......
......@@ -9,6 +9,7 @@ from libs.cache import redis_client
from libs.es import ESPerform
from search.utils.group import GroupUtils
from search.utils.common import GroupSortTypes
from libs.es import ESPerform
@bind("physical/search/query_group")
......@@ -46,18 +47,21 @@ def group_sort(user_id=-1,sort_type=GroupSortTypes.HOT_RECOMMEND,offset=0,size=1
if not isinstance(user_id,int):
user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
if sort_type==GroupSortTypes.HOT_RECOMMEND:
group_ids_list = GroupUtils.get_hot_group_recommend_result_list(offset,size)
group_ids_list = GroupUtils.get_hot_group_recommend_result_list(offset,size,es_cli_obj)
return {"group_recommend_ids":group_ids_list}
elif sort_type==GroupSortTypes.ATTENTION_RECOMMEND:
attention_group_list = GroupUtils.get_user_attention_group_list(user_id,offset=0,size=1)
attention_group_list = GroupUtils.get_user_attention_group_list(user_id,offset=0,size=1,es_cli_obj=es_cli_obj)
if len(attention_group_list)==0:
return {"group_recommend_ids": []}
else:
attention_group_id_list = [item["group_id"] for item in attention_group_list]
sorted_group_ids_list = GroupUtils.get_group_ids_by_aggs(attention_group_id_list)
sorted_group_ids_list = GroupUtils.get_group_ids_by_aggs(attention_group_id_list,es_cli_obj)
group_recommend_ids_list = sorted_group_ids_list
#if len(group_recommend_ids_list) < size and len(group_recommend_ids_list)<len(attention_group_list):
......
......@@ -10,6 +10,7 @@ from search.utils.topic import TopicUtils
from libs.es import ESPerform
from libs.cache import redis_client
from search.utils.common import *
from libs.es import ESPerform
def get_discover_page_topic_ids(user_id,device_id,size,query_type=TopicPageType.FIND_PAGE):
......@@ -211,10 +212,13 @@ def topic_detail_page_recommend(user_id=-1,topic_id=-1,topic_group_id=-1,topic_u
if not isinstance(user_id,int):
user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
# 获取帖子标签列表
topic_tag_list = TopicUtils.get_topic_tag_id_list(topic_id)
topic_tag_list = TopicUtils.get_topic_tag_id_list(topic_id,es_cli_obj)
result_list = TopicUtils.get_topic_detail_recommend_list(user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size)
result_list = TopicUtils.get_topic_detail_recommend_list(user_id,topic_id,topic_tag_list,topic_group_id,topic_user_id,filter_topic_user_id,offset,size,es_cli_obj)
recommend_topic_ids_list = list()
if len(result_list)>0:
recommend_topic_ids_list = [item["_source"]["id"] for item in result_list]
......
......@@ -10,6 +10,7 @@ from libs.es import ESPerform
from libs.cache import redis_client
from search.utils.user import UserUtils
from search.utils.common import *
from libs.es import ESPerform
@bind("physical/search/recommend_user")
......@@ -26,14 +27,17 @@ def recommend_user(self_user_id,interesting_user_id,offset=0,size=10):
if not isinstance(self_user_id,int):
self_user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
#获取关注用户列表
(self_attention_user_id_list,recursion_attention_user_id_list) = UserUtils.get_attention_user_list([self_user_id,interesting_user_id],self_user_id)
(self_attention_user_id_list,recursion_attention_user_id_list) = UserUtils.get_attention_user_list([self_user_id,interesting_user_id],self_user_id,es_cli_obj)
#去除自身及感兴趣的用户ID
self_attention_user_id_list.append(self_user_id)
self_attention_user_id_list.append(interesting_user_id)
recommend_user_list = UserUtils.get_recommend_user_list(self_attention_user_id_list,recursion_attention_user_id_list,offset,size)
recommend_user_list = UserUtils.get_recommend_user_list(self_attention_user_id_list,recursion_attention_user_id_list,offset,size,es_cli_obj)
return recommend_user_list
except:
......@@ -54,14 +58,17 @@ def batch_recommend_user(self_user_id,interesting_user_id_list,offset=0,size=10)
if not isinstance(self_user_id,int):
self_user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
#获取关注用户列表
(need_filter_attention_user_id_list, attention_user_dict_list,attention_user_id_list) = UserUtils.get_batch_attention_user_list(interesting_user_id_list,self_user_id)
(need_filter_attention_user_id_list, attention_user_dict_list,attention_user_id_list) = UserUtils.get_batch_attention_user_list(interesting_user_id_list,self_user_id,es_cli_obj)
#去除自身及感兴趣的用户ID
need_filter_attention_user_id_list.append(self_user_id)
recommend_user_dict = UserUtils.get_batch_recommend_user_dict(need_filter_attention_user_id_list=need_filter_attention_user_id_list,attention_user_id_list=attention_user_id_list,attention_user_dict_list=attention_user_dict_list,self_user_id=self_user_id,offset=offset,size=size)
recommend_user_dict = UserUtils.get_batch_recommend_user_dict(need_filter_attention_user_id_list=need_filter_attention_user_id_list,attention_user_id_list=attention_user_id_list,attention_user_dict_list=attention_user_dict_list,self_user_id=self_user_id,offset=offset,size=size,es_cli_obj=es_cli_obj)
logging.info("duan add,recommend_user_dict:%s" % str(recommend_user_dict))
return recommend_user_dict
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment