from __future__ import unicode_literals, absolute_import, print_function import ffmpy import hashlib import json import os import requests import time from urllib.parse import urljoin from django.conf import settings from celery import shared_task # from celery_once import QueueOnce from gm_upload import ( video_clipping, upload_file, video_delete, get_video_base_info, ) from gm_upload.utils.qiniu_tool import QiniuTool from gm_types.gaia import ( VIDEO_CODE_STATUS, QINIU_VIDEO_HANDLE_STATUS_CODE, ) from utils.UploadVideoPicture import UploadVideoPicture from talos.models.topic.video import VideoCover from talos.rpc import logging_exception @shared_task def get_video_blurcover(source_id=None, source_type=None, video_list=None): for item in video_list: if source_type and source_id: # 防止数据被重复写入数据库 TODO : 将先判断是否存在数据库中,改为在celery队列中判断是否有重复的数据 video = VideoCover.objects.filter( source_id=source_id, source_type=source_type, video_url=item.replace(settings.VIDEO_HOST, '') ).first() if not video: create_data = { "source_id": source_id, "source_type": source_type, "video_pic": UploadVideoPicture(item), "video_url": item.replace(settings.VIDEO_HOST, ''), } create_data.update(get_video_base_info(item)) VideoCover.objects.create(**create_data) @shared_task def video_cover_fsm_runner(): """ 状态机 考虑封装成类,根据状态给对应worker添加任务 :return: """ # 截取视频前两秒 clipping_id_list = list(VideoCover.objects.filter( persistent_clip_status=VIDEO_CODE_STATUS.NOSTART ).values_list("id", flat=True)) for video_id in clipping_id_list: video_cover_for_rich_text_set_clipping_to_video.delay(video_id) # 检查七牛云截取状态 check_id_list = list(VideoCover.objects.filter( persistent_clip_status=VIDEO_CODE_STATUS.WAITING ).values_list("id", flat=True)) for video_id in check_id_list: check_video_cover_for_rich_text_clipping_is_finish.delay(video_id) # 将七牛云截取成功的 mp4 转换为 webP 动图并上传 set_id_list = list(VideoCover.objects.filter( persistent_clip_status=VIDEO_CODE_STATUS.OPERATING_LOCAL ).values_list("id", flat=True)) for video_id in set_id_list: set_video_cover_for_rich_text_webp_pic.delay(video_id) @shared_task def video_cover_for_rich_text_set_clipping_to_video(video_id): video = VideoCover.objects.get(id=video_id) if video.persistent_clip_status != VIDEO_CODE_STATUS.NOSTART: return hash_key = hashlib.md5(str(time.time()).encode("utf8")).hexdigest() pid = video_clipping( video.video_url, new_filename='{}_clip.mp4'.format(hash_key), video_type='mp4', water_mark_url=None # settings.WATER_MARK_URL_FOR_VIDEO ) video.persistent_clip_status = VIDEO_CODE_STATUS.WAITING video.persistent_clip_id = pid video.save(update_fields=['persistent_clip_id', 'persistent_clip_status']) @shared_task def check_video_cover_for_rich_text_clipping_is_finish(video_id): try: video = VideoCover.objects.get(id=video_id) if video.persistent_clip_status != VIDEO_CODE_STATUS.WAITING: return if video.persistent_clip_id: result = json.loads(requests.get( settings.QINIU_VIDEO_INQUIRE_HOST + video.persistent_clip_id ).text) # 状态码0成功,1等待处理,2正在处理,3处理失败,4通知提交失败。 # 如果请求失败,返回包含如下内容的JSON字符串{"error": ""} code = result.get('code') error = result.get('error', '') if error: return if code not in (QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS, QINIU_VIDEO_HANDLE_STATUS_CODE.PROCESSING_FAIL): return elif code == QINIU_VIDEO_HANDLE_STATUS_CODE.PROCESSING_FAIL: video.persistent_clip_status = VIDEO_CODE_STATUS.FAIL video.save(update_fields=['persistent_clip_status']) else: if result['items'][0]['code'] == QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS: mp4_key = result['items'][0]['key'] # 刷新cdn缓存 QiniuTool.refresh_qiniu_resource_cache([ urljoin(settings.VIDEO_HOST, mp4_key) ]) video.intercept_video_url = mp4_key video.persistent_clip_status = VIDEO_CODE_STATUS.OPERATING_LOCAL video.save(update_fields=['persistent_clip_status', 'intercept_video_url']) else: video.persistent_status = result['items'][0]['code'] video.save(update_fields=['persistent_clip_status']) except: logging_exception() @shared_task def set_video_cover_for_rich_text_webp_pic(video_id): video = VideoCover.objects.get(id=video_id) if video.persistent_clip_status != VIDEO_CODE_STATUS.OPERATING_LOCAL: return mp4_key = video.intercept_video_url hash_key = hashlib.md5(str(time.time()).encode("utf8")).hexdigest() input_path = os.path.join(settings.VIDEO_CONVERT_PATH, "{}.mp4".format(hash_key)) output_path = os.path.join(settings.VIDEO_CONVERT_PATH, '{}.webp'.format(hash_key)) try: clipping_video_url = settings.VIDEO_HOST + mp4_key res = requests.get(clipping_video_url) # 文件存储到本地 with open(input_path, 'wb') as f: f.write(res.content) # 使用底层 FFmpeg 库进行转码 ff = ffmpy.FFmpeg( inputs={input_path: None}, outputs={output_path: settings.VIDEO_FF_ARGS} ) ff.run() # 上传webP图片 video.webp_url = upload_file(output_path) video.persistent_clip_status = VIDEO_CODE_STATUS.SUCCESS video.save(update_fields=['webp_url', 'persistent_clip_status']) # 删除中间数据  video_delete(mp4_key) # 刷新缓存 QiniuTool.refresh_qiniu_resource_cache( [urljoin(settings.VIDEO_HOST, mp4_key)] ) except: logging_exception() video.persistent_clip_status = VIDEO_CODE_STATUS.FAIL video.save(update_fields=['persistent_clip_status']) finally: if os.path.exists(input_path): os.remove(input_path) if os.path.exists(output_path): os.remove(output_path)