#!/usr/bin/env python
# -*- coding: utf-8 -*-
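"""trans2es_data2es_parallel.py

Django management command that dumps table data into Elasticsearch,
slicing each registered queryset into chunks and bulk-inserting them.

Usage sketch (type names come from get_type_info_map(); the placeholder
below is not a real value from this repo):
    python manage.py trans2es_data2es_parallel -t <type_name>
"""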

import logging
import traceback
from optparse import make_option

import six
from django.core.management.base import BaseCommand

from libs.es import ESPerform
from libs.table_scan import TableSlicer, TableSlicerChunk
from trans2es.type_info import get_type_info_map, TypeInfo

class Job(object):
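    """Callable work unit: bulk-indexes one TableSlicerChunk into Elasticsearch."""
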
    __es = None

    def __init__(self, sub_index_name, type_name, chunk):
        assert isinstance(sub_index_name, six.string_types)
        assert isinstance(type_name, six.string_types)
        assert isinstance(chunk, TableSlicerChunk)
        self._sub_index_name = sub_index_name
        self._type_name = type_name
        self._chunk = chunk

    @classmethod
    def get_es(cls):
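        # Lazily create and cache a single shared Elasticsearch client.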
        if cls.__es is None:
            cls.__es = ESPerform().get_cli()
        return cls.__es

    def __call__(self):
        type_info = get_type_info_map()[self._type_name]
        assert isinstance(type_info, TypeInfo)

        # Bulk-insert this chunk into Elasticsearch and return the result.
        return type_info.insert_table_chunk(
            sub_index_name=self._sub_index_name,
            table_chunk=self._chunk,
            es=self.get_es(),
        )

class Command(BaseCommand):
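    """Dump every registered type (or only the one given via --type) into Elasticsearch."""
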

    args = ''
    help = 'Dump data to Elasticsearch in parallel.'

    # Legacy optparse-style options (Django < 1.8); newer Django versions
    # use Command.add_arguments() instead.
    option_list = BaseCommand.option_list + (
        make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE', default=''),
        make_option('-i', '--index-prefix', dest='index_prefix', help='index prefix to dump data to elasticsearch', metavar='INDEX_PREFIX'),
        make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
        make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
        make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
        make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false'),
    )

    def handle(self, *args, **options):
        try:
            type_name_list = get_type_info_map().keys()
            for type_name in type_name_list:

                # When --type is given, sync only that type.
                if options["type"] and type_name != options["type"]:
                    logging.warning("type_name:%s does not match --type, skipping" % type_name)
                    continue

                type_info = get_type_info_map()[type_name]
                query_set = type_info.queryset

                # Slice the queryset into fixed-size chunks for bulk insertion.
                slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
                for chunk in slicer.chunks():
                    job = Job(
                        sub_index_name=type_name,
                        type_name=type_name,
                        chunk=chunk,
                    )
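                    # NOTE: despite the command name, chunks are indexed serially
                    # here; the --parallel option is parsed but not used.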
                    job()

        except Exception:
            logging.error("catch exception, err_msg:%s" % traceback.format_exc())