# coding=utf-8
"""Helpers for creating, mapping, aliasing and reindexing Elasticsearch
indices, plus a few datetime utilities and a lazily created shared client."""
import copy
import io
import json
import os.path
import re
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk
from django.conf import settings
from pytz import timezone


# Matches a ``//`` comment running to the end of a line in a mapping file.
_LINE_COMMENT_RE = re.compile(r'//.*$')


def es_index_adapt(index_prefix, doc_type, rw=None):
    """Return the index name for ``doc_type``.

    The name is ``<index_prefix>-<doc_type>``. When ``rw`` is ``'read'`` or
    ``'write'`` it is appended as a further ``-`` separated suffix, giving
    the name used for the read/write alias (see ``init_alias``).

    :param index_prefix: leading part of the index name.
    :param doc_type: document type, used as the second name component.
    :param rw: ``None``, ``'read'`` or ``'write'``.
    :raises AssertionError: if ``rw`` is any other value.
    """
    assert rw in [None, 'read', 'write']
    index = '-'.join((index_prefix, doc_type))
    if rw:
        index = '-'.join((index, rw))
    return index


def tzlc(dt, truncate_to_sec=True):
    """Localize ``dt`` to ``settings.TIME_ZONE``.

    :param dt: datetime to localize, or ``None``.
    :param truncate_to_sec: when true, microseconds are zeroed first.
    :returns: the result of pytz ``localize``, or ``None`` if ``dt`` is
        ``None``.
    """
    if dt is None:
        return None
    if truncate_to_sec:
        dt = dt.replace(microsecond=0)
    return timezone(settings.TIME_ZONE).localize(dt)


def to_epoch(dt):
    """Return the seconds elapsed between 1970-01-01 UTC and ``dt``.

    The epoch used here is timezone-aware, so ``dt`` must be aware as well
    (Python refuses to subtract naive and aware datetimes).
    """
    epoch = datetime(year=1970, month=1, day=1, tzinfo=timezone('UTC'))
    return (dt - epoch).total_seconds()


def reindex(es, old_index='', new_index='', chunk_size=500):
    """Copy every document scanned from ``old_index`` into ``new_index``.

    Documents are streamed with ``scan`` (scroll of ``1m``) and written with
    ``bulk`` using the ``create`` operation.

    :param es: Elasticsearch client.
    :param old_index: index to read from.
    :param new_index: index to write to.
    :param chunk_size: number of actions per bulk request.
    :returns: whatever ``elasticsearch.helpers.bulk`` returns.
    """
    docs = scan(client=es, scroll='1m', index=old_index)

    def _change_op(docs):
        # Retarget each scanned hit so bulk() creates it in the new index.
        for d in docs:
            d['_op_type'] = 'create'
            d['_index'] = new_index
            yield d

    return bulk(client=es, actions=_change_op(docs), chunk_size=chunk_size)


def create_index(es, index_prefix, doc_type):
    """Create the index for ``doc_type`` unless it already exists.

    :returns: ``True`` if the index was created, ``False`` if it existed.
    """
    cl = es.indices
    index = es_index_adapt(
        index_prefix=index_prefix,
        doc_type=doc_type
    )
    if cl.exists(index=index):
        return False
    cl.create(index=index)
    return True


def delete_index(es, index_prefix, doc_type):
    """Delete the index for ``doc_type`` if it exists.

    :returns: ``True`` if the index was deleted, ``False`` if it was absent.
    """
    cl = es.indices
    index = es_index_adapt(
        index_prefix=index_prefix,
        doc_type=doc_type
    )
    if not cl.exists(index=index):
        return False
    cl.delete(index=index)
    return True


def load_mapping(doc_type):
    """Load ``../mapping/<doc_type>.json`` (relative to this file) as a dict.

    The file may contain ``//`` line comments; everything from ``//`` to the
    end of each line is removed before the text is parsed as JSON.

    NOTE: the stripping is purely textual, so a ``//`` inside a JSON string
    value (for example a URL) is removed as well.

    :param doc_type: base name of the mapping file.
    :returns: the parsed JSON document.
    """
    mapping_file_path = os.path.join(
        os.path.dirname(__file__),
        '..', 'mapping', '%s.json' % (doc_type,))
    # io.open takes ``encoding`` on both Python 2 and 3; decoding explicitly
    # as UTF-8 keeps non-ASCII comments from depending on the process locale.
    with io.open(mapping_file_path, 'r', encoding='utf-8') as f:
        # strip comments
        mapping = ''.join(_LINE_COMMENT_RE.sub('', line) for line in f)
    return json.loads(mapping)


