# -*- coding: UTF-8 -*- import elasticsearch import elasticsearch.helpers from django.conf import settings from data_sync.question.connections import pk_data_source as question_pk_data_source from data_sync.question.tran2es import get_questions from data_sync.qa_top import get_qa_tops, pk_data_source as qa_top_pk_data_source from data_sync.topic import get_problems from data_sync.topic import pk_data_source as topic_pk_data_source from data_sync.topic import index_data_source as topic_index_data_source from data_sync.answer import pk_data_source as answer_pk_data_source from data_sync.answer import get_answers from data_sync.article import pk_data_source as article_pk_data_source from data_sync.article import get_articles from data_sync.utils import get_es_instance from data_sync.utils import es_index_adapt from qa.models import Question, Answer, AnswerTop from talos.models.topic import Problem from talos.models.topic.column import Article from utils.rpc import logging_exception from data_sync.tractate import get_tractate from data_sync.doctor_tractate import get_soft_article from talos.models.tractate import Tractate from talos.models.soft_article import SoftArticle from data_sync.tractate import pk_data_source as tractate_pk_data_source from data_sync.principal_page import get_word_tractate, get_video_tractate, get_live_stream, get_article, get_activity, \ get_live_notice from data_sync.principal_page import pk_data_source as principal_pk_data_source from talos.models.subscript_article import SubscriptArticle from data_sync.subscript_article import pk_data_source as subscript_article_pk_data_source, \ get_subscript_article from live.models import LiveStream, LiveChannel, ZhiboConfig from talos.models.topic.column import Article from talos.models.topic.topic import Problem from talos.models.topic.activity import Activity from data_sync.doctor_tractate import pk_data_source as soft_article_pk_data_source import logging __type_info_map = None __type_info_map = None 
def get_type_info_map():
    """Return the registry mapping a type key to its ``TypeInfo``.

    The registry is built on the first call and stored in the module-level
    ``__type_info_map``; later calls return that same dict object.
    """
    global __type_info_map
    if __type_info_map is not None:
        return __type_info_map

    __type_info_map = {
        'question': TypeInfo(
            name='question',  # Q&A questions
            type='question',
            model=Question,
            bulk_insert_chunk_size=50,
            round_insert_chunk_size=50,
            round_insert_period=12,
            logic_database_id=settings.HERA_READ_DB,
            batch_get_data_func=get_questions,
            pk_data_source=question_pk_data_source,
        ),
        'topic': TypeInfo(
            name='topic',  # posts
            type='topic',
            model=Problem,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_problems,
            pk_data_source=topic_pk_data_source,
            index_data_source=topic_index_data_source,
        ),
        'answer': TypeInfo(
            name="answer",
            type="answer",
            model=Answer,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_answers,
            pk_data_source=answer_pk_data_source
        ),
        'article': TypeInfo(
            name="article",
            type="article",
            model=Article,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_articles,
            pk_data_source=article_pk_data_source
        ),
        'subscript_article': TypeInfo(
            name="subscript_article",
            type="subscript_article",
            model=SubscriptArticle,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_subscript_article,
            pk_data_source=subscript_article_pk_data_source
        ),
        'qa_top': TypeInfo(
            name="qa_top",
            type="qa_top",
            model=AnswerTop,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_qa_tops,
            pk_data_source=qa_top_pk_data_source
        ),
        'tractate': TypeInfo(
            name="tractate",
            type="tractate",
            model=Tractate,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_tractate,
            pk_data_source=tractate_pk_data_source
        ),
        # The six entries below are the types that TypeInfo writes into the
        # shared 'principal' index (see TypeInfo._PRINCIPAL_TYPES).
        'live_stream': TypeInfo(
            name="live_stream",
            type="live_stream",
            model=LiveStream,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_live_stream,
            pk_data_source=principal_pk_data_source
        ),
        'live_notice': TypeInfo(
            name="live_notice",
            type="live_notice",
            model=ZhiboConfig,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_live_notice,
            pk_data_source=principal_pk_data_source
        ),
        'principal_activity': TypeInfo(
            name="principal_activity",
            type="principal_activity",
            model=Activity,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_activity,
            pk_data_source=principal_pk_data_source
        ),
        'principal_article': TypeInfo(
            name="principal_article",
            type="principal_article",
            model=Problem,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_article,
            pk_data_source=principal_pk_data_source
        ),
        'video_tractate': TypeInfo(
            name="video_tractate",
            type="video_tractate",
            model=SoftArticle,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_video_tractate,
            pk_data_source=principal_pk_data_source
        ),
        'word_tractate': TypeInfo(
            name="word_tractate",
            type="word_tractate",
            model=SoftArticle,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_word_tractate,
            pk_data_source=principal_pk_data_source
        ),
        'doctortractate': TypeInfo(
            name="doctortractate",
            type="doctortractate",
            model=SoftArticle,
            bulk_insert_chunk_size=500,
            round_insert_chunk_size=50,
            round_insert_period=12,
            batch_get_data_func=get_soft_article,
            pk_data_source=soft_article_pk_data_source
        ),
    }
    return __type_info_map


