1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# -*- coding: UTF-8 -*-
from celery import shared_task
from django.conf import settings
import datetime
from data_sync.type_info import get_type_info_map
from data_sync.diary.tasks import update_extra_info
from data_sync.user.user import sync_user_level
from qa.models.answer import Answer
from search.models import MixIndex, MixIndexTag
from talos.models.topic import Problem, Article
from gm_types.gaia import INDEX_CARD_TYPE
from gm_types.mimas.qa import CONTENT_CLASS
import logging
@shared_task
def write_to_es(es_type, pk_list):
    """Insert the rows identified by ``pk_list`` for ``es_type``.

    Duplicate primary keys are dropped first (the resulting order is
    arbitrary, since de-duplication goes through a frozenset). The actual
    write is delegated to ``insert_table_by_pk_list`` of the type-info object
    registered under ``es_type`` in ``get_type_info_map()``, using
    ``settings.ES_INDEX_PREFIX`` as the index prefix.

    :param es_type: key into the map returned by ``get_type_info_map()``.
    :param pk_list: iterable of primary keys to write.
    """
    unique_pks = list(frozenset(pk_list))
    handler = get_type_info_map()[es_type]
    # Lazy %-args: the message is only formatted if INFO is enabled.
    logging.info("get pk_list:%s", unique_pks)
    handler.insert_table_by_pk_list(
        index_prefix=settings.ES_INDEX_PREFIX,
        pk_list=unique_pks,
    )
class AnswerScore(object):
    '''
    Compute an answer's ranking score; the rules are similar to
    data_sync.question.tran2es.Score.
    http://wiki.wanmeizhensuo.com/pages/viewpage.action?pageId=4441797

    score = max(0.0, 0.8 * content_score + 0.2 * social_score - time_score)
    '''

    # (inclusive upper bound, score) tiers shared by the like and reply
    # counts; any count above the last bound scores _COUNT_SCORE_MAX.
    _COUNT_SCORE_TIERS = ((5, 10), (20, 20), (50, 30), (70, 60), (100, 70))
    _COUNT_SCORE_MAX = 100

    @classmethod
    def get_score(cls, answer):
        """Return the non-negative ranking score of ``answer``.

        Reads ``answer.answervote_set`` (non-fake votes), ``answer.replys``,
        ``answer.level``, ``answer.create_time`` and
        ``answer.question.create_time``. The timestamps are compared against
        naive ``datetime.datetime.now()`` — assumes they are naive local
        times; TODO confirm against the model settings.
        """
        now = datetime.datetime.now()
        vote_num = answer.answervote_set.filter(is_fake=False).count()
        content_score = cls.get_answer_content_score(answer.level)
        social_score = cls.get_social_score(vote_num, answer.replys.count())
        # Ages in hours. total_seconds() is required: timedelta.seconds is only
        # the sub-day remainder (0-86399), which made the decay reset daily.
        # Clamp at 0 so a timestamp in the future cannot raise the score.
        question_hours = max(0.0, (now - answer.question.create_time).total_seconds() / 3600)
        answer_hours = max(0.0, (now - answer.create_time).total_seconds() / 3600)
        time_score = question_hours * 0.03 * 0.7 + answer_hours * 0.06 * 1.5
        answer_score = 0.8 * content_score + 0.2 * social_score - time_score
        return max(0.0, answer_score)

    @staticmethod
    def get_answer_content_score(level):
        """Map an answer ``level`` to its content score (0, 5, 10, 70 or 100)."""
        if level < 2:
            return 0
        elif level < 3:
            return 5
        elif level < 4:
            return 10
        elif level < 5:
            return 70
        else:
            return 100

    @staticmethod
    def get_social_score(likes_num, reply_num):
        """Return the social score: 40% likes tier plus 60% replies tier."""
        likes_score = AnswerScore.get_likes_score(likes_num)
        reply_score = AnswerScore.get_reply_score(reply_num)
        return 0.4 * likes_score + 0.6 * reply_score

    @staticmethod
    def get_likes_score(likes_num):
        """Return the tiered score (10..100) for a number of likes."""
        return AnswerScore._count_score(likes_num)

    @staticmethod
    def get_reply_score(reply_num):
        """Return the tiered score (10..100) for a number of replies."""
        return AnswerScore._count_score(reply_num)

    @staticmethod
    def _count_score(count):
        """Return the score of the first tier whose upper bound is >= ``count``."""
        for upper_bound, score in AnswerScore._COUNT_SCORE_TIERS:
            if count <= upper_bound:
                return score
        return AnswerScore._COUNT_SCORE_MAX
def _sync_mix_index_tags(mix_obj, new_tags):
    """Reconcile the MixIndexTag rows of an existing ``mix_obj`` with ``new_tags``.

    Creates a relation row for every tag id in ``new_tags`` that is not yet
    linked, and deletes linked rows whose tag id no longer appears.
    ``new_tags`` is an iterable of objects exposing ``tag_id``.
    """
    old_tags_dict = {tag.tag_id: tag for tag in mix_obj.mixindextag_set.all()}
    new_tag_ids = set()
    for tag in new_tags:
        tag_id = tag.tag_id
        if tag_id in new_tag_ids:
            # Same tag listed twice: do not create a duplicate relation row.
            continue
        new_tag_ids.add(tag_id)
        if tag_id not in old_tags_dict:
            # Newly attached tag.
            MixIndexTag.objects.create(mix_index=mix_obj, tag_id=tag_id)
    # Remove relations to tags that are no longer attached to the content.
    for stale_tag_id in old_tags_dict.keys() - new_tag_ids:
        old_tags_dict[stale_tag_id].delete()


