import json import datetime from lxml import html from bs4 import BeautifulSoup from urllib.parse import urljoin from django.conf import settings from gm_upload.utils.image_utils import Picture from gm_upload import get_video_base_info from gm_types.gaia import MIXED_TYPE from gm_types.mimas import GRABBING_PLATFORM, QA_CONTENT_TYPE from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE, IMAGE_TYPE from qa.utils.image import handle_image_type from qa.models import AnswerReply, Answer, QuestionInviter, Question, AnswerVote, AnswerVoteReply from qa.utils.image import get_w_path, get_image_base_info from django.db.models import Q from utils.rpc import logging_exception QA_CONTENT_CONVERT_TO_RICH_TEXT_PLATFORM = [ # 问答内容转换富文本的平台来源 GRABBING_PLATFORM.GM, GRABBING_PLATFORM.KYC, ] def get_unread_count_by_user_id(user_id): """ 获取未读的回答数 :param user_id: 用户id :return: """ if not user_id: return 0 query = Q(is_online=True, is_read=False) # 担心有慢查询,写成两个查询 answer_query = query & Q(answer__user=user_id, commented_reply_id__isnull=True) # v 7.7.05 计算该时间之后的新逻辑 start_time = datetime.datetime.strptime(settings.COUNT_ANSWER_COMMENTED_REPLY_START_TIME, '%Y-%m-%d %H:%M:%S') sub_reply_query = query & Q(commented_reply__user=user_id, create_time__gte=start_time) def _reply_ids(query): """ 获取评论的id 可能会有重复数据,以 id 维度去重 :param query: :return: """ return list(AnswerReply.objects.filter(query).exclude(user=user_id).values_list("id", flat=True)) replies_count = len(set(_reply_ids(answer_query) + _reply_ids(sub_reply_query))) return replies_count def get_answers_by_ids(ids): """get answers list by answer ids. :param ids: answer ids list :return: list of dict { 'answer_id': int, 'answer_content': str, 'question_id': int, 'question_title': str, } """ answers = Answer.objects.select_related('question').filter( id__in=ids, is_online=True, question__is_online=True ) result = [] for answer in answers: answer_data = { 'answer_id': answer.id, 'answer_content': answer.content, 'question_id': answer.question.id, 'question_title': answer.question.title, } result.append(answer_data) return result def get_answer_reply_by_pks(pks): """get answer reply by pks. :param pks: :return: list of ::qa.models.AnswerReply```get_reply_for_mime``` """ return [ reply.get_reply_for_mine() for reply in AnswerReply.objects.filter(id__in=pks, is_online=True) ] def get_answer_reply_new(user_id=None, start_num=0, count=10): """ 根据新需求 过滤自己的回复 回复只通知给被回复人 无被回复人就通知给回答者 :param user_id: :param start_num: :param count: :return: """ replies = AnswerReply.objects.filter( Q(is_online=True), Q(answer__user=user_id, commented_reply=None) | Q(commented_reply__user=user_id) ).exclude(user=user_id).order_by('-create_time')[start_num:start_num+count] return [reply.get_reply_for_mine() for reply in replies] def update_answer_reply_read_status(pks): AnswerReply.objects.filter(id__in=pks, is_read=False).update(is_read=True) def get_unread_answer_info_by_user_id(user_id): """ 通过 user_id 获取用户的未读回答信息 :param user_id: :return: result dict类型 count,user_info 未读数,最新的用户信息 """ result = { "count": 0, "last_user_info": {}, } if not user_id: return result # 当前我的问题,被回答但是没有查看的数据 question_ids = list(Question.objects.filter(user=user_id, is_online=True).values_list("id", flat=True)) # 过滤自己回复自己的内容 answer = Answer.objects.filter( question_id__in=question_ids, is_online=True, questioner_read=False).exclude(user=user_id) # 被我邀请的人回答了我邀请的问题,但我还未查看的数据 排除邀请者邀请他人回答自己的问题 inviters = QuestionInviter.objects.filter(user=user_id, answer_id__isnull=False, user_read=False).exclude(question_id__in=question_ids) if not answer and not inviters: return result result["count"] = answer.count() + inviters.count() invite_answer_ids = list(inviters.values_list("answer_id", flat=True)) invite_answer = Answer.objects.filter(id__in=invite_answer_ids, is_online=True).order_by("-create_time").first() lastest_answer = None if answer and invite_answer: lastest_answer = answer.last() if answer.last().create_time > invite_answer.create_time else invite_answer elif answer and not invite_answer: lastest_answer = answer.last() elif not answer and invite_answer: lastest_answer = invite_answer #answer为空,invite_answer存在 result["last_user_info"] = { "nickname": lastest_answer and lastest_answer.user.nickname or "", "portrait": lastest_answer and lastest_answer.user.portrait or "", } return result def get_unread_answer_vote_num_by_user_id(user_id): """ 通过 user_id 获取用户未读得回答点赞数 :param user_id: user_id :return: int count """ count = 0 if not user_id: return count votes = AnswerVote.objects.filter(answer__user=user_id, unread=True, is_fake=False) count = votes.count() return count def get_unread_answer_reply_vote_num_by_user_id(user_id): """ 通过 user_id 获取用户未读得回答评论点赞数 :param user_id: user_id :return: int count """ count = 0 if not user_id: return count votes = AnswerVoteReply.objects.filter(answerreply__user=user_id, unread=True) count = votes.count() return count def get_answer_vote_infos_by_ids(vote_ids): """ 通过 answer_vote_id 获取models 信息 :param vote_ids: list [] :return: dict {} """ result = {} if not vote_ids: return result votes = AnswerVote.objects.filter(id__in=vote_ids) return {str(vote.id): vote.to_dict() for vote in votes} def get_answer_replies_by_ids(reply_ids): """ 通过 answer_reply_id 获取 models 信息 :param reply_ids: list [] :return: dict {} """ result = {} if not reply_ids: return result replies = AnswerReply.objects.filter(id__in=reply_ids) return {str(reply.id): reply for reply in replies} def get_qa_content_text(qa_platform, qa_content): ''' 7675 问答支持图文混排,content内容格式变化为jsonsting :param qa_platform: :param qa_content: 可为空 :return: ''' if qa_platform == GRABBING_PLATFORM.GM and qa_content: content = '' original_content = json.loads(qa_content) for c in original_content: if int(c.get('type', -1)) == MIXED_TYPE.WORDS: content += c.get('content') else: content = qa_content return content def format_qa_content(qa_platform, content): ''' 格式化问答内容,7675后支持图文混排 v 7.7.10 update 新增问答内容类型 (视频...) :param qa_platform: :param content: :return: str update dict key为表中字段 ''' content_type = QA_CONTENT_TYPE.ORDINARY if qa_platform == GRABBING_PLATFORM.GM and content: original_content = json.loads(content) richtext_content = '' for c in original_content: if int(c.get('type', -1)) == MIXED_TYPE.WORDS: richtext_content += '
'+c.get('content')+'
' elif int(c.get('type', -1)) == MIXED_TYPE.IMAGE: richtext_content += ''+c.get('content')+'
' elif int(c.get('type', -1)) == MIXED_TYPE.IMAGE: richtext_content += '