Commit 9204b809 authored by 段英荣's avatar 段英荣

Update es.py

parent 21d6ec93
...@@ -17,17 +17,22 @@ class ESPerform(object): ...@@ -17,17 +17,22 @@ class ESPerform(object):
cli_info_list = settings.ES_INFO_LIST cli_info_list = settings.ES_INFO_LIST
index_prefix = settings.ES_INDEX_PREFIX index_prefix = settings.ES_INDEX_PREFIX
@classmethod @classmethod
def get_cli(cls): def get_cli(cls):
try: try:
cls.cli_obj = Elasticsearch(cls.cli_info_list) init_args = {
'sniff_on_start': False,
'sniff_on_connection_fail': False,
}
cls.cli_obj = Elasticsearch(hosts=cls.cli_info_list, **init_args)
return cls.cli_obj return cls.cli_obj
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None return None
@classmethod @classmethod
def get_official_index_name(cls, sub_index_name, index_flag=None): def get_official_index_name(cls,sub_index_name,index_flag=None):
""" """
:remark:get official es index name :remark:get official es index name
:param sub_index_name: :param sub_index_name:
...@@ -35,7 +40,7 @@ class ESPerform(object): ...@@ -35,7 +40,7 @@ class ESPerform(object):
:return: :return:
""" """
try: try:
assert (index_flag in [None, "read", "write"]) assert (index_flag in [None,"read","write"])
official_index_name = cls.index_prefix + "-" + sub_index_name official_index_name = cls.index_prefix + "-" + sub_index_name
if index_flag: if index_flag:
...@@ -47,11 +52,11 @@ class ESPerform(object): ...@@ -47,11 +52,11 @@ class ESPerform(object):
return None return None
@classmethod @classmethod
def __load_mapping(cls, doc_type): def __load_mapping(cls,doc_type):
try: try:
mapping_file_path = os.path.join( mapping_file_path = os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
'..', 'trans2es', 'mapping', '%s.json' % (doc_type,)) '..', 'trans2es','mapping', '%s.json' % (doc_type,))
mapping = '' mapping = ''
with open(mapping_file_path, 'r') as f: with open(mapping_file_path, 'r') as f:
for line in f: for line in f:
...@@ -64,7 +69,7 @@ class ESPerform(object): ...@@ -64,7 +69,7 @@ class ESPerform(object):
return None return None
@classmethod @classmethod
def create_index(cls, es_cli, sub_index_name): def create_index(cls,es_cli,sub_index_name):
""" """
:remark: create es index,alias index :remark: create es index,alias index
:param sub_index_name: :param sub_index_name:
...@@ -77,11 +82,11 @@ class ESPerform(object): ...@@ -77,11 +82,11 @@ class ESPerform(object):
index_exist = es_cli.indices.exists(official_index_name) index_exist = es_cli.indices.exists(official_index_name)
if not index_exist: if not index_exist:
es_cli.indices.create(official_index_name) es_cli.indices.create(official_index_name)
read_alias_name = cls.get_official_index_name(sub_index_name, "read") read_alias_name = cls.get_official_index_name(sub_index_name,"read")
es_cli.indices.put_alias(official_index_name, read_alias_name) es_cli.indices.put_alias(official_index_name,read_alias_name)
write_alias_name = cls.get_official_index_name(sub_index_name, "write") write_alias_name = cls.get_official_index_name(sub_index_name,"write")
es_cli.indices.put_alias(official_index_name, write_alias_name) es_cli.indices.put_alias(official_index_name,write_alias_name)
return True return True
except: except:
...@@ -89,7 +94,7 @@ class ESPerform(object): ...@@ -89,7 +94,7 @@ class ESPerform(object):
return False return False
@classmethod @classmethod
def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc"): def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc"):
""" """
:remark: put index mapping :remark: put index mapping
:param es_cli: :param es_cli:
...@@ -100,13 +105,13 @@ class ESPerform(object): ...@@ -100,13 +105,13 @@ class ESPerform(object):
try: try:
assert (es_cli is not None) assert (es_cli is not None)
write_alias_name = cls.get_official_index_name(sub_index_name, "write") write_alias_name = cls.get_official_index_name(sub_index_name,"write")
index_exist = es_cli.indices.exists(write_alias_name) index_exist = es_cli.indices.exists(write_alias_name)
if not index_exist: if not index_exist:
return False return False
mapping_dict = cls.__load_mapping(sub_index_name) mapping_dict = cls.__load_mapping(sub_index_name)
es_cli.indices.put_mapping(index=write_alias_name, body=mapping_dict, doc_type=mapping_type) es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type)
return True return True
except: except:
...@@ -114,7 +119,7 @@ class ESPerform(object): ...@@ -114,7 +119,7 @@ class ESPerform(object):
return False return False
@classmethod @classmethod
def put_indices_template(cls, es_cli, template_file_name, template_name): def put_indices_template(cls,es_cli,template_file_name, template_name):
""" """
:remark put index template :remark put index template
:param es_cli: :param es_cli:
...@@ -126,7 +131,7 @@ class ESPerform(object): ...@@ -126,7 +131,7 @@ class ESPerform(object):
assert (es_cli is not None) assert (es_cli is not None)
mapping_dict = cls.__load_mapping(template_file_name) mapping_dict = cls.__load_mapping(template_file_name)
es_cli.indices.put_template(name=template_name, body=mapping_dict) es_cli.indices.put_template(name=template_name,body=mapping_dict)
return True return True
except: except:
...@@ -134,7 +139,7 @@ class ESPerform(object): ...@@ -134,7 +139,7 @@ class ESPerform(object):
return False return False
@classmethod @classmethod
def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False, doc_type="_doc"): def es_helpers_bulk(cls,es_cli,data_list,sub_index_name,auto_create_index=False,doc_type="_doc"):
try: try:
assert (es_cli is not None) assert (es_cli is not None)
...@@ -145,8 +150,8 @@ class ESPerform(object): ...@@ -145,8 +150,8 @@ class ESPerform(object):
logging.error("index:%s is not existing,bulk data error!" % official_index_name) logging.error("index:%s is not existing,bulk data error!" % official_index_name)
return False return False
else: else:
cls.create_index(es_cli, sub_index_name) cls.create_index(es_cli,sub_index_name)
cls.put_index_mapping(es_cli, sub_index_name) cls.put_index_mapping(es_cli,sub_index_name)
bulk_actions = [] bulk_actions = []
for data in data_list: for data in data_list:
...@@ -157,7 +162,7 @@ class ESPerform(object): ...@@ -157,7 +162,7 @@ class ESPerform(object):
'_id': data['id'], '_id': data['id'],
'_source': data, '_source': data,
}) })
elasticsearch.helpers.bulk(es_cli, bulk_actions) elasticsearch.helpers.bulk(es_cli,bulk_actions)
return True return True
except: except:
...@@ -165,35 +170,41 @@ class ESPerform(object): ...@@ -165,35 +170,41 @@ class ESPerform(object):
return False return False
@classmethod @classmethod
def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10, def get_search_results(cls, es_cli,sub_index_name,query_body,offset=0,size=10,
auto_create_index=False, doc_type="_doc", aggregations_query=False, auto_create_index=False,doc_type="_doc",aggregations_query=False,is_suggest_request=False,batch_search=False):
is_suggest_request=False):
try: try:
assert (es_cli is not None) assert (es_cli is not None)
official_index_name = cls.get_official_index_name(sub_index_name, "read") official_index_name = cls.get_official_index_name(sub_index_name,"read")
index_exists = es_cli.indices.exists(official_index_name) index_exists = es_cli.indices.exists(official_index_name)
if not index_exists: if not index_exists:
if not auto_create_index: if not auto_create_index:
logging.error("index:%s is not existing,get_search_results error!" % official_index_name) logging.error("index:%s is not existing,get_search_results error!" % official_index_name)
return None return None
else: else:
cls.create_index(es_cli, sub_index_name) cls.create_index(es_cli,sub_index_name)
cls.put_index_mapping(es_cli, sub_index_name) cls.put_index_mapping(es_cli,sub_index_name)
logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8")) logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8"))
res = es_cli.search(index=official_index_name, doc_type=doc_type, body=query_body, from_=offset, size=size)
if is_suggest_request: if not batch_search:
return res res = es_cli.search(index=official_index_name,doc_type=doc_type,body=query_body,from_=offset,size=size)
if is_suggest_request:
return res
else:
result_dict = {
"total_count":res["hits"]["total"],
"hits":res["hits"]["hits"]
}
if aggregations_query:
result_dict["aggregations"] = res["aggregations"]
return result_dict
else: else:
result_dict = { res = es_cli.msearch(body=query_body,index=official_index_name, doc_type=doc_type)
"total_count": res["hits"]["total"],
"hits": res["hits"]["hits"] logging.info("duan add,msearch res:%s" % str(res))
} return res
if aggregations_query:
result_dict["aggregations"] = res["aggregations"]
return result_dict
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {"total_count": 0, "hits": []} return {"total_count":0,"hits":[]}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment