#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import logging
import traceback
import os.path
import re
import json
from elasticsearch import Elasticsearch
import elasticsearch.helpers
from django.conf import settings


class ESPerform(object):
    """Class-level helpers around an Elasticsearch client.

    Index names are built as ``<ES_INDEX_PREFIX>-<sub_index_name>`` with an
    optional ``-read`` / ``-write`` suffix used for the alias names.
    Every public method logs the traceback and returns a failure value
    instead of propagating ordinary exceptions.
    """

    # Last client built by get_cli(); get_cli() overwrites it on each call.
    cli_obj = None
    # Host list handed to the Elasticsearch constructor.
    cli_info_list = settings.ES_INFO_LIST
    # Prefix prepended to every index / alias name.
    index_prefix = settings.ES_INDEX_PREFIX

    @classmethod
    def get_cli(cls):
        """Build a new Elasticsearch client and remember it on the class.

        :return: the client, or None if construction raised.
        """
        try:
            init_args = {
                'sniff_on_start': False,
                'sniff_on_connection_fail': False,
            }
            cls.cli_obj = Elasticsearch(hosts=cls.cli_info_list, **init_args)
            return cls.cli_obj
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return None

    @classmethod
    def get_official_index_name(cls, sub_index_name, index_flag=None):
        """Return the full index (or alias) name for ``sub_index_name``.

        :param sub_index_name: logical index name without the prefix.
        :param index_flag: None for the concrete index, or "read" / "write"
            for the corresponding alias name.
        :return: the composed name, or None if ``index_flag`` is invalid or
            the name could not be built.
        """
        try:
            # Explicit check instead of ``assert`` so validation still runs
            # under ``python -O``.
            if index_flag not in (None, "read", "write"):
                raise ValueError("invalid index_flag:%r" % (index_flag,))

            official_index_name = cls.index_prefix + "-" + sub_index_name
            if index_flag:
                official_index_name += "-" + index_flag
            return official_index_name
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return None

    @classmethod
    def __load_mapping(cls, doc_type):
        """Load ``../trans2es/mapping/<doc_type>.json`` relative to this file.

        The file may contain ``//`` line comments; they are removed before
        parsing.

        :param doc_type: base name of the mapping file (without ``.json``).
        :return: the parsed JSON object, or None on any failure.
        """
        try:
            mapping_file_path = os.path.join(
                os.path.dirname(__file__),
                '..', 'trans2es', 'mapping', '%s.json' % (doc_type,))
            with open(mapping_file_path, 'r') as f:
                # Strip "//" comments so the text is strict JSON.
                # NOTE: this also cuts any "//" that appears inside a string
                # value (e.g. a URL) on the same line.
                mapping_text = ''.join(
                    re.sub(r'//.*$', '', line) for line in f)
            return json.loads(mapping_text)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return None

    @classmethod
    def create_index(cls, es_cli, sub_index_name):
        """Create the index if it is missing and (re)attach both aliases.

        The ``-read`` and ``-write`` aliases are put on every call, whether
        or not the index already existed.

        :param es_cli: Elasticsearch client; must not be None.
        :param sub_index_name: logical index name without the prefix.
        :return: True on success, False if anything raised.
        """
        try:
            if es_cli is None:
                raise ValueError("es_cli must not be None")

            official_index_name = cls.get_official_index_name(sub_index_name)
            if not es_cli.indices.exists(official_index_name):
                es_cli.indices.create(official_index_name)

            read_alias_name = cls.get_official_index_name(sub_index_name, "read")
            es_cli.indices.put_alias(official_index_name, read_alias_name)

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            es_cli.indices.put_alias(official_index_name, write_alias_name)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc", force_sync=False):
        """Put the mapping loaded from ``<sub_index_name>.json`` on the write alias.

        :param es_cli: Elasticsearch client; must not be None.
        :param sub_index_name: logical index name; also the mapping file name.
        :param mapping_type: document type passed to ``put_mapping``.
        :param force_sync: when True, attempt the put even if the write
            alias does not exist.
        :return: True on success; False if the alias is missing (and
            ``force_sync`` is False), the mapping cannot be loaded, or the
            request raised.
        """
        try:
            if es_cli is None:
                raise ValueError("es_cli must not be None")

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            index_exist = es_cli.indices.exists(write_alias_name)
            if not index_exist and not force_sync:
                return False

            mapping_dict = cls.__load_mapping(sub_index_name)
            if mapping_dict is None:
                # The loader already logged the cause; do not send an empty body.
                logging.error("mapping for:%s could not be loaded", sub_index_name)
                return False

            es_cli.indices.put_mapping(
                index=write_alias_name, body=mapping_dict, doc_type=mapping_type)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def put_indices_template(cls, es_cli, template_file_name, template_name):
        """Put an index template loaded from ``<template_file_name>.json``.

        :param es_cli: Elasticsearch client; must not be None.
        :param template_file_name: base name of the template file in the
            mapping directory (without ``.json``).
        :param template_name: name to register the template under.
        :return: True on success, False otherwise.
        """
        try:
            if es_cli is None:
                raise ValueError("es_cli must not be None")

            mapping_dict = cls.__load_mapping(template_file_name)
            if mapping_dict is None:
                # The loader already logged the cause; do not send an empty body.
                logging.error("template file:%s could not be loaded", template_file_name)
                return False

            es_cli.indices.put_template(name=template_name, body=mapping_dict)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False, doc_type="_doc"):
        """Bulk-index ``data_list`` into the concrete index.

        Each item is indexed with ``_id`` taken from ``item['id']`` and the
        whole item as ``_source``.

        :param es_cli: Elasticsearch client; must not be None.
        :param data_list: iterable of dicts, each with an ``'id'`` key.
        :param sub_index_name: logical index name without the prefix.
        :param auto_create_index: create the index, aliases and mapping
            first when the index does not exist.
        :param doc_type: document type for every action.
        :return: True on success; False if the index is missing (and
            ``auto_create_index`` is False) or anything raised.
        """
        try:
            if es_cli is None:
                raise ValueError("es_cli must not be None")

            official_index_name = cls.get_official_index_name(sub_index_name)
            if not es_cli.indices.exists(official_index_name):
                if not auto_create_index:
                    logging.error(
                        "index:%s is not existing,bulk data error!",
                        official_index_name)
                    return False
                cls.create_index(es_cli, sub_index_name)
                cls.put_index_mapping(es_cli, sub_index_name)

            bulk_actions = [
                {
                    '_op_type': 'index',
                    '_index': official_index_name,
                    '_type': doc_type,
                    '_id': data['id'],
                    '_source': data,
                }
                for data in data_list
            ]
            elasticsearch.helpers.bulk(es_cli, bulk_actions)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10,
                           auto_create_index=False, doc_type="_doc", aggregations_query=False,
                           is_suggest_request=False, batch_search=False):
        """Run a search (or multi-search) against the read alias.

        :param es_cli: Elasticsearch client; must not be None.
        :param sub_index_name: logical index name without the prefix.
        :param query_body: request body for ``search`` / ``msearch``.
        :param offset: ``from_`` value for a single search.
        :param size: page size for a single search.
        :param auto_create_index: create the index, aliases and mapping
            first when the read alias does not exist.
        :param doc_type: document type passed to the client.
        :param aggregations_query: also copy ``res["aggregations"]`` into
            the result dict.
        :param is_suggest_request: return the raw search response.
        :param batch_search: use ``msearch`` and return its raw response.
        :return: for a plain search, a dict with ``total_count`` and
            ``hits`` (plus ``aggregations`` when requested); the raw
            response for suggest / batch requests; None when the read alias
            is missing and ``auto_create_index`` is False; and
            ``{"total_count": 0, "hits": []}`` when anything raised.
        """
        try:
            if es_cli is None:
                raise ValueError("es_cli must not be None")

            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            if not es_cli.indices.exists(official_index_name):
                if not auto_create_index:
                    logging.error(
                        "index:%s is not existing,get_search_results error!",
                        official_index_name)
                    return None
                cls.create_index(es_cli, sub_index_name)
                cls.put_index_mapping(es_cli, sub_index_name)

            logging.info("duan add,query_body:%s", str(query_body).encode("utf-8"))

            if batch_search:
                res = es_cli.msearch(
                    body=query_body, index=official_index_name, doc_type=doc_type)
                logging.info("duan add,msearch res:%s", str(res))
                return res

            res = es_cli.search(
                index=official_index_name, doc_type=doc_type,
                body=query_body, from_=offset, size=size)
            if is_suggest_request:
                return res

            result_dict = {
                "total_count": res["hits"]["total"],
                "hits": res["hits"]["hits"],
            }
            if aggregations_query:
                result_dict["aggregations"] = res["aggregations"]
            return result_dict
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return {"total_count": 0, "hits": []}