# -*- coding: UTF-8 -*-
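"""Registry describing how each content type (questions, answers, topics,
articles, tractates, live content, ...) is synced from the database into
Elasticsearch.

get_type_info_map() lazily builds and caches a mapping from type name to a
TypeInfo instance that knows how to fetch rows in batches and bulk-index
them into the appropriate write index.
"""
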
import elasticsearch
import elasticsearch.helpers
from django.conf import settings

from data_sync.answer import get_answers, pk_data_source as answer_pk_data_source
from data_sync.article import get_articles, pk_data_source as article_pk_data_source
from data_sync.doctor_tractate import get_soft_article, pk_data_source as soft_article_pk_data_source
from data_sync.principal_page import (
    get_activity,
    get_article,
    get_live_notice,
    get_live_stream,
    get_video_tractate,
    get_word_tractate,
    pk_data_source as principal_pk_data_source,
)
from data_sync.qa_top import get_qa_tops, pk_data_source as qa_top_pk_data_source
from data_sync.question.connections import pk_data_source as question_pk_data_source
from data_sync.question.tran2es import get_questions
from data_sync.subscript_article import (
    get_subscript_article,
    pk_data_source as subscript_article_pk_data_source,
)
from data_sync.topic import (
    get_problems,
    index_data_source as topic_index_data_source,
    pk_data_source as topic_pk_data_source,
)
from data_sync.tractate import get_tractate, pk_data_source as tractate_pk_data_source
from data_sync.utils import es_index_adapt, get_es_instance
from live.models import LiveStream, ZhiboConfig
from qa.models import Answer, AnswerTop, Question
from talos.models.soft_article import SoftArticle
from talos.models.topic.activity import Activity
from talos.models.topic.column import Article
from talos.models.topic.topic import Problem
from talos.models.tractate import Tractate
from utils.rpc import logging_exception

# Lazily-built cache of the type registry; populated on first call to
# get_type_info_map().
__type_info_map = None


def get_type_info_map():
    """Build the type registry on first use and return the cached mapping."""
    global __type_info_map
    if __type_info_map is not None:
        return __type_info_map

    __type_info_map = {
'question': TypeInfo(
            name='question',  # Q&A
type='question',
model=Question,
bulk_insert_chunk_size=50,
round_insert_chunk_size=50,
round_insert_period=12,
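            # Questions are read via the logical database configured by
            # settings.HERA_READ_DB instead of the default connection.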
logic_database_id=settings.HERA_READ_DB,
batch_get_data_func=get_questions,
pk_data_source=question_pk_data_source,
),
'topic': TypeInfo(
            name='topic',  # community post
type='topic',
model=Problem,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_problems,
pk_data_source=topic_pk_data_source,
index_data_source=topic_index_data_source,
),
'answer': TypeInfo(
name="answer",
type="answer",
model=Answer,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_answers,
pk_data_source=answer_pk_data_source
),
'article': TypeInfo(
name="article",
type="article",
model=Article,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_articles,
pk_data_source=article_pk_data_source
),
'subscript_article': TypeInfo(
name="subscript_article",
type="subscript_article",
model=SubscriptArticle,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_subscript_article,
pk_data_source=subscript_article_pk_data_source
),
'qa_top': TypeInfo(
name="qa_top",
type="qa_top",
model=AnswerTop,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_qa_tops,
pk_data_source=qa_top_pk_data_source
),
'tractate': TypeInfo(
name="tractate",
type="tractate",
model=Tractate,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_tractate,
pk_data_source=tractate_pk_data_source
),
'live_stream': TypeInfo(
name="live_stream",
type="live_stream",
model=LiveStream,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_live_stream,
pk_data_source=principal_pk_data_source
),
'live_notice': TypeInfo(
name="live_notice",
type="live_notice",
model=ZhiboConfig,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_live_notice,
pk_data_source=principal_pk_data_source
),
'principal_activity': TypeInfo(
name="principal_activity",
type="principal_activity",
model=Activity,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_activity,
pk_data_source=principal_pk_data_source
),
'principal_article': TypeInfo(
name="principal_article",
type="principal_article",
model=Problem,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_article,
pk_data_source=principal_pk_data_source
),
'video_tractate': TypeInfo(
name="video_tractate",
type="video_tractate",
model=SoftArticle,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_video_tractate,
pk_data_source=principal_pk_data_source
),
'word_tractate': TypeInfo(
name="word_tractate",
type="word_tractate",
model=SoftArticle,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_word_tractate,
pk_data_source=principal_pk_data_source
),
'doctortractate': TypeInfo(
name="doctortractate",
type="doctortractate",
model=SoftArticle,
bulk_insert_chunk_size=500,
round_insert_chunk_size=50,
round_insert_period=12,
batch_get_data_func=get_soft_article,
pk_data_source=soft_article_pk_data_source
),
}
return __type_info_map


class TypeInfo(object):
    """Sync configuration for one content type: the Django model it reads,
    how rows are batched into Elasticsearch, and which data-source hooks
    feed it primary keys."""

    def __init__(
self,
name,
type,
model,
bulk_insert_chunk_size,
round_insert_chunk_size,
round_insert_period,
batch_get_data_func,
pk_data_source,
index_data_source=None,
gm_mq_endpoint=None,
logic_database_id=None
):
self.name = name
self.type = type
self.model = model
self.batch_get_data_func = batch_get_data_func
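        # Primary keys to skip when syncing; empty by default.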
self.pk_blacklist = ()
self.bulk_insert_chunk_size = bulk_insert_chunk_size
self.round_insert_chunk_size = round_insert_chunk_size
self.round_insert_period = round_insert_period
self.gm_mq_endpoint = gm_mq_endpoint
self.logic_database_id = logic_database_id
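        # Hooks supplied by each data_sync submodule: pk_data_source yields
        # the primary keys to sync; index_data_source is an optional
        # alternative source (currently only used by 'topic').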
self.pk_data_source = pk_data_source
self.index_data_source = index_data_source

    def elasticsearch_bulk_insert_data(self, index_prefix, data_list, es=None):
        """Bulk-index data_list into the write alias for this type."""
        # All "principal page" types share one physical index named
        # 'principal'; every other type writes to its own per-type index.
        principal_types = (
            'principal_article', 'principal_activity', 'live_notice',
            'live_stream', 'word_tractate', 'video_tractate',
        )
        is_principal = self.type in principal_types
        index_type = 'principal' if is_principal else self.type
        index = es_index_adapt(
            index_prefix=index_prefix,
            doc_type=index_type,
            rw='write',
        )
        bulk_actions = []
        if is_principal:
for data in data_list:
bulk_actions.append({
'_op_type': 'index',
'_index': index,
'_type': "_doc",
'_id': data['id'] + data["principal_type"] * 10000000,
'_source': data,
})
else:
for data in data_list:
bulk_actions.append({
'_op_type': 'index',
'_index': index,
'_type': "_doc",
'_id': data['id'],
'_source': data,
})
if es is None:
es = get_es_instance()
es_result = None
if bulk_actions:
try:
es_result = elasticsearch.helpers.bulk(client=es, actions=bulk_actions)
            except Exception:
logging_exception()
es_result = 'error'
return es_result

    def insert_table_by_pk_list(self, index_prefix, pk_list, es=None):
        """Fetch full documents for pk_list and bulk-index them into ES."""
        data_list = self.batch_get_data_func(pk_list)
        return self.elasticsearch_bulk_insert_data(
            index_prefix=index_prefix,
            data_list=data_list,
            es=es,
        )
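

# Minimal usage sketch (illustrative only; the index prefix and pk values
# below are hypothetical, not defined in this module):
#
#     info = get_type_info_map()['question']
#     info.insert_table_by_pk_list(index_prefix='some-prefix', pk_list=[1, 2, 3])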