def _mapping_opt_modify(o):
    """Recursively rename ``analyzer`` to ``index_analyzer`` in ``o``.

    The rename only happens in dicts that contain both ``analyzer`` and
    ``search_analyzer``. ``o`` is modified in place and returned.
    """
    # there is no "index_analyzer" option in ver 2.x
    # but only "analyzer" and "search_analyzer"
    if 'analyzer' in o and 'search_analyzer' in o:
        o['index_analyzer'] = o.pop('analyzer')
    for v in o.values():
        if isinstance(v, dict):
            _mapping_opt_modify(v)
    return o


def _mapping_modify(es, doc_type, mp):
    """Adapt the mapping body ``mp`` to the server version reported by ``es``.

    For servers older than 2.0 the options are rewritten with
    ``_mapping_opt_modify`` and wrapped under the ``doc_type`` key; otherwise
    ``mp`` is returned unchanged.
    """
    # mapping body needs to be modified
    # as there is a difference between ver 1.x and 2.x
    es_ver = es.info()['version']['number']
    # Only the major component decides "< 2.0.0"; parsing it directly avoids
    # distutils.version, which no longer exists on Python 3.12+.
    major = int(es_ver.split('.', 1)[0])
    if major < 2:
        return {
            doc_type: _mapping_opt_modify(mp)
        }
    return mp


def put_mapping(es, index_prefix, doc_type, mapping, delete=False):
    """Put ``mapping`` for ``doc_type`` on the write index name.

    :param es: Elasticsearch client.
    :param index_prefix: leading part of the index name.
    :param doc_type: document type the mapping applies to.
    :param mapping: mapping body; it is deep-copied, never modified.
    :param delete: when true, an existing mapping of this type is deleted
        before the new one is put.
    :returns: the response of ``es.indices.put_mapping``.
    """
    cl = es.indices
    index = es_index_adapt(
        index_prefix=index_prefix,
        doc_type=doc_type,
        rw='write'
    )

    # delete mapping first if type exists; ``delete`` is tested first so no
    # request is sent when deletion was not asked for.
    # NOTE(review): exists_type / delete_mapping are only available in some
    # elasticsearch-py versions - confirm against the installed client.
    if delete and cl.exists_type(index=index, doc_type=doc_type):
        cl.delete_mapping(index=index, doc_type=doc_type)

    mapping_copy = copy.deepcopy(mapping)
    mapping_copy = _mapping_modify(es, doc_type, mapping_copy)

    return cl.put_mapping(index=index, doc_type=doc_type, body=mapping_copy)


def alias_shift(es, alias, old_index, new_index):
    """Move ``alias`` from ``old_index`` to ``new_index`` in one request.

    Nothing is done (and nothing is reported) unless both indices exist.
    """
    cl = es.indices
    if cl.exists(index=old_index) and cl.exists(index=new_index):
        q = {
            'actions': [
                {'remove': {'index': old_index, 'alias': alias}},
                {'add': {'index': new_index, 'alias': alias}},
            ]
        }
        cl.update_aliases(body=q)


def init_alias(es, index_prefix, doc_type):
    """Add the ``-read`` and ``-write`` aliases to the index of ``doc_type``.

    Both aliases point at the plain ``<index_prefix>-<doc_type>`` index.
    """
    cl = es.indices
    index = es_index_adapt(
        index_prefix=index_prefix,
        doc_type=doc_type
    )
    index_read = es_index_adapt(
        index_prefix=index_prefix,
        doc_type=doc_type,
        rw='read',
    )
    index_write = es_index_adapt(
        index_prefix=index_prefix,
        doc_type=doc_type,
        rw='write',
    )
    content = {
        'actions': [
            {'add': {'index': index, 'alias': index_read}},
            {'add': {'index': index, 'alias': index_write}},
        ]
    }
    cl.update_aliases(body=content)


def area_tag_id_filter(prefix_list, value):
    """Build a ``bool``/``should`` filter matching ``value`` on area tag ids.

    One ``term`` clause is produced for every combination of a prefix from
    ``prefix_list`` and each of the three ``*_tag_id`` field names below.
    """
    return {
        'bool': {
            'should': [
                {'term': {prefix + field: value}}
                for prefix in prefix_list
                for field in ['city_tag_id',
                              'city_province_tag_id',
                              'city_province_country_tag_id', ]
            ]
        }
    }


__es_instance = None


def get_es_instance():
    """Return the module-wide Elasticsearch client, creating it on first use.

    The client connects to ``settings.ES7_HOSTS`` with HTTP auth taken from
    ``settings.HTTP_AUTH_NAME`` / ``settings.HTTP_AUTH_PWD`` and has sniffing
    disabled both at start-up and on connection failure.
    """
    global __es_instance
    if __es_instance is not None:
        return __es_instance

    init_args = {
        'sniff_on_start': False,
        'sniff_on_connection_fail': False,
    }
    new_hosts = settings.ES7_HOSTS
    __es_instance = Elasticsearch(
        hosts=new_hosts,
        http_auth=(settings.HTTP_AUTH_NAME, settings.HTTP_AUTH_PWD),
        **init_args)
    return __es_instance