"""Helpers for Q&A unread counters, reply/vote lookups and mixed-content formatting."""
import datetime
import json
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from django.conf import settings
from django.db.models import Q
from lxml import html

from gm_types.gaia import MIXED_TYPE
from gm_types.mimas import GRABBING_PLATFORM, QA_CONTENT_TYPE
from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE, IMAGE_TYPE
from gm_upload import get_video_base_info
from gm_upload.utils.image_utils import Picture
from qa.models import AnswerReply, Answer, QuestionInviter, Question, AnswerVote, AnswerVoteReply
from qa.utils.image import handle_image_type
from qa.utils.image import get_w_path, get_image_base_info
from utils.rpc import logging_exception

# Platform sources whose Q&A content is converted to rich text.
QA_CONTENT_CONVERT_TO_RICH_TEXT_PLATFORM = [
    GRABBING_PLATFORM.GM,
    GRABBING_PLATFORM.KYC,
]


def get_unread_count_by_user_id(user_id):
    """
    Count the unread, online replies addressed to a user.

    Two kinds of replies are counted (the user's own replies are excluded):
    replies on the user's answers that are not sub-replies, and sub-replies to
    the user's replies created on or after
    ``settings.COUNT_ANSWER_COMMENTED_REPLY_START_TIME``.

    :param user_id: user id; a falsy value yields 0
    :return: int, number of distinct matching reply ids
    """
    if not user_id:
        return 0

    unread_online = Q(is_online=True, is_read=False)

    # Split into two queries out of concern for a slow combined query.
    answer_query = unread_online & Q(answer__user=user_id, commented_reply_id__isnull=True)

    # v7.7.05: sub-replies are only counted from the configured start time on.
    start_time = datetime.datetime.strptime(
        settings.COUNT_ANSWER_COMMENTED_REPLY_START_TIME, '%Y-%m-%d %H:%M:%S')
    sub_reply_query = unread_online & Q(commented_reply__user=user_id, create_time__gte=start_time)

    def _reply_ids(condition):
        """Return the ids of replies matching ``condition`` not written by the user."""
        return AnswerReply.objects.filter(condition).exclude(user=user_id).values_list("id", flat=True)

    # The two queries may return the same reply; de-duplicate by id.
    return len(set(_reply_ids(answer_query)) | set(_reply_ids(sub_reply_query)))


def get_answers_by_ids(ids):
    """
    Get basic info for online answers (whose question is also online) by id.

    :param ids: answer ids list
    :return: list of dict
        {
            'answer_id': int,
            'answer_content': str,
            'question_id': int,
            'question_title': str,
        }
    """
    answers = Answer.objects.select_related('question').filter(
        id__in=ids, is_online=True, question__is_online=True
    )
    return [
        {
            'answer_id': answer.id,
            'answer_content': answer.content,
            'question_id': answer.question.id,
            'question_title': answer.question.title,
        }
        for answer in answers
    ]


def get_answer_reply_by_pks(pks):
    """
    Get online answer replies by primary keys.

    :param pks: reply ids list
    :return: list of ``qa.models.AnswerReply.get_reply_for_mine()`` results
    """
    replies = AnswerReply.objects.filter(id__in=pks, is_online=True)
    return [reply.get_reply_for_mine() for reply in replies]


def get_answer_reply_new(user_id=None, start_num=0, count=10):
    """
    Get a page of replies addressed to a user, newest first.

    The user's own replies are excluded. A reply is addressed to the author of
    the reply it comments on; when it comments on no reply, it is addressed to
    the author of the answer.

    :param user_id: user id
    :param start_num: offset of the first reply to return
    :param count: maximum number of replies to return
    :return: list of ``AnswerReply.get_reply_for_mine()`` results
    """
    addressed_to_user = Q(answer__user=user_id, commented_reply=None) | Q(commented_reply__user=user_id)
    replies = (
        AnswerReply.objects
        .filter(Q(is_online=True), addressed_to_user)
        .exclude(user=user_id)
        .order_by('-create_time')[start_num:start_num + count]
    )
    return [reply.get_reply_for_mine() for reply in replies]


