es.py 8.64 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import logging
import traceback
import os.path
import re
import json
from elasticsearch import Elasticsearch
import elasticsearch.helpers
from django.conf import settings


class ESPerform(object):
    cli_obj = None
    cli_info_list = settings.ES_INFO_LIST
    index_prefix = settings.ES_INDEX_PREFIX


    @classmethod
    def get_cli(cls):
        """
        :remark: build an Elasticsearch client for the hosts in ``cls.cli_info_list``
                 and store it in ``cls.cli_obj``. A new client is constructed on every
                 call; sniffing is disabled at start-up and on connection failure.
        :return: the Elasticsearch client, or None if construction failed
                 (the traceback is logged)
        """
        try:
            init_args = {
                'sniff_on_start': False,
                'sniff_on_connection_fail': False,
            }
            cls.cli_obj = Elasticsearch(hosts=cls.cli_info_list, **init_args)
            return cls.cli_obj
        except Exception:
            # Catch Exception instead of a bare except so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def get_official_index_name(cls,sub_index_name,index_flag=None):
        """
        :remark: get official es index name, built as
                 "<cls.index_prefix>-<sub_index_name>", with "-<index_flag>"
                 appended when a flag is given
        :param sub_index_name: name appended to ``cls.index_prefix``
        :param index_flag: None, "read" or "write"
        :return: the composed name, or None when index_flag is not one of the
                 allowed values or the name cannot be built (the error is logged)
        """
        try:
            # Explicit validation instead of ``assert``: asserts are stripped under
            # ``python -O`` and an invalid flag would then end up in the name.
            if index_flag not in (None, "read", "write"):
                logging.error("catch exception,err_msg:invalid index_flag:%s" % (index_flag,))
                return None

            name_parts = [cls.index_prefix, sub_index_name]
            if index_flag:
                name_parts.append(index_flag)

            # join raises TypeError for non-string parts, like the original "+" did.
            return "-".join(name_parts)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def __load_mapping(cls,doc_type):
        """
        :remark: load ``../trans2es/mapping/<doc_type>.json`` (relative to this file),
                 strip ``//`` line comments and parse the remainder as JSON
        :param doc_type: base name of the mapping file (without ".json")
        :return: the parsed JSON content, or None on any failure (traceback is logged)
        """
        try:
            mapping_file_path = os.path.join(
                os.path.dirname(__file__),
                '..', 'trans2es','mapping', '%s.json' % (doc_type,))

            # Strip comments. The first alternative consumes a complete JSON string
            # literal so that a "//" inside a value (e.g. a URL) is preserved; the
            # second alternative is a real "//" comment running to end of line.
            comment_pattern = re.compile(r'("(?:\\.|[^"\\])*")|//.*$')

            def keep_string_drop_comment(match):
                return match.group(1) or ''

            with open(mapping_file_path, 'r') as f:
                cleaned_lines = [
                    comment_pattern.sub(keep_string_drop_comment, line)
                    for line in f
                ]
            return json.loads(''.join(cleaned_lines))
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def create_index(cls,es_cli,sub_index_name):
        """
        :remark: create es index,alias index. When the index does not exist yet it
                 is created together with its "read" and "write" aliases; an
                 existing index is left untouched.
        :param es_cli: Elasticsearch client used for the requests
        :param sub_index_name: name passed to get_official_index_name
        :return: True if the index already existed or was created, False on any error
        """
        try:
            if es_cli is None:
                logging.error("catch exception,err_msg:es_cli is None")
                return False

            official_index_name = cls.get_official_index_name(sub_index_name)
            read_alias_name = cls.get_official_index_name(sub_index_name,"read")
            write_alias_name = cls.get_official_index_name(sub_index_name,"write")
            if None in (official_index_name, read_alias_name, write_alias_name):
                logging.error("catch exception,err_msg:can not build index name for:%s" % (sub_index_name,))
                return False

            if not es_cli.indices.exists(index=official_index_name):
                # Create the index and both aliases in one request. Creating them
                # separately could leave an index without aliases if a later call
                # failed, and the exists-check above would then skip alias creation
                # on every subsequent call.
                es_cli.indices.create(
                    index=official_index_name,
                    body={
                        "aliases": {
                            read_alias_name: {},
                            write_alias_name: {},
                        },
                    })

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc",force_sync=False):
        """
        :remark: put index mapping. The mapping is loaded from the mapping file named
                 after sub_index_name and applied through the "write" alias.
        :param es_cli: Elasticsearch client used for the requests
        :param sub_index_name: name used both for the alias and for the mapping file
        :param mapping_type: doc_type passed to put_mapping
        :param force_sync: when True, attempt the put_mapping call even if the
                           write alias does not exist
        :return: True if the mapping was put; False if the alias is missing (and
                 force_sync is False), the mapping file could not be loaded, or
                 any error occurred
        """
        try:
            if es_cli is None:
                logging.error("catch exception,err_msg:es_cli is None")
                return False

            write_alias_name = cls.get_official_index_name(sub_index_name,"write")
            index_exist = es_cli.indices.exists(index=write_alias_name)
            if not (index_exist or force_sync):
                return False

            mapping_dict = cls.__load_mapping(sub_index_name)
            if mapping_dict is None:
                # __load_mapping already logged the cause; do not send an empty body.
                logging.error("catch exception,err_msg:load mapping failed for:%s" % (sub_index_name,))
                return False

            es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_indices_template(cls,es_cli,template_file_name, template_name):
        """
        :remark put index template, using the JSON loaded from the mapping file
                named template_file_name as the template body
        :param es_cli: Elasticsearch client used for the request
        :param template_file_name: base name of the file holding the template body
        :param template_name: name under which the template is stored
        :return: True if the template was put; False if the file could not be
                 loaded or any error occurred
        """
        try:
            if es_cli is None:
                logging.error("catch exception,err_msg:es_cli is None")
                return False

            template_body = cls.__load_mapping(template_file_name)
            if template_body is None:
                # __load_mapping already logged the cause; do not send an empty body.
                logging.error("catch exception,err_msg:load template failed for:%s" % (template_file_name,))
                return False

            es_cli.indices.put_template(name=template_name,body=template_body)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def es_helpers_bulk(cls,es_cli,data_list,sub_index_name,auto_create_index=False,doc_type="_doc"):
        """
        :remark: bulk-index documents through the "write" alias of sub_index_name
        :param es_cli: Elasticsearch client used for the requests
        :param data_list: iterable of dicts; each must contain an "id" key, which
                          is used as the document _id
        :param sub_index_name: name passed to get_official_index_name
        :param auto_create_index: when True, create the index (and put its mapping)
                                  if the write alias does not exist
        :param doc_type: value used as _type of every bulk action
        :return: True if the bulk call completed; False if the index is missing and
                 could not be / must not be created, or any error occurred
        """
        try:
            if es_cli is None:
                logging.error("catch exception,err_msg:es_cli is None")
                return False

            official_index_name = cls.get_official_index_name(sub_index_name, "write")
            if not es_cli.indices.exists(index=official_index_name):
                if not auto_create_index:
                    logging.error("index:%s is not existing,bulk data error!" % official_index_name)
                    return False

                # Stop if the index could not be created: bulk-writing to a missing
                # alias would fail or, depending on cluster settings, may make
                # Elasticsearch auto-create a plain index under the alias name.
                if not cls.create_index(es_cli,sub_index_name):
                    logging.error("index:%s create failed,bulk data error!" % official_index_name)
                    return False
                # Mapping is best-effort here, as before; failures are logged by
                # put_index_mapping itself.
                cls.put_index_mapping(es_cli,sub_index_name)

            # Build the full action list first so that a document without "id"
            # fails the whole call before anything is sent.
            bulk_actions = [
                {
                    '_op_type': 'index',
                    '_index': official_index_name,
                    '_type': doc_type,
                    '_id': data['id'],
                    '_source': data,
                }
                for data in data_list
            ]
            elasticsearch.helpers.bulk(es_cli,bulk_actions)

            return True
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def get_search_results(cls, es_cli,sub_index_name,query_body,offset=0,size=10,
                           auto_create_index=False,doc_type="_doc",aggregations_query=False,is_suggest_request=False,batch_search=False):
        """
        :remark: run a search (or msearch) through the "read" alias of sub_index_name
        :param es_cli: Elasticsearch client used for the requests
        :param sub_index_name: name passed to get_official_index_name
        :param query_body: request body passed to search / msearch
        :param offset: passed as from_ to search (not used for msearch)
        :param size: passed as size to search (not used for msearch)
        :param auto_create_index: when True, create the index (and put its mapping)
                                  if the read alias does not exist
        :param doc_type: doc_type passed to search / msearch
        :param aggregations_query: when True, copy res["aggregations"] into the result
        :param is_suggest_request: when True, return the raw search response
        :param batch_search: when True, use msearch and return its raw response
        :return: - None when the index is missing and auto_create_index is False
                 - the raw response for is_suggest_request / batch_search
                 - otherwise {"total_count": ..., "hits": ...} (plus "aggregations"
                   when aggregations_query is True)
                 - {"total_count":0,"hits":[]} on any error
        """
        try:
            if es_cli is None:
                logging.error("catch exception,err_msg:es_cli is None")
                return {"total_count":0,"hits":[]}

            official_index_name = cls.get_official_index_name(sub_index_name,"read")
            if not es_cli.indices.exists(index=official_index_name):
                if not auto_create_index:
                    logging.error("index:%s is not existing,get_search_results error!" % official_index_name)
                    return None
                cls.create_index(es_cli,sub_index_name)
                cls.put_index_mapping(es_cli,sub_index_name)

            # Pass the value as a logging argument instead of pre-formatting it with
            # str(...).encode("utf-8"): that produced a bytes repr on Python 3 and
            # could raise on Python 2 unicode input, which aborted the search.
            logging.info("duan add,query_body:%s", query_body)

            if batch_search:
                res = es_cli.msearch(body=query_body,index=official_index_name, doc_type=doc_type)
                logging.info("duan add,msearch res:%s", res)
                return res

            res = es_cli.search(index=official_index_name,doc_type=doc_type,body=query_body,from_=offset,size=size)
            if is_suggest_request:
                return res

            result_dict = {
                "total_count":res["hits"]["total"],
                "hits":res["hits"]["hits"]
            }
            if aggregations_query:
                result_dict["aggregations"] = res["aggregations"]
            return result_dict
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {"total_count":0,"hits":[]}


    @classmethod
    def if_es_node_load_high(cls, es_cli, cpu_threshold=60, max_high_nodes=3):
        """
        :remark: check whether the cluster looks CPU-loaded, based on the per-node
                 "cpu" column of the cat nodes API
        :param es_cli: Elasticsearch client used for the request
        :param cpu_threshold: a node counts as "high" when its cpu value is
                              greater than this
        :param max_high_nodes: the load is reported as high when more than this
                               many nodes are "high"
        :return: True when the load is high, and also (fail-safe) when es_cli is
                 None, a cpu value cannot be parsed, or any error occurs;
                 False otherwise
        """
        try:
            if es_cli is None:
                logging.error("catch exception,err_msg:es_cli is None")
                return True

            # Request only the cpu column. The default multi-column output is
            # space-padded, so picking the cpu field by splitting on single spaces
            # and counting columns could read the wrong field or skip nodes.
            es_nodes_ori_info = es_cli.cat.nodes(h="cpu")

            es_nodes_list = list()
            for item in es_nodes_ori_info.splitlines():
                cpu_load = item.strip()
                if not cpu_load:
                    continue
                try:
                    es_nodes_list.append(int(cpu_load))
                except ValueError:
                    logging.error("catch exception,item:%s,err_msg:%s" % (str(item),traceback.format_exc()))
                    return True

            high_num = sum(1 for int_cpu_load in es_nodes_list if int_cpu_load > cpu_threshold)
            if high_num > max_high_nodes:
                logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (str(es_nodes_list), str(es_nodes_ori_info)))
                return True
            return False
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return True