class TypeInfo(object):
    """Describes how one document type is loaded and written to Elasticsearch.

    Attributes set by ``__init__`` mirror its arguments; ``pk_blacklist`` is
    always initialised to an empty tuple. Only ``type`` and
    ``batch_get_data_func`` are read by the methods of this class; the
    remaining attributes are presumably consumed by the sync drivers
    elsewhere in the project (NOTE(review): confirm against callers).
    """

    # Types that do not get an index of their own: their documents are all
    # written to the index resolved for the 'principal' doc type.
    _PRINCIPAL_TYPES = frozenset((
        'principal_article',
        'principal_activity',
        'live_notice',
        'live_stream',
        'word_tractate',
        'video_tractate',
    ))
    _PRINCIPAL_INDEX_TYPE = 'principal'
    # Multiplier applied to ``principal_type`` when deriving the document id
    # of a principal document, so ids from different source tables land in
    # different ranges of the shared index.
    _PRINCIPAL_ID_STRIDE = 10000000

    def __init__(
            self,
            name,
            type,
            model,
            bulk_insert_chunk_size,
            round_insert_chunk_size,
            round_insert_period,
            batch_get_data_func,
            pk_data_source,
            index_data_source=None,
            gm_mq_endpoint=None,
            logic_database_id=None
    ):
        """Store the configuration for one document type.

        ``batch_get_data_func`` is called with a list of primary keys and
        must return the documents to index (each a dict with an ``'id'``
        key, plus ``'principal_type'`` for principal types).
        """
        self.name = name
        self.type = type
        self.model = model
        self.batch_get_data_func = batch_get_data_func
        self.pk_blacklist = ()
        self.bulk_insert_chunk_size = bulk_insert_chunk_size
        self.round_insert_chunk_size = round_insert_chunk_size
        self.round_insert_period = round_insert_period
        self.gm_mq_endpoint = gm_mq_endpoint
        self.logic_database_id = logic_database_id
        self.pk_data_source = pk_data_source
        self.index_data_source = index_data_source

    def _document_id(self, data, is_principal):
        """Return the Elasticsearch ``_id`` for one document dict."""
        if is_principal:
            return data['id'] + data["principal_type"] * self._PRINCIPAL_ID_STRIDE
        return data['id']

    def elasticsearch_bulk_insert_data(self, index_prefix, data_list, es=None):
        """Bulk-index ``data_list`` into the write index for this type.

        :param index_prefix: prefix passed to ``es_index_adapt`` to resolve
            the concrete index name.
        :param data_list: iterable of document dicts; each needs ``'id'``,
            and ``'principal_type'`` when this type is a principal type.
        :param es: Elasticsearch client to use; when ``None`` one is obtained
            from ``get_es_instance()``.
        :return: ``None`` when there is nothing to index, the result of
            ``elasticsearch.helpers.bulk`` on success, or the string
            ``'error'`` when the bulk call raised (the exception is reported
            through ``logging_exception`` and not re-raised).
        """
        # Nothing to index: skip resolving the index name and creating a
        # client altogether.
        if not data_list:
            return None

        is_principal = self.type in self._PRINCIPAL_TYPES
        index = es_index_adapt(
            index_prefix=index_prefix,
            doc_type=self._PRINCIPAL_INDEX_TYPE if is_principal else self.type,
            rw='write',
        )

        bulk_actions = [
            {
                '_op_type': 'index',
                '_index': index,
                '_type': "_doc",
                '_id': self._document_id(data, is_principal),
                '_source': data,
            }
            for data in data_list
        ]
        # ``data_list`` may have been a truthy but empty iterable.
        if not bulk_actions:
            return None

        if es is None:
            es = get_es_instance()

        try:
            return elasticsearch.helpers.bulk(client=es, actions=bulk_actions)
        except Exception:
            # Best-effort sync: report the failure and signal it to the
            # caller through the 'error' sentinel instead of raising.
            logging_exception()
            return 'error'

    def insert_table_by_pk_list(self, index_prefix, pk_list, es=None):
        """Load the documents for ``pk_list`` and bulk-index them.

        :return: the value returned by ``elasticsearch_bulk_insert_data``
            (``None``, the bulk result, or ``'error'``), so callers can
            detect a failed bulk request.
        """
        data_list = self.batch_get_data_func(pk_list)
        return self.elasticsearch_bulk_insert_data(
            index_prefix=index_prefix,
            data_list=data_list,
            es=es,
        )