import json

from elasticsearch import Elasticsearch as Es

# Index-name prefix shared by every helper in this module.
_INDEX_PREFIX = 'gm-dbmw'


def get_es():
    """Create and return a new Elasticsearch client.

    The client is built against three hard-coded hosts (all on port 9000)
    with sniffing disabled both at start-up and on connection failure.
    A fresh client object is constructed on every call.
    """
    init_args = {
        'sniff_on_start': False,
        'sniff_on_connection_fail': False,
    }
    new_hosts = [
        {'host': '172.16.31.17', 'port': 9000},
        {'host': '172.16.31.11', 'port': 9000},
        {'host': '172.16.31.13', 'port': 9000},
    ]
    return Es(hosts=new_hosts, **init_args)


def es_index_adapt(index_prefix, doc_type, rw=None):
    """Return the adapted index name.

    The name is ``"<index_prefix>-<doc_type>"``; when ``rw`` is ``'read'``
    or ``'write'`` it is appended as a third ``-``-separated component.

    Args:
        index_prefix: Leading component of the index name.
        doc_type: Document type, used as the second component.
        rw: ``None``, ``'read'`` or ``'write'``.

    Raises:
        AssertionError: If ``rw`` is not one of the accepted values.
            The check is an ``assert`` and is therefore skipped when
            Python runs with ``-O``.
    """
    assert rw in (None, 'read', 'write'), \
        "rw must be None, 'read' or 'write'"
    parts = [index_prefix, doc_type]
    if rw:
        parts.append(rw)
    return '-'.join(parts)


def es_query(doc, body, offset, size, es=None):
    """Run a search against the read index of ``doc``.

    Args:
        doc: Document type; also selects the index ``gm-dbmw-<doc>-read``.
        body: Query body passed through to ``es.search``.
        offset: Passed as ``from_`` (index of the first hit to return).
        size: Passed as ``size`` (maximum number of hits).
        es: Client to use; a new one is created via ``get_es()`` if None.

    Returns:
        Whatever ``es.search`` returns.
    """
    if es is None:
        es = get_es()
    index = es_index_adapt(index_prefix=_INDEX_PREFIX, doc_type=doc, rw='read')
    return es.search(
        index=index,
        doc_type=doc,
        timeout='10s',
        body=body,
        from_=offset,
        size=size,
    )


def es_mquery(doc, body, es=None):
    """Run a multi-search against the read index of ``doc``.

    Args:
        doc: Document type; selects the index ``gm-dbmw-<doc>-read``.
        body: Multi-search body passed through to ``es.msearch``.
        es: Client to use; a new one is created via ``get_es()`` if None.

    Returns:
        Whatever ``es.msearch`` returns.
    """
    if es is None:
        es = get_es()
    index = es_index_adapt(index_prefix=_INDEX_PREFIX, doc_type=doc, rw='read')
    return es.msearch(body, index=index)


def es_insert_device_info(device_id, body, es=None, rw=None):
    """Index one device document through the bulk API.

    Sends a two-line bulk payload (an ``index`` action line followed by the
    JSON-encoded ``body``) to the ``gm-dbmw-device`` index with document
    type ``device``.

    Args:
        device_id: Value used as the document ``_id``; it is converted to
            a string before being sent.
        body: JSON-serialisable document source.
        es: Client to use; a new one is created via ``get_es()`` if None.
        rw: Accepted for interface compatibility but not used.
            NOTE(review): the index is always resolved with ``rw=None``;
            confirm whether writes were meant to honour this argument.

    Returns:
        Whatever ``es.bulk`` returns.
    """
    if es is None:
        es = get_es()
    index = es_index_adapt(index_prefix=_INDEX_PREFIX, doc_type="device", rw=None)
    # Serialise the action line with json.dumps instead of %-formatting so
    # that quotes, backslashes or newlines inside device_id are escaped and
    # cannot corrupt the newline-delimited bulk payload.
    bulk_head = json.dumps(
        {"index": {"_id": str(device_id)}}, ensure_ascii=False)
    data_str = json.dumps(body, ensure_ascii=False)
    bulk_one_body = bulk_head + "\n" + data_str + "\n"
    return es.bulk(index=index, doc_type="device", body=bulk_one_body)