"""Management command: generate cover pictures for videos embedded in Q&A content.

For every ``Answer`` / ``Question`` whose ``content`` contains a URL under
``settings.VIDEO_HOST``, ``UploadVideoPicture`` is called for that URL and the
result is stored as a ``VideoCover`` row.
"""
import re
import queue
import threading

from django.conf import settings
from django.core.management import BaseCommand
from django.db import connections
from gm_types.mimas.qa import VIDEO_SOURCE_TYPE

from talos.models.topic.video import VideoCover
from qa.models.answer import Answer, Question
from utils.UploadVideoPicture import UploadVideoPicture
from talos.logger import info_logger


class ThreadPool(object):
    """Bounded pool of "thread slots".

    The pool does not hold running threads; it holds ``max_thread`` tokens
    (the ``threading.Thread`` class itself).  Taking a token with
    :meth:`get_thread` blocks while all slots are in use, and a worker gives
    its slot back with :meth:`add_thread` when it finishes.
    """

    def __init__(self, max_thread=20):
        self.queue = queue.Queue(max_thread)
        for _ in range(max_thread):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Block until a slot is free and return the ``threading.Thread`` class."""
        return self.queue.get()

    def add_thread(self):
        """Return one slot to the pool."""
        self.queue.put(threading.Thread)


class UploadVideoCover(object):
    """Collect video URLs from ``model`` rows and create ``VideoCover`` rows for them.

    Args:
        model: ``Answer`` or ``Question``; any model other than ``Answer`` is
            recorded with ``VIDEO_SOURCE_TYPE.QUESTION``.
    """

    # Buffered rows are written once the buffer holds more than this many.
    _BATCH_SIZE = 50

    def __init__(self, model):
        self._errorList = []
        self.retrytimes = 3
        self.videoQueue = queue.Queue()
        self.querysetlist = []
        self.model = model
        self.source_type = (
            VIDEO_SOURCE_TYPE.ANSWER if self.model == Answer
            else VIDEO_SOURCE_TYPE.QUESTION
        )
        # Guards ``querysetlist``: workers append to and flush it concurrently.
        # Re-entrant because ``uploadcover`` calls ``savecoverurl`` while holding it.
        self._lock = threading.RLock()

    def fetchvideo(self):
        """Scan every row's ``content`` for video URLs and queue them.

        Returns:
            The work queue; each entry is ``((url, row_id), attempts_left)``.
        """
        rows = self.model.objects.filter(content__isnull=False).values("content", "id")
        # VIDEO_HOST is used as a literal elsewhere (``str.replace``), so escape
        # it here too; otherwise '.' in the host would match any character.
        pattern = re.compile('(' + re.escape(settings.VIDEO_HOST) + '.*?)"')
        video_url = [
            (url, row["id"])
            for row in rows
            for url in pattern.findall(row["content"])
        ]
        return self.put_into_queue(video_url)

    def put_into_queue(self, video_url):
        """Queue each ``(url, row_id)`` pair with the full retry budget."""
        for item in video_url:
            self.videoQueue.put((item, self.retrytimes))
        return self.videoQueue

    def process(self, video, pool):
        """Worker entry point: handle one queue entry, then free the pool slot.

        The slot is released in ``finally`` so that an unexpected error cannot
        leak it and leave the dispatcher blocked in ``pool.get_thread()``.
        """
        try:
            self.uploadcover(video)
        finally:
            try:
                # Django keeps one DB connection per thread; close the ones this
                # thread may have opened for a flush so they are not left open.
                connections.close_all()
            finally:
                pool.add_thread()

    def uploadcover(self, video):
        """Upload the cover for one queue entry and buffer the resulting row.

        Args:
            video: ``((url, row_id), attempts_left)``.  Entries with no
                attempts left are ignored.
        """
        video_url, retrytimes = video[0], video[1]
        if retrytimes <= 0:
            return

        print("sql列表长度:", len(self.querysetlist))
        try:
            # Presumably returns the stored cover location -- confirm against
            # utils.UploadVideoPicture.
            video_cover = UploadVideoPicture(video_url[0])
        except Exception:
            # Only the upload is retried.  The DB flush is handled separately
            # below so that a flush error cannot buffer the same row twice.
            self.catchexcept(video_url, retrytimes)
            return

        with self._lock:
            self.querysetlist.append(
                VideoCover(
                    source_id=video_url[1],
                    source_type=self.source_type,
                    video_url=video_url[0].replace(settings.VIDEO_HOST, ''),
                    video_pic=video_cover,
                )
            )
            try:
                self.savecoverurl()
            except Exception as exc:
                # Best effort: rows stay buffered and are retried by the next
                # flush or by ``final()``.
                info_logger.info('bulk_create_failed:{0}'.format(exc))

    def savecoverurl(self):
        """Write the buffered rows once more than ``_BATCH_SIZE`` are pending."""
        with self._lock:
            print("sql队列长度:", len(self.querysetlist))
            if len(self.querysetlist) > self._BATCH_SIZE:
                VideoCover.objects.bulk_create(self.querysetlist)
                # Empty the buffer, otherwise every later flush (and final())
                # would insert the same rows again.
                self.querysetlist.clear()
                print("save success")

    def catchexcept(self, video_url, retrytimes):
        """Handle a failed upload attempt.

        Re-queues the item while attempts remain; once the budget is used up
        the item is recorded in the error list instead.

        Args:
            video_url: ``(url, row_id)`` of the failed item.
            retrytimes: Attempts the item had left *before* this failure.
        """
        remaining = int(retrytimes) - 1
        if remaining > 0:
            self.videoQueue.put((video_url, remaining))
        else:
            self._errorList.append(video_url)

    def logerror(self):
        """Log and print the items that failed permanently, if any."""
        if self._errorList:
            info_logger.info('error_video_list:{0}'.format(self._errorList))
            print(self._errorList)

    def final(self):
        """Write whatever is still buffered after all workers have finished."""
        with self._lock:
            if self.querysetlist:
                VideoCover.objects.bulk_create(self.querysetlist)
                self.querysetlist.clear()
        print("save success")


def upload_video_cover(model=Answer):
    """Generate covers for every video referenced by ``model`` rows.

    Dispatches each queue entry to its own worker thread, with at most four
    running at once, then writes the remaining buffered rows and reports
    permanent failures.
    """
    uploader = UploadVideoCover(model=model)
    pending = uploader.fetchvideo()
    pool = ThreadPool(4)
    workers = []

    while True:
        # Only this thread consumes the queue, so a non-zero qsize() guarantees
        # that get() below will not block.
        while pending.qsize():
            print("队列长度:", pending.qsize())
            thread_cls = pool.get_thread()  # blocks until a slot is free
            # Pass the callable and its arguments; calling process() here would
            # run the work on this thread and hand Thread a target of None.
            worker = thread_cls(target=uploader.process, args=(pending.get(), pool))
            worker.start()
            workers = [w for w in workers if w.is_alive()]
            workers.append(worker)

        # Workers may re-queue failed items, so wait for them and check again.
        for worker in workers:
            worker.join()
        workers = []
        if not pending.qsize():
            break

    uploader.final()
    print("错误列表:", uploader._errorList)
    uploader.logerror()


class Command(BaseCommand):
    """``manage.py`` entry point: process ``Answer`` rows, then ``Question`` rows."""

    def handle(self, *args, **kwargs):
        print("start")
        print("process Answer model")
        upload_video_cover(model=Answer)
        print("process Question model")
        upload_video_cover(model=Question)