# type_info.py
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import

import time
import datetime
import logging
import traceback
import django.db.models
from django.conf import settings
from libs.es import ESPerform
import elasticsearch
import elasticsearch.helpers
import sys

from trans2es.models import topic,user,pick_celebrity,group,celebrity
from trans2es.utils.user_transfer import UserTransfer
from trans2es.utils.pick_celebrity_transfer import PickCelebrityTransfer
from trans2es.utils.group_transfer import GroupTransfer
from trans2es.utils.topic_transfer import TopicTransfer
from trans2es.utils.celebrity_transfer import CelebrityTransfer
from libs.es import ESPerform

__es = None


def get_elasticsearch_instance():
    global __es
    if __es is None:
        __es = ESPerform.get_cli()
    return __es


def get_es_list_by_type(es_type):
    return [get_elasticsearch_instance()]


class TypeInfo(object):
    """Describes how one Django model is synced into an Elasticsearch doc type.

    Bundles together the model, a deferred query used to enumerate rows, the
    serialization function(s) that turn instances (or pk lists) into ES
    documents, and the chunk-size/period tuning used by the sync jobs.
    """

    def __init__(
            self,
            name,
            type,
            model,
            query_deferred,
            get_data_func,
            bulk_insert_chunk_size,
            round_insert_chunk_size,
            round_insert_period,
            batch_get_data_func=None,  # receive a list of pks, not instance
            logic_database_id=None,
    ):
        self.name = name
        self.type = type  # ES doc_type name
        self.model = model
        self.query_deferred = query_deferred  # callable returning a django Query
        self.get_data_func = get_data_func  # instance -> document dict
        self.batch_get_data_func = batch_get_data_func  # [pk, ...] -> [doc, ...]
        self.pk_blacklist = ()  # setter normalizes to a frozenset
        self.bulk_insert_chunk_size = bulk_insert_chunk_size
        self.round_insert_chunk_size = round_insert_chunk_size
        self.round_insert_period = round_insert_period
        self.logic_database_id = logic_database_id

    @property
    def query(self):
        """Re-evaluate the deferred query on every access (fresh Query object)."""
        return self.query_deferred()

    @property
    def queryset(self):
        """QuerySet over ``self.model`` built from the deferred query."""
        return django.db.models.QuerySet(model=self.model, query=self.query)

    @property
    def pk_blacklist(self):
        """Primary keys that must never be indexed, as a frozenset."""
        return self.__pk_blacklist

    @pk_blacklist.setter
    def pk_blacklist(self, value):
        # Normalize any iterable to an immutable set for O(1) membership tests.
        self.__pk_blacklist = frozenset(value)

    def bulk_get_data(self, instance_iterable):
        """Serialize instances into ES documents.

        Uses ``batch_get_data_func`` (pk-list based) when configured, otherwise
        calls ``get_data_func`` per instance.  Blacklisted pks are skipped and
        serialization errors are logged rather than raised, so one bad row
        cannot abort a whole sync round.
        """
        data_list = []
        if self.batch_get_data_func:
            _pk_list = [getattr(instance, 'pk', None) for instance in instance_iterable]
            not_found_pk_list = []
            blacklisted_pk_list = []
            pk_list = []
            for pk in _pk_list:
                if pk is None:
                    not_found_pk_list.append(pk)
                elif pk in self.__pk_blacklist:
                    blacklisted_pk_list.append(pk)
                else:
                    pk_list.append(pk)
            if not_found_pk_list:
                # BUGFIX: was logging.exception, which outside an except block
                # appends a bogus "NoneType: None" traceback; this is not an
                # exception context, just a data-quality error.
                logging.error('those pks not found for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(not_found_pk_list),
                ))
            if blacklisted_pk_list:
                logging.info('those pks are in blacklist for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(blacklisted_pk_list),
                ))
            try:
                data_list = self.batch_get_data_func(pk_list)
            except Exception:
                traceback.print_exc()
                logging.exception('bulk_get_data for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(pk_list),
                ))
        else:
            for instance in instance_iterable:
                pk = getattr(instance, 'pk', None)
                try:
                    if pk is None:
                        raise Exception('pk not found')
                    if pk in self.__pk_blacklist:
                        logging.info('bulk_get_data for name={}, doc_type={}, pk={}: ignore blacklisted pk'.format(
                            self.name,
                            self.type,
                            pk,
                        ))
                        continue
                    data = self.get_data_func(instance)
                except Exception:
                    traceback.print_exc()
                    logging.exception('bulk_get_data for name={}, doc_type={}, pk={}'.format(
                        self.name,
                        self.type,
                        pk,
                    ))
                else:
                    data_list.append(data)
        return data_list

    def elasticsearch_bulk_insert_data(self, sub_index_name, data_list, es=None):
        """Bulk-index *data_list* into the write alias of *sub_index_name*.

        Returns the result of the last ``helpers.bulk`` call, the string
        ``'error'`` if a call failed, or ``None`` when there was nothing to do.
        """
        if es is None:
            es = get_es_list_by_type(self.type)

        if not isinstance(es, (list, tuple,)):
            es = [es]

        index = ESPerform.get_official_index_name(sub_index_name=sub_index_name, index_flag="write")
        bulk_actions = [
            {
                '_op_type': 'index',
                '_index': index,
                '_type': self.type,
                '_id': data['id'],
                '_source': data,
            }
            for data in data_list
        ]

        es_result = None
        if bulk_actions:
            for client in es:
                try:
                    es_result = elasticsearch.helpers.bulk(client=client, actions=bulk_actions)
                except Exception:
                    traceback.print_exc()
                    # BUGFIX: the failure is now also logged with its traceback
                    # instead of only being printed to stderr.
                    logging.exception('elasticsearch_bulk_insert_data failed for name={}, doc_type={}'.format(
                        self.name,
                        self.type,
                    ))
                    es_result = 'error'

        return es_result

    def elasticsearch_bulk_insert(self, sub_index_name, instance_iterable, es=None):
        """Serialize *instance_iterable* and bulk-index the documents."""
        data_list = self.bulk_get_data(instance_iterable)
        return self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

    def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
        """Fetch rows by pk and index them; returns the bulk-insert result."""
        if use_batch_query_set:
            qs = self.queryset
        else:
            qs = self.model.objects.all()

        instance_list = qs.filter(pk__in=pk_list)
        data_list = self.bulk_get_data(instance_list)
        # BUGFIX: the bulk result was silently discarded; surface it to callers
        # (existing callers that ignore the return value are unaffected).
        return self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

    def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
        """Index one table chunk and return a one-line timing/result report."""
        # BUGFIX: time.clock() was deprecated in 3.3 and removed in 3.8;
        # use perf_counter when available, falling back for Python 2.
        try:
            _clock = time.perf_counter
        except AttributeError:  # Python 2 fallback
            _clock = time.clock

        start_clock = _clock()
        start_time = time.time()

        instance_list = list(table_chunk)

        stage_1_time = time.time()

        data_list = self.bulk_get_data(instance_list)

        stage_2_time = time.time()

        es_result = ESPerform.es_helpers_bulk(
            es_cli=es,
            data_list=data_list,
            sub_index_name=sub_index_name,
            auto_create_index=True
        )

        stage_3_time = time.time()
        end_clock = _clock()

        return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
                '{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
                '{response}').format(
            datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
            index_prefix=sub_index_name,
            type_name=self.name,
            pk_start=repr(table_chunk.get_pk_start()),
            pk_stop=repr(table_chunk.get_pk_stop()),
            count=len(instance_list),
            stage_1_duration=stage_1_time - start_time,
            stage_2_duration=stage_2_time - stage_1_time,
            stage_3_duration=stage_3_time - stage_2_time,
            clock_duration=end_clock - start_clock,
            response=es_result,
        )