def update_answer_reply_read_status(pks):
    """
    Mark the given, currently unread, answer replies as read.

    :param pks: reply ids list
    :return: None
    """
    AnswerReply.objects.filter(id__in=pks, is_read=False).update(is_read=True)


def get_unread_answer_info_by_user_id(user_id):
    """
    Get a user's unread-answer information.

    :param user_id: user id; a falsy value yields the empty result
    :return: dict with ``count`` (number of unread answers plus unread invited
        answers) and ``last_user_info`` (``nickname`` / ``portrait`` of the
        author of the most recent such answer, or ``{}`` when there is nothing
        unread)
    """
    result = {
        "count": 0,
        "last_user_info": {},
    }
    if not user_id:
        return result

    # Answers to my own online questions that I have not viewed yet.
    question_ids = list(Question.objects.filter(user=user_id, is_online=True).values_list("id", flat=True))
    # Exclude answers I wrote to my own questions.
    answer = Answer.objects.filter(
        question_id__in=question_ids, is_online=True, questioner_read=False).exclude(user=user_id)

    # Invitations I sent that were answered but not viewed by me yet; invitations
    # on my own questions are excluded (those are covered by the query above).
    inviters = QuestionInviter.objects.filter(
        user=user_id, answer_id__isnull=False, user_read=False).exclude(question_id__in=question_ids)

    if not answer and not inviters:
        return result

    result["count"] = answer.count() + inviters.count()

    invite_answer_ids = list(inviters.values_list("answer_id", flat=True))
    invite_answer = Answer.objects.filter(
        id__in=invite_answer_ids, is_online=True).order_by("-create_time").first()

    # Fetch the last direct answer once; every ``.last()`` call is a new query.
    newest_own = answer.last() if answer else None
    if newest_own and invite_answer:
        latest_answer = newest_own if newest_own.create_time > invite_answer.create_time else invite_answer
    else:
        # At most one of the two exists here (possibly neither, e.g. when the
        # invited answers are all offline).
        latest_answer = newest_own or invite_answer

    result["last_user_info"] = {
        "nickname": (latest_answer.user.nickname or "") if latest_answer else "",
        "portrait": (latest_answer.user.portrait or "") if latest_answer else "",
    }
    return result


def get_unread_answer_vote_num_by_user_id(user_id):
    """
    Count the unread, non-fake votes on a user's answers.

    :param user_id: user id; a falsy value yields 0
    :return: int count
    """
    if not user_id:
        return 0
    return AnswerVote.objects.filter(answer__user=user_id, unread=True, is_fake=False).count()


def get_unread_answer_reply_vote_num_by_user_id(user_id):
    """
    Count the unread votes on a user's answer replies.

    :param user_id: user id; a falsy value yields 0
    :return: int count
    """
    if not user_id:
        return 0
    return AnswerVoteReply.objects.filter(answerreply__user=user_id, unread=True).count()


def get_answer_vote_infos_by_ids(vote_ids):
    """
    Get answer-vote info by vote ids.

    :param vote_ids: list of answer vote ids
    :return: dict mapping ``str(vote.id)`` to ``vote.to_dict()``
    """
    if not vote_ids:
        return {}
    votes = AnswerVote.objects.filter(id__in=vote_ids)
    return {str(vote.id): vote.to_dict() for vote in votes}


def get_answer_replies_by_ids(reply_ids):
    """
    Get answer-reply model instances by reply ids.

    :param reply_ids: list of answer reply ids
    :return: dict mapping ``str(reply.id)`` to the ``AnswerReply`` instance
    """
    if not reply_ids:
        return {}
    replies = AnswerReply.objects.filter(id__in=reply_ids)
    return {str(reply.id): reply for reply in replies}


