Commit 85aa31b6 authored by 段英荣's avatar 段英荣

add

parent d6acc0fb
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import django.db.models
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
import six
import sys
from libs.es import ESPerform
import trans2es.models as md
from libs.table_scan import TableSlicer,TableSlicerChunk
from trans2es.type_info import get_type_info_map,TypeInfo
from libs.cache import redis_client
import json
class Job(object):
__es = None
def __init__(self, sub_index_name, type_name, chunk):
assert isinstance(sub_index_name, six.string_types)
assert isinstance(type_name, six.string_types)
assert isinstance(chunk, TableSlicerChunk)
self._sub_index_name = sub_index_name
self._type_name = type_name
self._chunk = chunk
@classmethod
def get_es(cls):
if cls.__es is None:
cls.__es = ESPerform().get_cli()
return cls.__es
def __call__(self):
type_info = get_type_info_map()[self._type_name]
assert isinstance(type_info, TypeInfo)
result = type_info.insert_table_chunk(
sub_index_name=self._sub_index_name,
table_chunk=self._chunk,
es=self.get_es(),
)
class Command(BaseCommand):
args = ''
help = 'dump data to elasticsearch, parallel'
from optparse import make_option
option_list = BaseCommand.option_list + (
make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE',default=''),
make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch', metavar='INDEX_PREFIX'),
make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default='')
)
def __sync_data_by_type(self, type_name):
try:
type_info = get_type_info_map()[type_name]
query_set = type_info.queryset
slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
for chunk in slicer.chunks():
job = Job(
sub_index_name=type_name,
type_name=type_name,
chunk=chunk,
)
job()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def handle(self, *args, **options):
try:
type_name_list = get_type_info_map().keys()
for type_name in type_name_list:
if len(options["type"]):
if options["type"] == "all" or type_name==options["type"]:
logging.info("begin sync [%s] data to es!" % type_name)
self.__sync_data_by_type(type_name)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -131,7 +131,6 @@ class TypeInfo(object):
"offline_score": suggest_item["word_weight"]
}
data_list.append(suggest_dict)
except Exception:
traceback.print_exc()
logging.exception('bulk_get_data for name={}, doc_type={}, pk={}'.format(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment