es.py 6.12 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import io
import json
import logging
import os
import os.path
import re
import sys
import traceback

from elasticsearch import Elasticsearch
import elasticsearch.helpers


class ESPerform(object):
    """
    :remark: helper around an Elasticsearch client for the ``gm-dbmw`` indices.

    A logical ("sub") index ``X`` is stored as the concrete index
    ``<index_prefix>-X`` with two aliases ``<index_prefix>-X-read`` and
    ``<index_prefix>-X-write`` (see create_index). Public methods log failures
    and return a sentinel value (None / False / empty result) instead of raising.
    """

    # Last client built by get_cli (overwritten on every call).
    cli_obj = None
    # Cluster nodes passed straight to the Elasticsearch constructor.
    cli_info_list = [
        {
            "host": "10.29.130.141",
            "port": 9200
        }
    ]
    # Prefix shared by every official index / alias name.
    index_prefix = "gm-dbmw"
    # Accepted values for the index_flag alias suffix.
    _index_flags = (None, "read", "write")
    # Matches a "//" line comment up to end of line in mapping files.
    _mapping_comment_re = re.compile(r'//.*$')

    @classmethod
    def get_cli(cls):
        """
        :remark: build a new Elasticsearch client for cli_info_list
        :return: the client (also stored on cls.cli_obj), or None on failure
        """
        try:
            cls.cli_obj = Elasticsearch(cls.cli_info_list)
            return cls.cli_obj
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return None

    @classmethod
    def get_official_index_name(cls, sub_index_name, index_flag=None):
        """
        :remark: get official es index (or alias) name
        :param sub_index_name: logical index name
        :param index_flag: None for the concrete index, "read"/"write" for an alias
        :return: "<index_prefix>-<sub_index_name>[-<index_flag>]", or None on invalid input
        """
        # Explicit check rather than ``assert`` so it still runs under ``python -O``.
        if index_flag not in cls._index_flags:
            logging.error("invalid index_flag:%s", index_flag)
            return None

        try:
            official_index_name = cls.index_prefix + "-" + sub_index_name
        except Exception:
            # e.g. sub_index_name is not a string
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return None

        if index_flag:
            official_index_name += "-" + index_flag
        return official_index_name

    @classmethod
    def __load_mapping(cls, doc_type):
        """
        :remark: load the JSON mapping file ../trans2es/mapping/<doc_type>.json
        :param doc_type: mapping file stem (the sub index name)
        :return: parsed mapping, or None if the file cannot be read or parsed
        """
        try:
            mapping_file_path = os.path.join(
                os.path.dirname(__file__),
                '..', 'trans2es', 'mapping', '%s.json' % (doc_type,))
            # Decode explicitly so non-ASCII content does not depend on the
            # platform's default locale encoding.
            with io.open(mapping_file_path, 'r', encoding='utf-8') as f:
                # Strip "//" line comments, which are not valid JSON.
                # NOTE: this also truncates any "//" inside a string value (e.g. a URL).
                mapping = ''.join(cls._mapping_comment_re.sub('', line) for line in f)
            return json.loads(mapping)
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return None

    @classmethod
    def create_index(cls, es_cli, sub_index_name):
        """
        :remark: create es index plus its read and write aliases
        :param es_cli: Elasticsearch client
        :param sub_index_name: logical index name
        :return: True if the index already existed or was created, False on error
        """
        if es_cli is None:
            logging.error("es_cli is None,create_index error!")
            return False

        try:
            official_index_name = cls.get_official_index_name(sub_index_name)
            # NOTE: when the index already exists its aliases are not checked
            # or re-created.
            if not es_cli.indices.exists(official_index_name):
                es_cli.indices.create(official_index_name)
                for flag in ("read", "write"):
                    alias_name = cls.get_official_index_name(sub_index_name, flag)
                    es_cli.indices.put_alias(official_index_name, alias_name)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc"):
        """
        :remark: put the mapping file for sub_index_name onto its write alias
        :param es_cli: Elasticsearch client
        :param sub_index_name: logical index name (also the mapping file stem)
        :param mapping_type: doc_type passed to put_mapping
        :return: True on success; False if the write alias is missing, the
            mapping cannot be loaded, or the request fails
        """
        if es_cli is None:
            logging.error("es_cli is None,put_index_mapping error!")
            return False

        try:
            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            if not es_cli.indices.exists(write_alias_name):
                return False

            mapping_dict = cls.__load_mapping(sub_index_name)
            if mapping_dict is None:
                # The mapping could not be loaded; do not send an empty body.
                logging.error("mapping for %s could not be loaded,put_index_mapping error!", sub_index_name)
                return False

            es_cli.indices.put_mapping(index=write_alias_name, body=mapping_dict, doc_type=mapping_type)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def _auto_create_index(cls, es_cli, sub_index_name):
        """
        :remark: create a missing index (with aliases) and put its mapping
        :return: False if the index could not be created, True otherwise
        """
        if not cls.create_index(es_cli, sub_index_name):
            logging.error("auto create index for %s failed!", sub_index_name)
            return False
        if not cls.put_index_mapping(es_cli, sub_index_name):
            # Previous behaviour was to continue regardless; keep that, but make
            # the failure visible since documents go in without the explicit mapping.
            logging.warning("put mapping for %s failed, continuing without it", sub_index_name)
        return True

    @classmethod
    def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False, doc_type="_doc"):
        """
        :remark: bulk-index every document of data_list into the official index
        :param es_cli: Elasticsearch client
        :param data_list: iterable of dicts; each dict's "id" is used as the document _id
        :param sub_index_name: logical index name
        :param auto_create_index: create index, aliases and mapping if the index is missing
        :param doc_type: value written as each action's _type
        :return: True on success, False on any error
        """
        if es_cli is None:
            logging.error("es_cli is None,bulk data error!")
            return False

        try:
            official_index_name = cls.get_official_index_name(sub_index_name)
            if not es_cli.indices.exists(official_index_name):
                if not auto_create_index:
                    logging.error("index:%s is not existing,bulk data error!", official_index_name)
                    return False
                if not cls._auto_create_index(es_cli, sub_index_name):
                    return False

            bulk_actions = [
                {
                    '_op_type': 'index',
                    '_index': official_index_name,
                    '_type': doc_type,
                    '_id': data['id'],
                    '_source': data,
                }
                for data in data_list
            ]
            elasticsearch.helpers.bulk(es_cli, bulk_actions)
            return True
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return False

    @classmethod
    def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10,
                           auto_create_index=False, doc_type="_doc", aggregations_query=False):
        """
        :remark: run a search against the read alias of sub_index_name
        :param es_cli: Elasticsearch client
        :param sub_index_name: logical index name
        :param query_body: search request body
        :param offset: passed to search as from_
        :param size: passed to search as size
        :param auto_create_index: create index, aliases and mapping if the read alias is missing
        :param doc_type: passed to search as doc_type
        :param aggregations_query: also return the response's "aggregations" ({} if absent)
        :return: {"total_count": res["hits"]["total"], "hits": res["hits"]["hits"]}
            (+ "aggregations" when requested); None if the index is missing and
            auto_create_index is False; {"total_count": 0, "hits": []} on any other error
        """
        if es_cli is None:
            logging.error("es_cli is None,get_search_results error!")
            return {"total_count": 0, "hits": []}

        try:
            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            if not es_cli.indices.exists(official_index_name):
                if not auto_create_index:
                    logging.error("index:%s is not existing,get_search_results error!", official_index_name)
                    # NOTE(review): None here vs an empty result dict for other
                    # failures; kept because callers may test for None.
                    return None
                if not cls._auto_create_index(es_cli, sub_index_name):
                    return {"total_count": 0, "hits": []}

            logging.info("duan add,query_body:%s", str(query_body).encode("utf-8"))
            res = es_cli.search(index=official_index_name, doc_type=doc_type, body=query_body,
                                from_=offset, size=size)
            hits = res["hits"]
            result_dict = {
                "total_count": hits["total"],
                "hits": hits["hits"],
            }
            if aggregations_query:
                # The response may lack "aggregations" (e.g. no aggs in the query);
                # don't throw away the hits in that case.
                result_dict["aggregations"] = res.get("aggregations", {})
            return result_dict
        except Exception:
            logging.error("catch exception,err_msg:%s", traceback.format_exc())
            return {"total_count": 0, "hits": []}