Commit bbc4af25 authored by 段英荣's avatar 段英荣

modify

parent c20466d3
......@@ -18,11 +18,11 @@ from trans2es.type_info import get_type_info_map,TypeInfo
class Job(object):
__es = None
def __init__(self, index_prefix, type_name, chunk):
assert isinstance(index_prefix, six.string_types)
def __init__(self, sub_index_name, type_name, chunk):
assert isinstance(sub_index_name, six.string_types)
assert isinstance(type_name, six.string_types)
assert isinstance(chunk, TableSlicerChunk)
self._index_prefix = index_prefix
self._sub_index_name = sub_index_name
self._type_name = type_name
self._chunk = chunk
......@@ -37,7 +37,7 @@ class Job(object):
assert isinstance(type_info, TypeInfo)
result = type_info.insert_table_chunk(
index_prefix=self._index_prefix,
sub_index_name=self._sub_index_name,
table_chunk=self._chunk,
es=self.get_es(),
)
......@@ -60,7 +60,6 @@ class Command(BaseCommand):
def handle(self, *args, **options):
try:
index_prefix = "gm-dbmw"
#get_data_func = diary_transfer.DiaryTransfer
#query_set = django.db.models.QuerySet(model=md.Diary,query=md.Diary.objects.all().query)
......@@ -73,7 +72,7 @@ class Command(BaseCommand):
slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
for chunk in slicer.chunks():
job = Job(
index_prefix=index_prefix,
sub_index_name=type_name,
type_name=type_name,
chunk=chunk,
)
......
......@@ -132,14 +132,14 @@ class TypeInfo(object):
data_list.append(data)
return data_list
def elasticsearch_bulk_insert_data(self, index_prefix, data_list, es=None):
def elasticsearch_bulk_insert_data(self, sub_index_name, data_list, es=None):
if es is None:
es = get_es_list_by_type(self.type)
if not isinstance(es, (list, tuple,)):
es = [es]
index = ESPerform.get_official_index_name(sub_index_name=index_prefix,index_flag="write")
index = ESPerform.get_official_index_name(sub_index_name=sub_index_name,index_flag="write")
bulk_actions = []
for data in data_list:
bulk_actions.append({
......@@ -161,15 +161,15 @@ class TypeInfo(object):
return es_result
def elasticsearch_bulk_insert(self, index_prefix, instance_iterable, es=None):
def elasticsearch_bulk_insert(self, sub_index_name, instance_iterable, es=None):
data_list = self.bulk_get_data(instance_iterable)
return self.elasticsearch_bulk_insert_data(
index_prefix=index_prefix,
sub_index_name=sub_index_name,
data_list=data_list,
es=es,
)
def insert_table_by_pk_list(self, index_prefix, pk_list, es=None, use_batch_query_set=False):
def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
if use_batch_query_set:
qs = self.queryset
else:
......@@ -178,12 +178,12 @@ class TypeInfo(object):
instance_list = qs.filter(pk__in=pk_list)
data_list = self.bulk_get_data(instance_list)
self.elasticsearch_bulk_insert_data(
index_prefix=index_prefix,
sub_index_name=sub_index_name,
data_list=data_list,
es=es,
)
def insert_table_chunk(self, index_prefix, table_chunk, es=None):
def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
start_clock = time.clock()
start_time = time.time()
......@@ -197,7 +197,7 @@ class TypeInfo(object):
stage_2_time = time.time()
es_result = self.elasticsearch_bulk_insert_data(
index_prefix=index_prefix,
sub_index_name=sub_index_name,
data_list=data_list,
es=es,
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment