from django.db.models import Count, Sum from django.conf import settings from collections import defaultdict from gm_types.gaia import USER_TYPE from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE from social.models import SocialInfo from qa.models.answer import Answer, AnswerVote, AnswerReply, AnswerImage, QuestionTag from qa.portal import update_answer_reply_read_status from qa.tasks import qa_image_add_base_info from utils.common import get_data_from_rich_text from utils.user import get_user_gm_url from utils.protocol import gm_protocol class AnswerTools(object): @classmethod def get_author(cls, obj): return { 'gm_url': get_user_gm_url(obj.user.id), 'user_id': obj.user.id, 'user_type': USER_TYPE.NORMAL, 'user_name': obj.user.nickname, 'user_portrait': obj.user.portrait, 'membership_level': obj.user.membership_level, 'user_level': { 'membership_icon': obj.user.membership_icon, 'level_icon': obj.user.level_icon, 'constellation_icon': obj.user.constellation_icon, }, "college_id": obj.user.get("college_id", 0), "college_name": obj.user.get("college_name", ""), } @classmethod def get_replies(cls, answer): replies = AnswerReply.objects.filter(answer=answer, first_reply__isnull=True, is_online=True).order_by("create_time")[:2] return [reply.get_data_for_reply(new_order=True) for reply in replies] @classmethod def get_questions_data(cls, current_user, answers): answer_ids = [answer.id for answer in answers] uids = [answer.user_id for answer in answers] is_following_users = SocialInfo(current_user.id).is_following_users(uids=uids) if current_user else {} answers_dict = {} for answer in answers: answers_dict[answer.id] = { "id": answer.id, "content": answer.content, "create_time": answer.create_time.strftime('%y-%m-%d %H:%M'), "is_recommend": answer.is_recommend, "platform": answer.platform, "comment_count": answer.comment_num, "user_type": USER_TYPE.NORMAL, "like_num": answer.like_num, "timestamp": int(answer.create_time.strftime("%s")), "replies": cls.get_replies(answer), "author": cls.get_author(answer), } images_info = cls.get_answers_images_info(answer_ids) replies_count_info = cls.replies_count_info(answer_ids) answers_vote_info = cls.get_answers_vote_info(answer_ids, current_user) for answer_id, item in answers_dict.items(): item["author"]["is_following"] = is_following_users.get(item["author"]["user_id"], False) item["images"] = images_info.get(answer_id, []) item["first_reply_count"] = replies_count_info.get(answer_id, 0) item["is_voted"] = answers_vote_info.get(answer_id, False) # 还原排序 return sorted(list(answers_dict.values()), key=lambda i: answer_ids.index(i["id"])) @classmethod def get_answers_vote_info(cls, answer_ids, current_user): # 获取点赞相关信息 if not current_user: return {} answers_vote_dict = {} answers_vote = AnswerVote.objects.filter(answer_id__in=answer_ids, user=current_user.id). \ values_list('answer_id').annotate(vote_num=Count('id')) for answer_id, vote_num in answers_vote: answers_vote_dict[answer_id] = True if vote_num else False return answers_vote_dict @classmethod def replies_count_info(cls, answer_ids): # 获取并设置一级评论数images_dict{answer_id: first_reply_num]} replies_count_dict = {} replies_count = AnswerReply.objects.using(settings.SLAVE_DB_NAME).filter(is_online=True, answer_id__in=answer_ids, first_reply=None). \ values_list('answer_id').annotate(first_reply_num=Count('id')) for answer_id, first_reply_num in replies_count: replies_count_dict[answer_id] = first_reply_num return replies_count_dict @classmethod def get_answers_images_info(cls, answer_ids): # 获取回答的图片: images_dict{answer_id: [urls]} images_dict = {} answers_images = AnswerImage.objects.filter( answer_id__in=answer_ids, image_url_source=MEDIA_IMAGE_URL_SOURCE.CREATE ).values_list("answer_id", "image_url") for answer_id, image_url in answers_images: if answer_id not in images_dict: images_dict[answer_id] = [] images_dict[answer_id].append(image_url) return images_dict @classmethod def get_answer_replies(cls, answer_reply_ids): # 我收到的评论 answer_replies = AnswerReply.objects.select_related("commented_reply", "answer").filter( id__in=answer_reply_ids, is_online=True ) answer_replies_dict = {reply.id: reply for reply in answer_replies} update_answer_reply_read_status(answer_reply_ids) # 更新回答回复的已读状态 return answer_replies_dict @classmethod def get_user_publish_answer_nums(cls, user_ids): """ 获取用户发布的总回答数 :param user_ids: :return: """ _nums_dic = {} answer_nums = Answer.objects.filter( user__in=user_ids, is_online=True ).values_list("user").annotate(cnt=Count("id")) for user_id, cnt in answer_nums: _nums_dic[user_id] = cnt return _nums_dic @classmethod def get_user_publish_answer_vote_nums(cls, user_ids): """ 获取用户发布的所有回答的点赞数 :param user_ids: :return: """ _nums_dic = {} favor_nums = Answer.objects.filter( user__in=user_ids, is_online=True ).values_list("user").annotate(snt=Sum("like_num")) for user_id, snt in favor_nums: _nums_dic[user_id] = snt return _nums_dic @classmethod def get_question_all_answer_vote_nums(cls, question_ids): """ 获取问题下所有回答的点赞数总和 :param question_ids: :return: """ _nums_dic = {} favor_nums = Answer.objects.filter( question_id__in=question_ids, is_online=True ).values_list("question_id").annotate(snt=Sum("like_num")) for question_id, snt in favor_nums: _nums_dic[question_id] = snt return _nums_dic @classmethod def get_header_images_by_answer_ids(cls, answer_ids, image_count=9): """ 7.22.0 add 获取回答、问题头图 :param answer_ids: :param image_count: 头图个数,创建时最多9张 :return: """ _images = defaultdict(list) if not answer_ids: return _images images = AnswerImage.objects.filter( image_url_source=MEDIA_IMAGE_URL_SOURCE.HEAD, answer_id__in=answer_ids ).order_by('id') for image in images: if len(_images[image.answer_id]) < image_count: _images[image.answer_id].append(image.image_info_data) return _images class QaTool(object): @staticmethod def _sorted_qa_images(content_images, need_sorted_images): """ 仅在获取新的图片数据中有用 :param content_images: 富文本内容中的图片列表 :param need_sorted_images:待排序的图片数据列表 :return: """ can_sorted_images, others = [], [] content_images = list(map(lambda item: item.split("-")[0], content_images)) for image_item in need_sorted_images: if image_item.get("image", "") in content_images: can_sorted_images.append(image_item) else: others.append(image_item) _images_list = sorted(can_sorted_images, key=lambda item: content_images.index(item.get("image", ""))) _images_list.extend(others) return _images_list @classmethod def get_intact_images(cls, obj, model_name): """ 获取问答相关的新图片逻辑 :param obj: :param model_name: 表名 question_image or answer_image :return: """ _, content_images = get_data_from_rich_text(obj.content, u'//img/@src') # 如果存在富文本里有图,但数据库里没图,则走创建逻辑 if not obj.images.filter( image_url_source=MEDIA_IMAGE_URL_SOURCE.RICH_TEXT ).exists() and content_images: qa_image_add_base_info.delay( images_list=list(map(lambda item: item.split("-")[0], content_images)), params_info={ "model": model_name, "id": obj.id, }, ) image_list = [] for image in obj.images.all(): image_list.append(image.image_info_data) image_list = cls._sorted_qa_images(content_images, image_list) return image_list @classmethod def get_following_info(cls, user_ids, current_user_id): """ 获取当前用户的关注信息 :param user_ids: :param current_user_id: :return: """ following_dic = {} if current_user_id: following_dic = SocialInfo(current_user_id).is_following_users(uids=user_ids) return following_dic @classmethod def get_tags_info_by_question_ids(cls, question_ids): """ 通过问题id,获取对应的标签信息 :param question_ids: :return: """ result = defaultdict(list) if question_ids: q_tag_list = QuestionTag.objects.filter(question_id__in=question_ids) for q_tag in q_tag_list: result[q_tag.question_id].append(q_tag.tag) return dict(result) @classmethod def get_header_images(cls, obj, image_count=9): """ 7.22.0 add 获取回答、问题头图 :param obj: answer question :param image_count: 头图个数,创建时最多9张 :return: """ images = obj.images.filter( image_url_source=MEDIA_IMAGE_URL_SOURCE.HEAD ).order_by('id')[:image_count] _images = [] for image in images: _images.append(image.image_info_data) return _images