#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Base module for fetching answers.

from __future__ import unicode_literals, absolute_import, print_function

from collections import defaultdict
from operator import itemgetter

from django.db.models import Q

from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE

from qa.models.answer import (
    Answer,
    AnswerImage,
    AnswerTagV3,
    AnswerTag,
    AnswerVote,
)
from utils.base_manager import (
    BaseManager,
)
from communal.normal_manager import (
    tag_manager,
)


def _group_tag_info_by_answer(answer_tag_pairs, fetch_tag_info):
    """
    Group tag info dicts under the answer id they are attached to.

    :param answer_tag_pairs: iterable of ``(answer_id, tag_id)`` pairs; it is
        iterated twice, once to collect the tag ids and once to group.
    :param fetch_tag_info: callable taking the list of tag ids and returning a
        mapping of tag id -> tag info.
    :return: plain dict of answer_id -> list of tag info; pairs whose tag info
        is missing or falsy are skipped.
    """
    tag_ids = list(map(itemgetter(1), answer_tag_pairs))
    tag_info_by_id = fetch_tag_info(tag_ids)

    grouped = defaultdict(list)
    for answer_id, tag_id in answer_tag_pairs:
        tag_info = tag_info_by_id.get(tag_id, {})
        if not tag_info:
            continue
        grouped[answer_id].append(tag_info)

    return dict(grouped)


class AnswerManager(BaseManager):
    """Lookup helpers that assemble plain-dict data for Answer records."""

    model = Answer
    # Matches answers that are online and whose question is online as well.
    base_query = Q(is_online=True, question__is_online=True)

    @staticmethod
    def get_base_info(answer):
        """
        Build the basic info dict of a single answer.

        :param answer: object exposing the answer fields read below.
        :return: dict of the basic answer fields.
        """
        answer_id = answer.id
        base_info = {
            "id": answer_id,
            "answer_id": answer_id,
            "question_id": answer.question_id,
            "user_id": answer.user_id,
            "content": answer.content,  # note: this is the raw, unprocessed content
            "content_type": answer.content_type,
            "content_level": answer.level,
            "platform": answer.platform,
            "vote_num": answer.like_num,  # number of likes
            "is_online": answer.is_online,
            "is_recommend": answer.is_recommend,
            "create_timestamp": int(answer.create_time.timestamp()),
            "cover_url": answer.cover_url,
        }
        return base_info

    @staticmethod
    def is_voted_by_ids(user_id, answer_ids):
        """
        Find which of the given answers the user has voted for.

        :param user_id: id of the user; a falsy value short-circuits to ``{}``.
        :param answer_ids: ids of the answers to check.
        :return: dict of answer_id -> True, containing voted answers only.
        """
        if not user_id:
            return {}

        votes = AnswerVote.objects.filter(user=user_id, answer_id__in=answer_ids)
        return {vote.answer_id: True for vote in votes}

    @staticmethod
    def get_tags_v3_info_by_answer_ids(answer_ids):
        """
        Get tag 3.0 info for the given answer ids.

        :param answer_ids: ids of the answers to look up.
        :return: dict of answer_id -> list of tag 3.0 info.
        """
        answer_tag_pairs = AnswerTagV3.objects.filter(
            answer_id__in=answer_ids
        ).values_list("answer_id", "tag_v3_id")

        return _group_tag_info_by_answer(
            answer_tag_pairs,
            tag_manager.get_tags_v3_info_by_ids,
        )

    @staticmethod
    def get_tags_info_by_answer_ids(answer_ids):
        """
        Get tag 1.0 info for the given answer ids.

        :param answer_ids: ids of the answers to look up.
        :return: dict of answer_id -> list of tag 1.0 info.
        """
        answer_tag_pairs = AnswerTag.objects.filter(
            answer_id__in=answer_ids
        ).values_list("answer_id", "tag_id")

        return _group_tag_info_by_answer(
            answer_tag_pairs,
            tag_manager.get_tags_info_by_ids,
        )

    def get_answer_base_list_info_by_obj_list(self, answer_obj_list):
        """
        Assemble base info for a list of answer objects.

        :param answer_obj_list: list of answer objects.
        :return: dict with three keys: ``answers`` (answer id -> info dict),
            ``valid_user_ids`` (user ids in input order, duplicates kept) and
            ``raw_medias`` (answer id -> raw content).
        """
        if not answer_obj_list:
            return {
                "answers": {},
                "valid_user_ids": [],
                "raw_medias": {},
            }

        answers = {}
        user_ids = []
        raw_content_by_id = {}

        for answer in answer_obj_list:
            answer_id = answer.id
            user_ids.append(answer.user_id)
            raw_content_by_id[answer_id] = answer.content

            info = self.get_base_info(answer)
            info["comment_num"] = 0  # hard-coded for now
            info["first_reply_num"] = 0  # hard-coded for now
            info["view_num"] = 0  # hard-coded for now
            info["is_voted"] = False  # whether the answer has been voted for
            answers[info["id"]] = info

        return {
            "answers": answers,
            "valid_user_ids": user_ids,
            "raw_medias": raw_content_by_id,
        }

    @staticmethod
    def get_header_imgs_by_ids(answer_ids):
        """
        Collect the header images of the given answers.

        :param answer_ids: ids of the answers to look up.
        :return: ``defaultdict(list)`` of answer_id -> list of image info,
            ordered by image id.
        """
        head_images = AnswerImage.objects.filter(
            answer_id__in=answer_ids,
            image_url_source=MEDIA_IMAGE_URL_SOURCE.HEAD,
        ).order_by('id')

        images_by_answer = defaultdict(list)
        for head_image in head_images:
            images_by_answer[head_image.answer_id].append(head_image.image_info_data)

        return images_by_answer