#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import unicode_literals, absolute_import, print_function from collections import defaultdict from operator import itemgetter from urllib.parse import urljoin from django.conf import settings from django.db.models import Q from gm_types.mimas import ( VIDEO_SOURCE_TYPE, MEDIA_IMAGE_URL_SOURCE, ) from qa.models.answer import ( QuestionImage, AnswerImage, ) from talos.models.topic.video import VideoCover from qa.tasks import qa_image_add_base_info from qa.tasks.get_video_blurcover import get_video_blurcover from utils.common import get_data_from_rich_text class QaMediaService(object): def __init__(self, model, model_name, model_relation_name): self.model = model # 问答图片的model self.model_name = model_name # 随意定义的model别名 self.model_relation_name = model_relation_name # 图片表关联的id名 @staticmethod def _video_list_sorted(db_video_list, content_video_list): """ 对视频排序 :param db_video_list: 从数据库中查到的数据 :param content_video_list: 从富文本内容里获取的数据 :return: """ result = { "video_list": [], "diff_video_list": [], } _all_video_list = [] _db_video_url_list = list(map(itemgetter("video_url"), db_video_list)) # 看下共有的视频地址。数据库只记录,并未执行删除操作!!! 所以这块要注意下!!! _need_add_video_list = set(_db_video_url_list) & set(content_video_list) _all_video_list.extend(list(filter( lambda _item: _item.get("video_url", "") in _need_add_video_list, db_video_list) )) # 看下有没有不存在于数据库中的视频 _diff_video_list = set(content_video_list) - set(_db_video_url_list) if _diff_video_list: for video_url in _diff_video_list: _data = { "video_url": video_url, "video_pic": urljoin(video_url, settings.VIDEO_PIC_URL), "short_video_url": "", "width": 0, "height": 0, } _all_video_list.append(_data) # 保证视频顺序 result["video_list"] = sorted(_all_video_list, key=lambda _item: content_video_list.index(_item["video_url"])) result["diff_video_list"] = list(_diff_video_list) return result @staticmethod def _get_qa_videos_by_qa_ids(qa_ids, source_type): """ 从数据库中获取视频信息 :param qa_ids: :param source_type: :return: """ videos_from_db = VideoCover.objects.filter( source_id__in=qa_ids, source_type=source_type ).values("video_url", "video_pic", "webp_url", "width", "height", "source_id") _results = defaultdict(list) for video_info in videos_from_db: _id = video_info["source_id"] _data = { "video_url": urljoin(settings.VIDEO_HOST, video_info["video_url"]), # 视频地址 --> 域名 + 短链 "video_pic": video_info.get("video_pic") or "", "short_video_url": video_info.get("webp_url") or "", "width": video_info.get("width", 0), "height": video_info.get("height", 0), } _results[_id].append(_data) return dict(_results) def get_qa_videos(self, qa_data, source_type): """ 获取问答的视频 :param qa_data: {question_id: content} 类似于这种的数据结构 :param source_type: :return: """ result = {} # 条件判断 if not all([qa_data, source_type]) or source_type not in VIDEO_SOURCE_TYPE: # 参数都不存在 或是 枚举值不对 return result _qa_ids = list(map(int, qa_data.keys())) videos_list_from_db = self._get_qa_videos_by_qa_ids(_qa_ids, source_type) for _id, _content in qa_data.items(): _, content_video_list = get_data_from_rich_text(_content, u'//video/@src') # 从内容中获取视频 --> 视频地址为完整地址 if not content_video_list: continue db_video_list = videos_list_from_db.get(_id, []) _sorted_video_dic = self._video_list_sorted( db_video_list=db_video_list, content_video_list=content_video_list ) # 如果存在未入库的数据,则走异步任务去处理 _diff_video_list = _sorted_video_dic.get("diff_video_list", []) if _diff_video_list: get_video_blurcover.delay( source_id=_id, source_type=source_type, video_list=_diff_video_list) result[_id] = _sorted_video_dic.get("video_list", []) return result @staticmethod def _sorted_qa_images(content_images, need_sorted_images): """ 仅在获取新的图片数据中有用 :param content_images: 富文本内容中的图片列表 :param need_sorted_images:待排序的图片数据列表 :return: """ can_sorted_images, others = [], [] content_images = list(map(lambda item: item.split("-")[0], content_images)) for image_item in need_sorted_images: if image_item.get("image", "") in content_images: can_sorted_images.append(image_item) else: others.append(image_item) _images_list = sorted( can_sorted_images, key=lambda item: content_images.index(item.get("image", "")) ) _images_list.extend(others) return _images_list def _get_qa_images_by_qa_ids(self, qa_ids, image_url_sources=[]): """ 通过id获取图 :param qa_ids: :param image_url_sources:[] 图片地址来源 :return: { qa_id: {"images_list": [], "image_from_rich_text"} } """ _result = {} values_name = ["image_url", "image_webp", "width", "height", "image_url_source", self.model_relation_name] if self.model_relation_name == "question_id": query = Q(question_id__in=qa_ids, image_url_source__in=image_url_sources) elif self.model_relation_name == "answer_id": query = Q(answer_id__in=qa_ids, image_url_source__in=image_url_sources) else: query = Q() if not query: return _result qq_images = self.model.objects.filter(query).values(*values_name) for _image in qq_images: _qa_id = _image.get(self.model_relation_name, 0) _image_url = _image.get("image_url", "") _image_url_source = _image.get("image_url_source", "") # 如果问题id不存在则先创建 if _qa_id not in _result: _result[_qa_id] = { "images_list": [], "image_from_rich_text": 0, # 图片来自于内容的数量 } if _image_url_source == MEDIA_IMAGE_URL_SOURCE.RICH_TEXT: _result[_qa_id]["image_from_rich_text"] += 1 _data = { "image": _image_url, "image_url": _image_url, "width": _image.get("width", 0), "height": _image.get("height", 0), "image_webp": _image.get("image_webp", ""), } _result[_qa_id]["images_list"].append(_data) return _result def get_qa_images(self, qa_data, image_url_sources=None): """ 获取问答图片,需要从富文本中过滤一波 :param qa_data: {question_id: "content"} :param image_url_sources 图片地址来源 :return: """ _qa_ids = list(map(int, qa_data.keys())) if not image_url_sources: # 图片地址来源 image_url_sources = [MEDIA_IMAGE_URL_SOURCE.CREATE, MEDIA_IMAGE_URL_SOURCE.RICH_TEXT] question_images_from_db = self._get_qa_images_by_qa_ids( _qa_ids, image_url_sources=image_url_sources ) result = {} for k, v in qa_data.items(): _image_dic = question_images_from_db.get(k, {}) if image_url_sources and MEDIA_IMAGE_URL_SOURCE.RICH_TEXT in image_url_sources: _, content_images = get_data_from_rich_text(v, u"//img/@src") # 从富文本中取图 _source_from_rich_text_nums = _image_dic.get("image_from_rich_text", 0) _images_list = _image_dic.get("images_list", []) # 富文本中有图,但数据库里没图,则触发 if not _source_from_rich_text_nums and content_images: qa_image_add_base_info.delay( images_list=list(map(lambda item: item.split("-")[0], content_images)), params_info={ "model": self.model_name, "id": k, }, ) images_list = self._sorted_qa_images(content_images, _images_list) else: images_list = _image_dic.get("images_list", []) result[k] = images_list return result def get_qa_header_images(self, qa_ids, image_count=9): """ 获取问答的头图 :param qa_ids: :param image_count: :return: """ result = {} if not qa_ids: return result qa_header_images_from_db = self._get_qa_images_by_qa_ids( qa_ids, image_url_sources=[MEDIA_IMAGE_URL_SOURCE.HEAD] ) for qa_id in qa_ids: _data = qa_header_images_from_db.get(qa_id, {}) if _data: result[qa_id] = (_data.get("images_list") or [])[:image_count] return result question_media = QaMediaService( model=QuestionImage, model_name="question_image", # 触发异步任务用的 model_relation_name="question_id" ) answer_media = QaMediaService( model=AnswerImage, model_name="answer_image", # 触发异步任务用的 model_relation_name="answer_id" )