#!/usr/bin/env python # -*- coding: utf-8 -*- # 获取问题相关的基础模块 from __future__ import unicode_literals, absolute_import, print_function from collections import defaultdict from django.db.models import Q from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE from qa.models.answer import ( Question, QuestionImage, QuestionTag, QuestionTagV3, ) from utils.base_manager import BaseManager from communal.normal_manager import ( tag_manager, ) class QuestionManager(BaseManager): model = Question base_query = Q(is_online=True) @staticmethod def get_base_info(question): """ 获取问题的基本信息 :param question: :return: """ return { "id": question.id, "question_id": question.id, "user_id": question.user_id, "title": question.title, # 注意这是原始数据 "question_content": question.content, # 注意这是原始数据 "content_type": question.content_type, "platform": question.platform, "is_online": question.is_online, "is_recommend": question.is_recommend, "create_timestamp": int(question.create_time.timestamp()), } @staticmethod def get_tags_by_question_ids(question_ids): return list(QuestionTag.objects.filter(question_id__in=question_ids)) @staticmethod def get_tags_info_by_question_tags(question_tags): tag_ids = [item.tag_id for item in question_tags] tags_info_dic = tag_manager.get_tags_info_by_ids(tag_ids) _result = defaultdict(list) for item in question_tags: _tag_info = tags_info_dic.get(item.tag_id, {}) if _tag_info: _result[item.question_id].append(_tag_info) return dict(_result) @staticmethod def get_tags_info_by_question_ids(question_ids): """ 通过问题id获取标签信息 :param question_ids: :return: """ question_tags = QuestionManager.get_tags_by_question_ids(question_ids) return QuestionManager.get_tags_info_by_question_tags(question_tags) @staticmethod def get_tags_v3_by_question_ids(question_ids): return list(QuestionTagV3.objects.filter(question_id__in=question_ids)) @staticmethod def get_tags_v3_info_by_question_tags(question_tags_v3): tag_v3_ids = [item.tag_v3_id for item in question_tags_v3] tags_info_dic = tag_manager.get_tags_v3_info_by_ids(tag_v3_ids) _result = defaultdict(list) for item in question_tags_v3: _tag_info = tags_info_dic.get(item.tag_v3_id, {}) if _tag_info: _result[item.question_id].append(_tag_info) return dict(_result) @staticmethod def get_tags_v3_info_by_question_ids(question_ids): """ 通过问题id获取标签3.0的信息 :param question_ids: :return: """ question_tags = QuestionManager.get_tags_v3_by_question_ids(question_ids) return QuestionManager.get_tags_v3_info_by_question_tags(question_tags) def get_question_base_list_info_by_obj_list(self, question_obj_list): """ :param question_obj_list: 问题对象列表 :return: """ result = { "questions": {}, "valid_user_ids": [], "raw_medias": {}, } if not question_obj_list: return result _user_ids = [] q_text_dic = {} for question in question_obj_list: _id = question.id _user_ids.append(question.user_id) q_text_dic[_id] = question.content result["questions"].update({ _id: self.get_base_info(question), }) result["valid_user_ids"] = _user_ids result["raw_medias"] = q_text_dic return result @classmethod def get_header_imgs_by_ids(answer_ids): images = QuestionImage.objects.filter(image_url_source=MEDIA_IMAGE_URL_SOURCE.HEAD).order_by('id') result = defaultdict(list) for image in images: result[image.answer_id].append(image.image_info_data) return result