# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import

import time
import datetime
import logging
import traceback
import django.db.models
from libs.es import ESPerform

from trans2es.models import topic, user, pick_celebrity, group, celebrity, tag, contrast_similar
from trans2es.utils.user_transfer import UserTransfer
from trans2es.utils.pick_celebrity_transfer import PickCelebrityTransfer
from trans2es.utils.group_transfer import GroupTransfer
from trans2es.utils.topic_transfer import TopicTransfer
from trans2es.utils.celebrity_transfer import CelebrityTransfer
from trans2es.utils.tag_transfer import TagTransfer
from trans2es.utils.contrast_similar_transfer import Contrast_Similar_Transfer

__es = None


def get_elasticsearch_instance():
    global __es
    if __es is None:
        __es = ESPerform.get_cli()
    return __es


def get_es_list_by_type(es_type):
    # es_type is currently ignored: every doc type shares the same client.
    return [get_elasticsearch_instance()]
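
# Usage sketch (illustrative; assumes Django settings and the ES cluster are
# already configured): both helpers are cheap accessors around one lazily
# created, shared client.
#
#     es_list = get_es_list_by_type('topic')
#     assert es_list[0] is get_elasticsearch_instance()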


class TypeInfo(object):
    def __init__(
            self,
            name,
            type,
            model,
            query_deferred,
            get_data_func,
            bulk_insert_chunk_size,
            round_insert_chunk_size,
            round_insert_period,
            batch_get_data_func=None,  # receives a list of pks, not instances
            logic_database_id=None,
    ):
        self.name = name
        self.type = type
        self.model = model
        self.query_deferred = query_deferred
        self.get_data_func = get_data_func
        self.batch_get_data_func = batch_get_data_func
        self.pk_blacklist = ()
        self.bulk_insert_chunk_size = bulk_insert_chunk_size
        self.round_insert_chunk_size = round_insert_chunk_size
        self.round_insert_period = round_insert_period
        self.logic_database_id = logic_database_id

    @property
    def query(self):
        return self.query_deferred()

    @property
    def queryset(self):
        return django.db.models.QuerySet(model=self.model, query=self.query)
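
    # query_deferred is stored as a zero-argument callable so that no SQL
    # Query object is built at import time; each .queryset access wraps a
    # fresh Query in a new QuerySet. Illustrative sketch (names from the
    # registry below):
    #
    #     info = get_type_info_map()['topic']
    #     qs = info.queryset            # behaves like Topic.objects.all()
    #     qs.filter(pk__in=[1, 2, 3])   # normal QuerySet chaining applies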

    @property
    def pk_blacklist(self):
        return self.__pk_blacklist

    @pk_blacklist.setter
    def pk_blacklist(self, value):
        self.__pk_blacklist = frozenset(value)
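
    # Hedged example: values assigned to pk_blacklist are normalized to a
    # frozenset, and matching pks are silently skipped by bulk_get_data:
    #
    #     info.pk_blacklist = [3, 7]      # stored as frozenset({3, 7})
    #     info.bulk_get_data(instances)   # rows with pk 3 or 7 are ignored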

    def bulk_get_data(self, instance_iterable):
        data_list = []
        if self.batch_get_data_func:
            _pk_list = [getattr(instance, 'pk', None) for instance in instance_iterable]
            not_found_pk_list = []
            blacklisted_pk_list = []
            pk_list = []
            for pk in _pk_list:
                if pk is None:
                    not_found_pk_list.append(pk)
                elif pk in self.__pk_blacklist:
                    blacklisted_pk_list.append(pk)
                else:
                    pk_list.append(pk)
            if not_found_pk_list:
                # logging.error rather than logging.exception: there is no
                # active exception here, so a traceback would be misleading.
                logging.error('pks not found for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(not_found_pk_list),
                ))
            if blacklisted_pk_list:
                logging.info('skipping blacklisted pks for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(blacklisted_pk_list),
                ))
            try:
                data_list = self.batch_get_data_func(pk_list)
            except Exception:
                traceback.print_exc()
                logging.exception('bulk_get_data for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(pk_list),
                ))
        else:
            for instance in instance_iterable:
                pk = getattr(instance, 'pk', None)
                try:
                    if pk is None:
                        raise Exception('pk not found')
                    if pk in self.__pk_blacklist:
                        logging.info('bulk_get_data for name={}, doc_type={}, pk={}: ignore blacklisted pk'.format(
                            self.name,
                            self.type,
                            pk,
                        ))
                        continue
                    data = self.get_data_func(instance)
                except Exception:
                    traceback.print_exc()
                    logging.exception('bulk_get_data for name={}, doc_type={}, pk={}'.format(
                        self.name,
                        self.type,
                        pk,
                    ))
                else:
                    data_list.append(data)
        return data_list
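
    # Contract sketch for batch_get_data_func (the helper name below is
    # hypothetical, not part of this module): it receives the filtered pk list
    # and returns one ES document dict per surviving pk, letting a transfer
    # batch its database reads instead of querying per instance.
    #
    #     def batch_get_topic_data(pk_list):
    #         topics = topic.Topic.objects.filter(pk__in=pk_list)
    #         return [TopicTransfer.get_topic_data(t) for t in topics]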

    def elasticsearch_bulk_insert_data(self, sub_index_name, data_list, es=None):
        # Bulk indexing is delegated to ESPerform, which resolves the write
        # alias for sub_index_name and drives elasticsearch.helpers.bulk;
        # the trailing True enables automatic index creation.
        return ESPerform.es_helpers_bulk(es, data_list, sub_index_name, True)

    def elasticsearch_bulk_insert(self, sub_index_name, instance_iterable, es=None):
        data_list = self.bulk_get_data(instance_iterable)
        return self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

    def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
        begin = time.time()
        if use_batch_query_set:
            qs = self.queryset
        else:
            qs = self.model.objects.all()
        time0 = time.time() - begin

        begin = time.time()
        # QuerySets are lazy: this only builds the query, so time1 measures
        # construction; the SQL actually runs when bulk_get_data iterates.
        instance_list = qs.filter(pk__in=pk_list)
        time1 = time.time() - begin

        begin = time.time()
        data_list = self.bulk_get_data(instance_list)
        time2 = time.time() - begin

        begin = time.time()
        self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )
        time3 = time.time() - begin
        # %.3f rather than %d: the stage durations are fractional seconds, and
        # %d would truncate most of them to 0.
        logging.info("insert_table_by_pk_list time cost: %.3fs, %.3fs, %.3fs, %.3fs" % (time0, time1, time2, time3))


    def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
        try:
            # time.clock() is deprecated since Python 3.3 and removed in 3.8;
            # kept for compatibility with the original Python 2 runtime.
            start_clock = time.clock()
            start_time = time.time()

            instance_list = list(table_chunk)

            stage_1_time = time.time()

            data_list = self.bulk_get_data(instance_list)

            stage_2_time = time.time()

            es_result = ESPerform.es_helpers_bulk(
                es_cli=es,
                data_list=data_list,
                sub_index_name=sub_index_name,
                auto_create_index=True
            )

            logging.info("es_helpers_bulk,sub_index_name:%s,data_list len:%d" % (sub_index_name,len(data_list)))

            stage_3_time = time.time()
            end_clock = time.clock()

            return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
                    '{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
                    '{response}').format(
                datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
                index_prefix=sub_index_name,
                type_name=self.name,
                pk_start=repr(table_chunk.get_pk_start()),
                pk_stop=repr(table_chunk.get_pk_stop()),
                count=len(instance_list),
                stage_1_duration=stage_1_time - start_time,
                stage_2_duration=stage_2_time - stage_1_time,
                stage_3_duration=stage_3_time - stage_2_time,
                clock_duration=end_clock - start_clock,
                response=es_result,
            )
        except Exception:
            logging.exception("insert_table_chunk failed, sub_index_name:%s" % sub_index_name)
            return None
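
    # The returned string is one summary log line per chunk: timestamp, index
    # prefix, type name, chunk pk bounds, row count, per-stage wall-clock
    # durations, CPU time, and the bulk response returned by ESPerform.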


_get_type_info_map_result = None


def get_type_info_map():
    global _get_type_info_map_result
    if _get_type_info_map_result is not None:
        return _get_type_info_map_result

    type_info_list = [
        TypeInfo(
            name='topic',  # diary
            type='topic',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.all().query,
            get_data_func=TopicTransfer.get_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="user",  # 用户
            type="user",
            model=user.User,
            query_deferred=lambda: user.User.objects.all().query,
            get_data_func=UserTransfer.get_user_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        # TypeInfo(
        #     name="pick_celebrity",  # star-ranking celebrity (disabled)
        #     type="pick_celebrity",
        #     model=pick_celebrity.PickCelebrity,
        #     query_deferred=lambda: pick_celebrity.PickCelebrity.objects.all().query,
        #     get_data_func=PickCelebrityTransfer.get_pick_celebrity_data,
        #     bulk_insert_chunk_size=100,
        #     round_insert_chunk_size=5,
        #     round_insert_period=2,
        # ),
        TypeInfo(
            name="celebrity",  # 明星
            type="celebrity",
            model=celebrity.Celebrity,
            query_deferred=lambda: celebrity.Celebrity.objects.all().query,
            get_data_func=CelebrityTransfer.get_celebrity_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="group",  # 小组
            type="group",
            model=group.Group,
            query_deferred=lambda: group.Group.objects.all().query,
            get_data_func=GroupTransfer.get_group_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="tag",  # 标签
            type="tag",
            model=tag.Tag,
            query_deferred=lambda: tag.Tag.objects.all().query,
            get_data_func=TagTransfer.get_tag_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='contrast_similar',  # face similarity
            type='contrast_similar',
            model=contrast_similar.ContrastSimilar,
            query_deferred=lambda: contrast_similar.ContrastSimilar.objects.all().query,
            get_data_func=Contrast_Similar_Transfer.get_contrast_similar_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
    ]

    type_info_map = {
        type_info.name: type_info
        for type_info in type_info_list
    }

    _get_type_info_map_result = type_info_map
    return type_info_map
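

# Minimal end-to-end sketch (illustrative; assumes a reachable ES cluster and
# migrated Django models). Guarded so that importing this module stays
# side-effect free.
if __name__ == '__main__':
    for name, info in get_type_info_map().items():
        # Re-index every row of each registered type in one bulk pass.
        info.elasticsearch_bulk_insert(
            sub_index_name=name,
            instance_iterable=info.queryset,
            es=get_elasticsearch_instance(),
        )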