# trans2es_data2es_parallel.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import logging
import traceback

import six
from django.core.management.base import BaseCommand

from libs.cache import redis_client
from libs.es import ESPerform
from libs.table_scan import TableSlicer, TableSlicerChunk
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
from trans2es.type_info import get_type_info_map, TypeInfo


class Job(object):
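    """Callable job that bulk-inserts one table chunk into Elasticsearch."""
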
    __es = None

    def __init__(self, sub_index_name, type_name, chunk):
        assert isinstance(sub_index_name, six.string_types)
        assert isinstance(type_name, six.string_types)
        assert isinstance(chunk, TableSlicerChunk)
        self._sub_index_name = sub_index_name
        self._type_name = type_name
        self._chunk = chunk

    @classmethod
    def get_es(cls):
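        """Lazily create and cache a single shared Elasticsearch client."""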
        if cls.__es is None:
            cls.__es = ESPerform().get_cli()
        return cls.__es

    def __call__(self):
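        """Resolve the TypeInfo for this type and index the chunk into ES."""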
        type_info = get_type_info_map()[self._type_name]
        assert isinstance(type_info, TypeInfo)

        return type_info.insert_table_chunk(
            sub_index_name=self._sub_index_name,
            table_chunk=self._chunk,
            es=self.get_es(),
        )


class SyncDataToRedis(object):
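    """Helpers that mirror face-similarity query results into redis."""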

    @classmethod
    def sync_face_similar_data_to_redis(cls):
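        """Cache each participant's top similar users (similarity > 0.4) in redis as JSON."""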
        try:
            participant_user_ids = FaceUserContrastSimilar.objects.filter(
                is_online=True, is_deleted=False
            ).values_list("participant_user_id", flat=True).distinct()

            logging.info("begin sync_face_similar_data_to_redis!")

            redis_key_prefix = "physical:user_similar:participant_user_id:"
            for participant_user_id in participant_user_ids:
                redis_key = redis_key_prefix + str(participant_user_id)

                # Django QuerySets have no .limit(); slice to take the top 100 rows.
                similar_result_items = FaceUserContrastSimilar.objects.filter(
                    is_online=True, is_deleted=False,
                    participant_user_id=participant_user_id,
                    similarity__gt=0.4
                ).order_by("-similarity")[:100]

                item_list = [
                    {
                        "contrast_user_id": item.contrast_user_id,
                        "similarity": item.similarity,
                    }
                    for item in similar_result_items
                ]
                redis_client.set(redis_key, json.dumps(item_list))

                logging.info("participant_user_id:%d set data done!" % participant_user_id)
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())


class Command(BaseCommand):
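    """Management command that dumps model data to Elasticsearch, with an optional redis sync."""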

    args = ''
    help = 'dump data to elasticsearch, parallel'

    from optparse import make_option

    option_list = BaseCommand.option_list + (
        make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE', default=''),
        make_option('-i', '--index-prefix', dest='index_prefix', help='index name prefix to dump data to elasticsearch', metavar='INDEX_PREFIX'),
        make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
        make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
        make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
        make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
        make_option('-S', '--sync_type', dest='sync_type', help='extra sync task to run, e.g. sync_data_to_es', metavar='TYPE', default=''),
    )
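
    # Typical invocations (hypothetical examples, assuming this file lives under a
    # Django app's management/commands/ directory as trans2es_data2es_parallel.py):
    #   python manage.py trans2es_data2es_parallel --type all
    #   python manage.py trans2es_data2es_parallel -t topic -p 4
    #   python manage.py trans2es_data2es_parallel -S sync_data_to_es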

    def __sync_data_by_type(self, type_name):
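        """Slice the type's queryset into chunks and index each chunk synchronously."""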
        try:
            type_info = get_type_info_map()[type_name]
            query_set = type_info.queryset

            slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
            for chunk in slicer.chunks():
                job = Job(
                    sub_index_name=type_name,
                    type_name=type_name,
                    chunk=chunk,
                )
                job()
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())

    def handle(self, *args, **options):
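        """Entry point: dispatch the ES sync per --type, then the optional redis sync."""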
        try:
            # Sync every registered type, or only the one passed via --type.
            if options["type"]:
                for type_name in get_type_info_map().keys():
                    if options["type"] == "all" or type_name == options["type"]:
                        logging.info("begin sync [%s] data to es!" % type_name)
                        self.__sync_data_by_type(type_name)

            # -S sync_data_to_es additionally mirrors face-similarity data into redis.
            if options["sync_type"] == "sync_data_to_es":
                SyncDataToRedis.sync_face_similar_data_to_redis()

        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())