_get_type_info_map_result = None  # memoized result of get_type_info_map()


def get_type_info_map():
    """Build (once) and return the mapping of type name -> TypeInfo.

    The registry is constructed lazily on first call and cached at module
    level; subsequent calls return the same dict instance.
    """
    global _get_type_info_map_result
    # BUGFIX: compare against None instead of relying on truthiness, so an
    # (unlikely but possible) empty registry is also cached instead of being
    # rebuilt on every call.
    if _get_type_info_map_result is not None:
        return _get_type_info_map_result

    type_info_list = [
        TypeInfo(
            name='topic',  # diary posts
            type='topic',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.all().query,
            get_data_func=TopicTransfer.get_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="user",  # users
            type="user",
            model=user.User,
            query_deferred=lambda: user.User.objects.all().query,
            get_data_func=UserTransfer.get_user_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="pick_celebrity",  # celebrities being voted onto charts
            type="pick_celebrity",
            model=pick_celebrity.PickCelebrity,
            query_deferred=lambda: pick_celebrity.PickCelebrity.objects.all().query,
            get_data_func=PickCelebrityTransfer.get_pick_celebrity_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="celebrity",  # celebrities
            type="celebrity",
            model=celebrity.Celebrity,
            query_deferred=lambda: celebrity.Celebrity.objects.all().query,
            get_data_func=CelebrityTransfer.get_celebrity_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="group",  # groups
            type="group",
            model=group.Group,
            query_deferred=lambda: group.Group.objects.all().query,
            get_data_func=GroupTransfer.get_group_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
    ]

    type_info_map = {
        type_info.name: type_info
        for type_info in type_info_list
    }

    _get_type_info_map_result = type_info_map
    return type_info_map