import queue
import threading

from django.core.management import BaseCommand
from django.db import connections

from talos.models.topic import Video
from talos.logger import info_logger
from utils.UploadVideoPicture import UploadVideoPicture


class ThreadPool(object):
    """Bounded pool of thread "slots".

    The internal queue is pre-filled with ``max_thread`` references to the
    ``threading.Thread`` class.  Taking one (``get_thread``) blocks while all
    slots are in use; ``add_thread`` hands a slot back.
    """

    def __init__(self, max_thread=20):
        self.queue = queue.Queue(max_thread)
        for _ in range(max_thread):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take a slot, blocking until one is free; returns the Thread class."""
        return self.queue.get()

    def add_thread(self):
        """Return a slot to the pool."""
        self.queue.put(threading.Thread)


class UploadVideoCover(object):
    """Generates a cover for each ``Video`` and saves it, with retries.

    Work items are ``(video, remaining_attempts)`` tuples held in
    ``videoQueue``.  Ids of videos that failed on every attempt are collected
    in ``_errorList``.
    """

    def __init__(self):
        self._errorList = []
        # Total number of attempts granted to each video.
        self.retrytimes = 3
        self.videoQueue = queue.Queue()

    def fetchvideo(self):
        """Enqueue every ``Video`` with a full attempt budget; return the queue."""
        for item in Video.objects.all():
            self.videoQueue.put((item, self.retrytimes))
        return self.videoQueue

    def process(self, video, pool):
        """Handle one work item, then give the pool slot back.

        :param video: ``(video, remaining_attempts)`` work item.
        :param pool: ``ThreadPool`` the caller took a slot from.
        """
        try:
            self.uploadandsavevideo(video)
        finally:
            # Always release the slot, otherwise one unexpected error would
            # permanently shrink the pool and could stall the dispatcher.
            pool.add_thread()

    def uploadandsavevideo(self, video):
        """Upload the cover for one work item and persist it on the video.

        Does nothing when the item has no attempts left.  On failure the
        reason is logged and the item is handed to ``catchexcept``.

        :param video: ``(video, remaining_attempts)`` work item.
        """
        videoquery = video[0]
        retrytimes = video[1]
        if retrytimes <= 0:
            return
        try:
            video_cover = UploadVideoPicture(videoquery.video_url)
            videoquery.video_pic = video_cover
            videoquery.save()
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            info_logger.info(
                'upload_video_cover_failed video_id:{0} error:{1!r}'.format(
                    videoquery.id, exc
                )
            )
            self.catchexcept(videoquery, retrytimes)
        else:
            print("save success", videoquery.id)

    def catchexcept(self, videoquery, retrytimes):
        """Re-enqueue a failed video, or record it once attempts run out.

        :param videoquery: the video whose upload just failed.
        :param retrytimes: attempts the item had *before* this failure.
        """
        remaining = int(retrytimes) - 1
        if remaining > 0:
            self.videoQueue.put((videoquery, remaining))
        else:
            # Only report videos that never succeeded, and only once.
            self._errorList.append(videoquery.id)

    def logerror(self):
        """Log and print the ids of videos that failed every attempt, if any."""
        if self._errorList:
            info_logger.info('error_video_list:{0}'.format(self._errorList))
            print(self._errorList)


class Command(BaseCommand):
    """Management command: generate and store covers for all videos."""

    @staticmethod
    def _run_worker(uploader, item, pool):
        """Thread target: process one item, then close this thread's DB connections."""
        try:
            uploader.process(item, pool)
        finally:
            # Django database connections are per-thread; close them so
            # short-lived workers do not leave connections open.
            connections.close_all()

    def handle(self, *args, **kwargs):
        """Dispatch every queued video to at most four concurrent workers."""
        print("start")
        uploader = UploadVideoCover()
        pending = uploader.fetchvideo()
        pool = ThreadPool(4)
        in_flight = []
        while True:
            # Prune first: if no worker is alive here, nothing can be
            # re-enqueued any more, so the emptiness check below is final.
            in_flight = [w for w in in_flight if w.is_alive()]
            if pending.empty():
                if not in_flight:
                    break
                # A running worker may still put a failed video back.
                in_flight[0].join()
                continue
            print("队列长度:", pending.qsize())
            # Only this thread consumes, so get() cannot block here.
            item = pending.get()
            thread_cls = pool.get_thread()
            worker = thread_cls(
                target=self._run_worker, args=(uploader, item, pool)
            )
            worker.start()
            in_flight.append(worker)
        uploader.logerror()