Commit cea5c194 authored by 段英荣's avatar 段英荣

modify

parent 484e67c9
...@@ -8,6 +8,7 @@ import os.path ...@@ -8,6 +8,7 @@ import os.path
import re import re
import json import json
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
import elasticsearch.helpers
class ESPerform(object): class ESPerform(object):
...@@ -112,6 +113,37 @@ class ESPerform(object): ...@@ -112,6 +113,37 @@ class ESPerform(object):
mapping_dict = cls.__load_mapping(sub_index_name) mapping_dict = cls.__load_mapping(sub_index_name)
es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type) es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
@classmethod
def es_helpers_bulk(cls,es_cli,data_list,sub_index_name,auto_create_index=False,doc_type="_doc"):
try:
assert (es_cli is not None)
official_index_name = cls.get_official_index_name(sub_index_name)
index_exists = es_cli.indices.exists(official_index_name)
if not index_exists:
if not auto_create_index:
logging.error("index:%s is not existing,bulk data error!" % official_index_name)
return False
else:
cls.create_index(es_cli,sub_index_name)
cls.put_index_mapping(es_cli,sub_index_name)
bulk_actions = []
for data in data_list:
bulk_actions.append({
'_op_type': 'index',
'_index': official_index_name,
'_type': doc_type,
'_id': data['id'],
'_source': data,
})
elasticsearch.helpers.bulk(es_cli,bulk_actions)
return True return True
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
...@@ -7,6 +7,7 @@ import logging ...@@ -7,6 +7,7 @@ import logging
import traceback import traceback
import django.db.models import django.db.models
from django.conf import settings from django.conf import settings
from libs.es import ESPerform
import elasticsearch import elasticsearch
import elasticsearch.helpers import elasticsearch.helpers
import sys import sys
...@@ -196,10 +197,11 @@ class TypeInfo(object): ...@@ -196,10 +197,11 @@ class TypeInfo(object):
stage_2_time = time.time() stage_2_time = time.time()
es_result = self.elasticsearch_bulk_insert_data( es_result = ESPerform.es_helpers_bulk(
sub_index_name=sub_index_name, es_cli=es,
data_list=data_list, data_list=data_list,
es=es, sub_index_name=sub_index_name,
auto_create_index=True
) )
stage_3_time = time.time() stage_3_time = time.time()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment