1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 获取回答的基础模块
from __future__ import unicode_literals, absolute_import, print_function
from collections import defaultdict
from operator import itemgetter
from django.db.models import Q
from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE
from qa.models.answer import (
Answer, AnswerImage,
AnswerTagV3,
AnswerTag,
AnswerVote,
)
from utils.base_manager import (
BaseManager,
)
from communal.normal_manager import (
tag_manager,
)
class AnswerManager(BaseManager):
model = Answer
base_query = Q(is_online=True, question__is_online=True)
@staticmethod
def get_base_info(answer):
"""
获取回答的基本信息
:param answer:
:return:
"""
return {
"id": answer.id,
"answer_id": answer.id,
"question_id": answer.question_id,
"user_id": answer.user_id,
"content": answer.content, # 注意这是原始数据
"content_type": answer.content_type,
"content_level": answer.level,
"platform": answer.platform,
"vote_num": answer.like_num, # 点赞数
"is_online": answer.is_online,
"is_recommend": answer.is_recommend,
"create_timestamp": int(answer.create_time.timestamp()),
"cover_url": answer.cover_url,
}
@staticmethod
def is_voted_by_ids(user_id, answer_ids):
if not user_id:
return {}
voted = {}
for item in AnswerVote.objects.filter(user=user_id, answer_id__in=answer_ids):
voted[item.answer_id] = True
return voted
@staticmethod
def get_tags_v3_info_by_answer_ids(answer_ids):
"""
通过回答id获取标签3.0的信息
:param answer_ids:
:return:
"""
a_tags = AnswerTagV3.objects.filter(
answer_id__in=answer_ids
).values_list("answer_id", "tag_v3_id")
tag_v3_ids = list(map(itemgetter(1), a_tags))
tags_info_dic = tag_manager.get_tags_v3_info_by_ids(tag_v3_ids)
_result = defaultdict(list)
for answer_id, tag_id in a_tags:
_tag_info = tags_info_dic.get(tag_id, {})
if _tag_info:
_result[answer_id].append(_tag_info)
return dict(_result)
@staticmethod
def get_tags_info_by_answer_ids(answer_ids):
"""通过回答id获取标签1.0的信息"""
a_tags = AnswerTag.objects.filter(
answer_id__in=answer_ids
).values_list("answer_id", "tag_id")
tag_ids = list(map(itemgetter(1), a_tags))
tags_info_dic = tag_manager.get_tags_info_by_ids(tag_ids)
_result = defaultdict(list)
for answer_id, tag_id in a_tags:
_tag_info = tags_info_dic.get(tag_id, {})
if _tag_info:
_result[answer_id].append(_tag_info)
return dict(_result)
def get_answer_base_list_info_by_obj_list(self, answer_obj_list):
"""
:param answer_obj_list: 回答对象列表
:return:
"""
result = {
"answers": {},
"valid_user_ids": [],
"raw_medias": {},
}
if not answer_obj_list:
return result
_user_ids = []
a_text_dic = {}
for answer in answer_obj_list:
_id = answer.id
_user_ids.append(answer.user_id)
a_text_dic[_id] = answer.content
_data = self.get_base_info(answer)
_data.update({
"comment_num": 0, # 先写死这样
"first_reply_num": 0, # 先写死这样
"view_num": 0, # 先写死这样
"is_voted": False, # 是否点过赞
})
result["answers"].update({
_data["id"]: _data,
})
result["valid_user_ids"] = _user_ids
result["raw_medias"] = a_text_dic
return result
@staticmethod
def get_header_imgs_by_ids(answer_ids):
images = AnswerImage.objects.filter(answer_id__in=answer_ids, image_url_source=MEDIA_IMAGE_URL_SOURCE.HEAD).order_by('id')
result = defaultdict(list)
for image in images:
result[image.answer_id].append(image.image_info_data)
return result