def get_qa_content_text(qa_platform, qa_content):
    """
    Extract the plain text of Q&A content.

    Since 7675 Q&A supports mixed text/image content, and the content is stored
    as a JSON string for the GM platform.

    :param qa_platform: platform the content comes from
    :param qa_content: content; may be empty
    :return: for non-empty GM content, the concatenation of all WORDS items;
        otherwise ``qa_content`` unchanged
    """
    if not (qa_platform == GRABBING_PLATFORM.GM and qa_content):
        return qa_content

    return ''.join(
        item.get('content', '')
        for item in json.loads(qa_content)
        if int(item.get('type', -1)) == MIXED_TYPE.WORDS
    )


def _render_rich_text(content_items):
    """
    Render mixed-content items to an HTML string and determine its content type.

    :param content_items: iterable of dicts with ``type`` and ``content`` keys
    :return: tuple ``(richtext_content, content_type)``
    """
    parts = []
    for item in content_items:
        item_type = int(item.get('type', -1))
        if item_type == MIXED_TYPE.WORDS:
            parts.append('<p>' + item.get('content') + '</p>')
        elif item_type == MIXED_TYPE.IMAGE:
            parts.append('<img src="' + get_w_path(item.get('content')) + '"/>')
        elif item_type == MIXED_TYPE.VIDEO:
            parts.append('<video src="{0}" controls="" width=100%></video>'.format(
                settings.VIDEO_HOST + item.get('content')))

    richtext_content = ''.join(parts)

    content_type = QA_CONTENT_TYPE.ORDINARY
    # The rendered markup is parsed (rather than tracking a flag) so that a
    # <video> tag embedded in a WORDS item is detected as well.
    if richtext_content and html.fromstring(richtext_content).xpath("//video[1]"):
        content_type = QA_CONTENT_TYPE.VIDEO
    return richtext_content, content_type


def format_qa_content(qa_platform, content):
    """
    Format Q&A content as rich text (mixed text/image supported since 7675).

    v7.7.10 update: adds the Q&A content type (video ...).

    :param qa_platform: platform the content comes from
    :param content: for the GM platform, a JSON string of content items
    :return: dict whose keys are table fields: ``content`` (rendered rich text
        for non-empty GM content, otherwise ``content`` unchanged) and
        ``content_type``
    """
    if qa_platform == GRABBING_PLATFORM.GM and content:
        richtext_content, content_type = _render_rich_text(json.loads(content))
    else:
        richtext_content, content_type = content, QA_CONTENT_TYPE.ORDINARY

    return {
        "content": richtext_content,
        "content_type": content_type,
    }


def get_media_info_from_content(qa_platform, qa_content):
    """
    Collect media (image) data from mixed text/image content.

    :param qa_platform: platform the content comes from
    :param qa_content: for the GM platform, a JSON string of content items
    :return: dict ``{"images_list": [...]}`` with the non-empty contents of all
        IMAGE items; the list is empty for other platforms or empty content
    """
    result = {
        "images_list": [],
    }
    if qa_platform == GRABBING_PLATFORM.GM and qa_content:
        result["images_list"] = [
            item.get("content", "")
            for item in json.loads(qa_content)
            if int(item.get('type', -1)) == MIXED_TYPE.IMAGE and item.get("content", "")
        ]
    return result


def refine_qa_content_data(content_list):
    """
    Break rich-text Q&A content down into text, images and videos.

    :param content_list: note that this is a list of content-item dicts
    :return: dict with ``content`` (concatenated WORDS text), ``images_list``
        and ``video_list`` (non-empty IMAGE / VIDEO contents)
    """
    results = {
        "content": "",
        "images_list": [],
        "video_list": [],
    }
    for item in content_list or []:
        item_content = item.get("content", "")
        item_type = int(item.get('type', -1))

        if item_type == MIXED_TYPE.WORDS:
            results["content"] += item_content
        elif item_type == MIXED_TYPE.IMAGE and item_content:
            results["images_list"].append(item_content)
        elif item_type == MIXED_TYPE.VIDEO and item_content:
            results["video_list"].append(item_content)

    return results


