#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Backfill width/height for Q&A images and videos, and import images embedded
in Q&A rich-text content into the per-answer / per-question image tables.
"""

import math
import multiprocessing
import time
from itertools import chain
from urllib.parse import urljoin

from django.conf import settings
from django.core.management import BaseCommand
from django.db import connections

from gm_upload import get_video_base_info
from gm_upload.utils.image_utils import Picture
from gm_types.mimas.enum import MEDIA_IMAGE_URL_SOURCE
from gm_types.mimas.qa import VIDEO_SOURCE_TYPE
from hera.queries.qa import get_media_list_from_content
from qa.models import Answer, Question, AnswerImage, QuestionImage
from qa.utils.image import get_image_base_info
from talos.models.topic.video import VideoCover

step = 1000  # number of primary-key ids scanned per batch


def _sync_image_data(query_obj):
    """
    Fetch the dimensions of one image row and persist them.

    Rows that already have a non-zero width are left untouched.

    :param query_obj: an ``AnswerImage`` or ``QuestionImage`` instance.
    :return: None
    """
    if query_obj.width:
        return

    raw_url = Picture(query_obj.image_url).raw
    base_data = get_image_base_info(raw_url)
    query_obj.width = base_data.get("width") or 0
    query_obj.height = base_data.get("height") or 0
    # NOTE(review): query_obj is a snapshot loaded in the parent process and
    # save() writes every column back; consider save(update_fields=[...]) if no
    # auto_now field / save() override relies on a full save.
    query_obj.save()
    print("qa original image add width and height, model:{0};id:{1}".format(
        query_obj.__class__.__name__, query_obj.id))


def _sync_video_data(query_obj):
    """
    Fetch the dimensions of one video and persist them.

    Rows that already have a non-zero width are left untouched.

    :param query_obj: a ``VideoCover`` instance; its ``video_url`` is resolved
        against ``settings.VIDEO_HOST``.
    :return: None
    """
    if query_obj.width:
        return

    video_url = urljoin(settings.VIDEO_HOST, query_obj.video_url)
    base_data = get_video_base_info(video_url)
    query_obj.width = base_data.get("width") or 0
    query_obj.height = base_data.get("height") or 0
    query_obj.save()
    print("video add width and height, video_id:{}".format(query_obj.id))


def _import_content_images(content, owner_id, image_model, owner_field):
    """
    Create ``image_model`` rows for the ``<img>`` sources found in rich text.

    URLs already stored for the owner, empty URLs, and URLs repeated within the
    same content are skipped, so each image URL is stored at most once per owner.

    :param content: rich-text (HTML) content to scan.
    :param owner_id: primary key of the answer/question owning the images.
    :param image_model: ``AnswerImage`` or ``QuestionImage``.
    :param owner_field: foreign-key column name, e.g. ``"answer_id"``.
    :return: number of rows created.
    """
    images_from_content = get_media_list_from_content(content, u'//img/@src')
    known_urls = set(
        image_model.objects.filter(**{owner_field: owner_id})
        .values_list("image_url", flat=True)
    )

    new_images = []
    for image_dic in images_from_content:
        image_url = image_dic.get("image_url", "")
        if not image_url or image_url in known_urls:
            continue
        # Remember it so a URL repeated in the same content is not inserted twice.
        known_urls.add(image_url)
        # presumably image_dic already carries the model's other columns
        # (e.g. width/height) -- TODO confirm against get_media_list_from_content
        image_dic.update({
            owner_field: owner_id,
            "image_url_source": MEDIA_IMAGE_URL_SOURCE.RICH_TEXT,
        })
        new_images.append(image_model(**image_dic))

    if new_images:
        image_model.objects.bulk_create(new_images)
    return len(new_images)


def _async_answer_images(answer_obj):
    """
    Import the images embedded in an answer's content into ``AnswerImage``.

    :param answer_obj: an ``Answer`` instance (only ``id`` and ``content`` are read).
    :return: None
    """
    answer_id = answer_obj.id
    if _import_content_images(answer_obj.content, answer_id, AnswerImage, "answer_id"):
        print("answer rich_content images add width and height, answer_id:{}".format(answer_id))


def _async_question_images(question_obj):
    """
    Import the images embedded in a question's content into ``QuestionImage``.

    :param question_obj: a ``Question`` instance (only ``id`` and ``content`` are read).
    :return: None
    """
    question_id = question_obj.id
    if _import_content_images(question_obj.content, question_id, QuestionImage, "question_id"):
        print("question rich_content images add width and height, question_id:{}".format(question_id))


def _log_task_error(exc):
    """Report an exception raised inside a pool worker (otherwise it is lost)."""
    print("qa media sync task failed: {!r}".format(exc))


def _iter_in_id_batches(queryset, batch_size=step):
    """
    Yield every object of ``queryset`` by scanning primary keys in windows.

    Ids 1..max(id) of the model's whole table are walked in non-overlapping
    inclusive windows ``[start, start + batch_size - 1]`` (``pk__range`` is an
    inclusive BETWEEN), so each row is yielded at most once.

    :param queryset: (possibly filtered) queryset of the model to scan.
    :param batch_size: number of ids per window.
    """
    last_id = queryset.model.objects.order_by("-id").values_list("id", flat=True).first()
    if last_id is None:
        # Empty table: nothing to scan.
        return
    for start in range(1, last_id + 1, batch_size):
        end = start + batch_size - 1
        for obj in queryset.filter(pk__range=[start, end]).iterator():
            yield obj


def _dispatch_in_batches(pool, queryset, func):
    """Queue ``func(obj)`` on ``pool`` for every object in ``queryset``."""
    for obj in _iter_in_id_batches(queryset):
        pool.apply_async(func, args=(obj,), error_callback=_log_task_error)


class QAHandleFactory(object):
    """
    Dispatches the Q&A media backfill jobs onto a process pool.
    """

    @classmethod
    def handle_original_info(cls, pool):
        """
        Queue dimension backfill for answer and question images lacking a width.

        :param pool: ``multiprocessing.Pool`` to submit jobs to.
        :return: None
        """
        print("start handle original image")

        print("start handle answer image")
        _dispatch_in_batches(pool, AnswerImage.objects.filter(width=0), _sync_image_data)
        print("end handle answer image")

        print("start handle question image")
        _dispatch_in_batches(pool, QuestionImage.objects.filter(width=0), _sync_image_data)
        print("end handle question image")

        print("end handle original image")

    @classmethod
    def handle_video_info(cls, pool):
        """
        Queue dimension backfill for question/answer videos lacking a width.

        :param pool: ``multiprocessing.Pool`` to submit jobs to.
        :return: None
        """
        print("start handle original video")
        videos = VideoCover.objects.filter(
            source_type__in=[VIDEO_SOURCE_TYPE.QUESTION, VIDEO_SOURCE_TYPE.ANSWER],
            width=0,
        )
        _dispatch_in_batches(pool, videos, _sync_video_data)
        print("end handle original video")

    @classmethod
    def handle_rich_text_data(cls, pool):
        """
        Queue import of images embedded in online answers' and questions' rich text.

        :param pool: ``multiprocessing.Pool`` to submit jobs to.
        :return: None
        """
        print("start handle answer rich_text content")
        answers = Answer.objects.filter(is_online=True).only("content", "id")
        _dispatch_in_batches(pool, answers, _async_answer_images)
        print("end handle answer rich_text content")

        print("start handle question rich_text content")
        questions = Question.objects.filter(is_online=True).only("content", "id")
        _dispatch_in_batches(pool, questions, _async_question_images)
        print("end handle question rich_text content")


class Command(BaseCommand):

    def add_arguments(self, parser):
        parser.add_argument(
            "--processes", type=int, default=4,
            help="number of worker processes (default: 4)",
        )

    def handle(self, *args, **options):
        print("START")
        start_time = time.time()

        # Where the pool forks its workers (the Linux default), any DB
        # connection open in this process would be shared with the children.
        # Close it so every process opens its own connection on first query.
        for conn in connections.all():
            conn.close()

        pool = multiprocessing.Pool(processes=options.get("processes") or 4)
        QAHandleFactory.handle_original_info(pool=pool)
        QAHandleFactory.handle_video_info(pool=pool)
        QAHandleFactory.handle_rich_text_data(pool=pool)
        pool.close()
        pool.join()

        end_time = time.time()
        print("total time: {}".format(end_time - start_time))
        print("END")