# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import

import time
import datetime
import logging
import traceback
import copy

import django.db.models
from django.conf import settings

import elasticsearch
import elasticsearch.helpers

from libs.es import ESPerform
from libs.cache import redis_client
from trans2es.models import topic, user, pick_celebrity, group, celebrity, tag, contrast_similar, pictorial, product
from trans2es.utils.user_transfer import UserTransfer
from trans2es.utils.pick_celebrity_transfer import PickCelebrityTransfer
from trans2es.utils.group_transfer import GroupTransfer
from trans2es.utils.topic_transfer import TopicTransfer
from trans2es.utils.excellect_topic_transfer import ExcellectTopicTransfer
from trans2es.utils.pictorial_transfer import PictorialTransfer
from trans2es.utils.celebrity_transfer import CelebrityTransfer
from trans2es.utils.tag_transfer import TagTransfer
from trans2es.utils.contrast_similar_transfer import Contrast_Similar_Transfer
from trans2es.utils.product_transfer import ProductTransfer

__es = None


def get_elasticsearch_instance():
    global __es
    if __es is None:
        __es = ESPerform.get_cli()
    return __es


def get_es_list_by_type(es_type):
    # A single shared client is returned regardless of es_type.
    return [get_elasticsearch_instance()]


class TypeInfo(object):
    def __init__(
            self,
            name,
            type,
            model,
            query_deferred,
            get_data_func,
            bulk_insert_chunk_size,
            round_insert_chunk_size,
            round_insert_period,
            batch_get_data_func=None,  # receives a list of pks, not instances
            logic_database_id=None,
    ):
        self.name = name
        self.type = type
        self.model = model
        self.query_deferred = query_deferred
        self.get_data_func = get_data_func
        self.batch_get_data_func = batch_get_data_func
        self.pk_blacklist = ()
        self.bulk_insert_chunk_size = bulk_insert_chunk_size
        self.round_insert_chunk_size = round_insert_chunk_size
        self.round_insert_period = round_insert_period
        self.logic_database_id = logic_database_id
        self.physical_topic_star = "physical:topic_star"

    @property
    def query(self):
        return self.query_deferred()

    @property
    def queryset(self):
        return django.db.models.QuerySet(model=self.model, query=self.query)

    @property
    def pk_blacklist(self):
        return self.__pk_blacklist

    @pk_blacklist.setter
    def pk_blacklist(self, value):
        self.__pk_blacklist = frozenset(value)

    def bulk_get_data(self, instance_iterable):
        data_list = []

        # Topics rated 4 stars or above are indexed separately.
        topic_data_high_star_list = list()

        if self.batch_get_data_func:
            _pk_list = [getattr(instance, 'pk', None) for instance in instance_iterable]
            not_found_pk_list = []
            blacklisted_pk_list = []
            pk_list = []
            for pk in _pk_list:
                if pk is None:
                    not_found_pk_list.append(pk)
                elif pk in self.__pk_blacklist:
                    blacklisted_pk_list.append(pk)
                else:
                    pk_list.append(pk)
            if not_found_pk_list:
                logging.error('those pks not found for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(not_found_pk_list),
                ))
            if blacklisted_pk_list:
                logging.info('those pks are in blacklist for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(blacklisted_pk_list),
                ))
            try:
                data_list = self.batch_get_data_func(pk_list)
            except Exception:
                traceback.print_exc()
                logging.exception('bulk_get_data for name={}, doc_type={}, pk_list={}'.format(
                    self.name,
                    self.type,
                    str(pk_list),
                ))
        else:
            for instance in instance_iterable:
                pk = getattr(instance, 'pk', None)
                try:
                    if pk is None:
                        raise Exception('pk not found')
                    if pk in self.__pk_blacklist:
                        logging.info('bulk_get_data for name={}, doc_type={}, pk={}: ignore blacklisted pk'.format(
                            self.name,
                            self.type,
                            pk,
                        ))
                        continue
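                    # get_data_func converts one model instance into an ES
                    # document dict; failures fall through to the except below.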
                    data = self.get_data_func(instance)
                except Exception:
                    traceback.print_exc()
                    logging.exception('bulk_get_data for name={}, doc_type={}, pk={}'.format(
                        self.name,
                        self.type,
                        pk,
                    ))
                else:
                    if data:
                        if self.type == "topic":
                            # Diff the topic's current star rating against the
                            # last indexed one kept in a redis hash; on a change,
                            # also emit a tombstone document for the old rating.
                            ori_topic_star = redis_client.hget(self.physical_topic_star, data["id"])
                            if ori_topic_star:
                                ori_topic_star = str(ori_topic_star, encoding="utf-8")

                            if not ori_topic_star:
                                redis_client.hset(self.physical_topic_star, data["id"], data["content_level"])
                            else:
                                int_ori_topic_star = int(ori_topic_star)
                                if int_ori_topic_star != data["content_level"]:
                                    old_data = copy.deepcopy(data)
                                    old_data["is_online"] = False
                                    old_data["is_deleted"] = True
                                    old_data["content_level"] = int_ori_topic_star
                                    old_data["is_history"] = True
                                    data_list.append(old_data)

                                    if int_ori_topic_star >= 4:
                                        topic_data_high_star_list.append(old_data)

                                    redis_client.hset(self.physical_topic_star, data["id"], data["content_level"])

                            if data["content_level"] and int(data["content_level"]) >= 4:
                                topic_data_high_star_list.append(data)
                        elif self.type == "tag" or self.type == "tag_v1":
                            # The tag transfer returns a (res, begin_res) pair;
                            # both documents are indexed.
                            (res, begin_res) = data
                            data_list.append(res)
                            data_list.append(begin_res)
                        else:
                            data_list.append(data)

        return (data_list, topic_data_high_star_list)

    def elasticsearch_bulk_insert_data(self, sub_index_name, data_list, es=None):
        # assert (es is not None)
        # index = ESPerform.get_official_index_name(sub_index_name=sub_index_name, index_flag="write")
        # bulk_actions = []
        # for data in data_list:
        #     bulk_actions.append({
        #         '_op_type': 'index',
        #         '_index': index,
        #         '_type': "_doc",
        #         '_id': data['id'],
        #         '_source': data,
        #     })
        #
        # es_result = None
        # if bulk_actions:
        #     for t in es:
        #         try:
        #             es_result = elasticsearch.helpers.bulk(client=t, actions=bulk_actions)
        #         except Exception as e:
        #             traceback.print_exc()
        #             es_result = 'error'

        return ESPerform.es_helpers_bulk(es, data_list, sub_index_name)

    def elasticsearch_bulk_insert(self, sub_index_name, instance_iterable, es=None):
        data_list, topic_data_high_star_list = self.bulk_get_data(instance_iterable)

        return self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

    def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
        begin = time.time()
        if use_batch_query_set:
            qs = self.queryset
        else:
            qs = self.model.objects.all()
        end = time.time()
        time0 = end - begin

        begin = time.time()
        instance_list = qs.filter(pk__in=pk_list)
        end = time.time()
        time1 = end - begin

        begin = time.time()
        data_list, topic_data_high_star_list = self.bulk_get_data(instance_list)
        end = time.time()
        time2 = end - begin

        begin = time.time()
        # logging.info("get sub_index_name:%s" % sub_index_name)
        # logging.info("get data_list:%s" % data_list)

        self.elasticsearch_bulk_insert_data(
            sub_index_name=sub_index_name,
            data_list=data_list,
            es=es,
        )

        if sub_index_name == "topic":
            self.elasticsearch_bulk_insert_data(
                sub_index_name="topic-star-routing",
                data_list=data_list,
                es=es,
            )

            # Also write topics rated 4 stars or above to their own index.
            if len(topic_data_high_star_list) > 0:
                self.elasticsearch_bulk_insert_data(
                    sub_index_name="topic-high-star",
                    data_list=topic_data_high_star_list,
                    es=es,
                )

        end = time.time()
        time3 = end - begin
        logging.info("insert_table_by_pk_list time cost:%.3fs,%.3fs,%.3fs,%.3fs" % (time0, time1, time2, time3))

    def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
        try:
            start_clock = time.clock()
            start_time = time.time()

            instance_list = list(table_chunk)
            stage_1_time = time.time()

            data_list, topic_data_high_star_list = self.bulk_get_data(instance_list)
            stage_2_time = time.time()
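            # Stage 1 materialized the chunk and stage 2 built the documents;
            # stage 3 below is the bulk write into Elasticsearch.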
            es_result = ESPerform.es_helpers_bulk(
                es_cli=es,
                data_list=data_list,
                sub_index_name=sub_index_name,
                auto_create_index=True
            )
            logging.info("es_helpers_bulk,sub_index_name:%s,data_list len:%d" % (sub_index_name, len(data_list)))
            stage_3_time = time.time()
            end_clock = time.clock()

            return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
                    '{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
                    '{response}').format(
                datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
                index_prefix=sub_index_name,
                type_name=self.name,
                pk_start=repr(table_chunk.get_pk_start()),
                pk_stop=repr(table_chunk.get_pk_stop()),
                count=len(instance_list),
                stage_1_duration=stage_1_time - start_time,
                stage_2_duration=stage_2_time - stage_1_time,
                stage_3_duration=stage_3_time - stage_2_time,
                clock_duration=end_clock - start_clock,
                response=es_result,
            )
        except Exception:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

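# Module-level cache for get_type_info_map(): the TypeInfo registry below is
# built once per process and reused on subsequent calls.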
_get_type_info_map_result = None


def get_type_info_map():
    global _get_type_info_map_result
    if _get_type_info_map_result:
        return _get_type_info_map_result

    type_info_list = [
        TypeInfo(
            name='topic-star',
            type='topic-star',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.using(settings.SLAVE2_DB_NAME).all().query,  # placeholder
            get_data_func=TopicTransfer.get_topic_data,  # placeholder
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='topic-star-routing',
            type='topic-star-routing',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TopicTransfer.get_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='topic-high-star',  # topics rated >= 4 stars
            type='topic-high-star',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TopicTransfer.get_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='excellect-topic',  # high-quality topics
            type='excellect-topic',
            model=topic.ExcellentTopic,
            query_deferred=lambda: topic.ExcellentTopic.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=ExcellectTopicTransfer.get_excellect_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='topic',  # diary posts
            type='topic',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TopicTransfer.get_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='topic-v1',  # diary posts
            type='topic-v1',
            model=topic.Topic,
            query_deferred=lambda: topic.Topic.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TopicTransfer.get_topic_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="user",  # users
            type="user",
            model=user.User,
            # query_deferred=lambda: user.User.objects.all().query,
            # query_deferred=lambda: user.User.objects.prefetch_related('myuserfollow').query,
            query_deferred=lambda: user.User.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=UserTransfer.get_user_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        # TypeInfo(
        #     name="pick_celebrity",  # celebrity ranking
        #     type="pick_celebrity",
        #     model=pick_celebrity.PickCelebrity,
        #     query_deferred=lambda: pick_celebrity.PickCelebrity.objects.all().query,
        #     get_data_func=PickCelebrityTransfer.get_pick_celebrity_data,
        #     bulk_insert_chunk_size=100,
        #     round_insert_chunk_size=5,
        #     round_insert_period=2,
        # ),
        TypeInfo(
            name="celebrity",  # celebrities
            type="celebrity",
            model=celebrity.Celebrity,
            query_deferred=lambda: celebrity.Celebrity.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=CelebrityTransfer.get_celebrity_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="group",  # groups
            type="group",
            model=group.Group,
            query_deferred=lambda: group.Group.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=GroupTransfer.get_group_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="tag",  # tags
            type="tag",
            model=tag.Tag,
            query_deferred=lambda: tag.Tag.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TagTransfer.get_tag_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="tag_v1",  # tags
            type="tag_v1",
            model=tag.Tag,
            query_deferred=lambda: tag.Tag.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TagTransfer.get_tag_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="tag-name",  # tag names
            type="tag-name",
            model=tag.Tag,
            query_deferred=lambda: tag.Tag.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=TagTransfer.get_tag_name_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name='contrast_similar',  # face similarity
            type='contrast_similar',
            model=contrast_similar.ContrastSimilar,
            query_deferred=lambda: contrast_similar.ContrastSimilar.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=Contrast_Similar_Transfer.get_contrast_similar_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        TypeInfo(
            name="pictorial",  # pictorials
            type="pictorial",
            model=pictorial.Pictorial,
            query_deferred=lambda: pictorial.Pictorial.objects.using(settings.SLAVE2_DB_NAME).all().query,
            get_data_func=PictorialTransfer.get_poctorial_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
        # TypeInfo(
        #     name="account_user_tag",  # user tags
        #     type="account_user_tag",
        #     model=pictorial.Pictorial,
        #     query_deferred=lambda: pictorial.Pictorial.objects.all().query,
        #     get_data_func=PictorialTransfer.get_poctorial_data,
        #     bulk_insert_chunk_size=100,
        #     round_insert_chunk_size=5,
        #     round_insert_period=2,
        # ),
        TypeInfo(
            name="product",  # products
            type="product",
            model=product.CommodityProduct,
            query_deferred=lambda: product.CommodityProduct.objects.all().query,
            get_data_func=ProductTransfer.get_product_data,
            bulk_insert_chunk_size=100,
            round_insert_chunk_size=5,
            round_insert_period=2,
        ),
    ]

    type_info_map = {
        type_info.name: type_info
        for type_info in type_info_list
    }

    _get_type_info_map_result = type_info_map
    return type_info_map
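
if __name__ == '__main__':
    # Minimal usage sketch, added for illustration only: it assumes Django
    # settings are already configured for this project and that the
    # Elasticsearch cluster behind ESPerform is reachable. The pk values
    # are hypothetical.
    topic_type_info = get_type_info_map()['topic']
    topic_type_info.insert_table_by_pk_list(
        sub_index_name='topic',
        pk_list=[1, 2, 3],
        es=get_elasticsearch_instance(),
    )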