def _create_mix_index_tags(mix_obj, new_tags):
    """Bulk-insert one MixIndexTag row per distinct tag id in ``new_tags``."""
    # dict.fromkeys de-duplicates while keeping the original tag order.
    tag_ids = dict.fromkeys(tag.tag_id for tag in new_tags)
    MixIndexTag.objects.bulk_create(
        [MixIndexTag(mix_index=mix_obj, tag_id=tag_id) for tag_id in tag_ids]
    )


def _update_answer_knowledge(pk_list):
    """Create, update or delete the ANSWER MixIndex rows for ``pk_list``."""
    answers = Answer.objects.filter(id__in=pk_list)
    answer_in_mix = MixIndex.objects.filter(original_id__in=pk_list, original_type=INDEX_CARD_TYPE.ANSWER)
    answer_in_mix_dict = {mix.original_id: mix for mix in answer_in_mix}
    for item in answers:
        mix_obj = answer_in_mix_dict.get(item.id)
        # Only online answers of at least FINE level under an online
        # question stay indexed; anything else is removed / skipped.
        if item.level < CONTENT_CLASS.FINE or not item.is_online or not item.question.is_online:
            if mix_obj is not None:
                mix_obj.delete()
            continue
        new_tags = item.question.tags
        if mix_obj is not None:
            # Recompute the score only for recommended answers; otherwise
            # keep the stored score.
            if item.is_recommend:
                mix_obj.answer_score = AnswerScore.get_score(item)
            mix_obj.answer_is_recommend = item.is_recommend
            mix_obj.save()
            _sync_mix_index_tags(mix_obj, new_tags)
        else:
            answer_score = AnswerScore.get_score(item) if item.is_recommend else 0.0
            mix_obj = MixIndex.objects.create(original_id=item.id, original_type=INDEX_CARD_TYPE.ANSWER,
                                              answer_score=answer_score,
                                              original_create_time=item.create_time,
                                              answer_is_recommend=item.is_recommend)
            _create_mix_index_tags(mix_obj, new_tags)


def _update_article_knowledge(pk_list):
    """Create, update or delete the ARTICLE MixIndex rows for ``pk_list``."""
    articles = Article.objects.filter(id__in=pk_list)
    articles_in_mix = MixIndex.objects.filter(original_id__in=pk_list, original_type=INDEX_CARD_TYPE.ARTICLE)
    articles_in_mix_dict = {mix.original_id: mix for mix in articles_in_mix}
    for item in articles:
        mix_obj = articles_in_mix_dict.get(item.id)
        if not item.is_online:
            if mix_obj is not None:
                # NOTE(review): whether the related MixIndexTag rows are
                # removed too depends on the FK's on_delete — confirm.
                mix_obj.delete()
            continue
        # Fetch tags only for online articles: offline ones never need them,
        # and a missing Problem must not abort the batch for them.
        new_tags = Problem.objects.get(id=item.article_id).problemtag_set.all()
        if mix_obj is not None:
            _sync_mix_index_tags(mix_obj, new_tags)
        else:
            mix_obj = MixIndex.objects.create(original_id=item.id, original_type=INDEX_CARD_TYPE.ARTICLE,
                                              original_create_time=item.created_time)
            _create_mix_index_tags(mix_obj, new_tags)


@shared_task
def update_knowledge(model_type, pk_list):
    '''
    Sync "knowledge" content (answers / articles) into MixIndex and MixIndexTag.

    :param model_type: 'answer' or 'article'; any other value is logged and ignored.
    :param pk_list: primary keys of the changed Answer / Article rows.
    :return: None
    '''
    if not pk_list:
        return
    if model_type == 'answer':
        _update_answer_knowledge(pk_list)
    elif model_type == 'article':
        _update_article_knowledge(pk_list)
    else:
        logging.warning("update_knowledge: unsupported model_type %r", model_type)
@shared_task
def sync_diary_extra_info(diary_ids: list):
    """Sync the statistics of diary books.

    Driven by the list of changed diary ids. Per the original notes, the
    statistics table is maintained in three respects: the total like count
    (presumably refreshed by ``update_extra_info`` — confirm), the total post
    count (updated elsewhere), and the total reply count (updated through a
    field in api_diary).

    Nothing is done when ``diary_ids`` is empty.
    """
    if diary_ids:
        update_extra_info(diary_ids)
@shared_task
def sync_user_level_to_gaia(user_ids: list) -> None:
    """Push the level information of ``user_ids`` to gaia.

    Thin Celery task wrapper around ``sync_user_level``; the ids are passed
    through unchanged (there is no empty-list short-circuit here).
    """
    sync_user_level(user_ids)