def format_qa_content_v2(qa_platform, content_data):
    """
    Format Q&A content as rich text (mixed text/image supported since 7675).

    v7.7.10 update: adds the Q&A content type (video ...).

    :param qa_platform: platform the content comes from
    :param content_data: already-decoded iterable of content-item dicts
    :return: dict whose keys are table fields: ``content`` (rendered rich text
        when the platform is in ``QA_CONTENT_CONVERT_TO_RICH_TEXT_PLATFORM`` and
        the data is non-empty, otherwise ``content_data`` unchanged) and
        ``content_type``
    """
    if qa_platform in QA_CONTENT_CONVERT_TO_RICH_TEXT_PLATFORM and content_data:
        richtext_content, content_type = _render_rich_text(content_data)
    else:
        richtext_content, content_type = content_data, QA_CONTENT_TYPE.ORDINARY

    return {
        "content": richtext_content,
        "content_type": content_type,
    }


def get_media_extra_info(media_list, is_video=False):
    """
    Attach base info (as returned by the media helpers) to each media url.

    Looking up the base info is best-effort: on failure the exception is logged
    and width/height default to 0.

    :param media_list: list of media addresses
    :param is_video: whether the media are videos; not sure whether it is used
    :return: list of dicts with ``video_url`` or ``image_url`` plus the base info
    """
    default_base_data = {
        "width": 0,
        "height": 0,
    }
    media_key = "video_url" if is_video else "image_url"

    enriched = []
    for media_url in media_list:
        if not is_video:
            # Clients send image urls with a suffix; strip it here.
            media_url = Picture(media_url).raw

        try:
            if is_video:
                base_data = get_video_base_info(urljoin(settings.VIDEO_HOST, media_url))
            else:
                base_data = get_image_base_info(media_url)
        except Exception:
            logging_exception()
            base_data = default_base_data

        # Any falsy value in the base info makes it unusable; use the defaults.
        if not all(base_data.values()):
            base_data = default_base_data

        media_info = {media_key: media_url}
        media_info.update(base_data)
        enriched.append(media_info)

    return enriched


def _get_content_text(rich_text):
    """
    Convert rich text to plain text.

    :param rich_text: HTML fragment; may be empty or None
    :return: str, the text content ("" for empty input)
    """
    # Check before wrapping: the wrapped string is never empty, and
    # concatenating None would raise TypeError.
    if not rich_text:
        return ""

    soup = BeautifulSoup('<div>' + rich_text + '</div>', 'lxml')
    return soup.get_text()


def get_head_image_info(head_image_list):
    """
    Build head-image info dicts.

    Width/height given by the caller are kept; missing ones are looked up
    best-effort from the image itself (0 when the lookup fails).

    :param head_image_list: list of dicts with ``image`` and optional
        ``width`` / ``height`` keys; entries without an image are skipped
    :return: list of dicts with ``image_url``, ``width``, ``height`` and
        ``image_url_source``
    """
    image_list = []
    if not head_image_list:
        return image_list

    default_base_data = {
        "width": 0,
        "height": 0,
    }

    for image_dict in head_image_list:
        image_url = image_dict.get("image", "")
        if not image_url:
            continue

        width = image_dict.get("width", 0)
        height = image_dict.get("height", 0)
        # Clients send image urls with a suffix; strip it here.
        image_url = Picture(image_url).raw

        if not (width and height):
            try:
                base_data = get_image_base_info(image_url)
            except Exception:
                logging_exception()
                base_data = default_base_data

            width = width or base_data.get("width", 0)
            height = height or base_data.get("height", 0)

        image_list.append(dict(
            image_url=image_url,
            width=width,
            height=height,
            image_url_source=MEDIA_IMAGE_URL_SOURCE.HEAD,
        ))

    return image_list