1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import queue
import threading
from django.core.management import BaseCommand
from talos.models.topic import Video
from talos.logger import info_logger
from utils.UploadVideoPicture import UploadVideoPicture
class ThreadPool(object):
    """Bounded pool of worker "tokens" used to cap concurrency.

    The pool is a ``queue.Queue`` pre-filled with ``max_thread`` references
    to the ``threading.Thread`` class. Taking a token with ``get_thread``
    blocks while the pool is empty; a worker hands its token back with
    ``add_thread`` when it is done.
    """

    def __init__(self, max_thread=20):
        """Create the pool and fill it with ``max_thread`` tokens.

        Args:
            max_thread: Maximum number of tokens handed out at once.
                Must be at least 1.

        Raises:
            ValueError: If ``max_thread`` is smaller than 1.
        """
        # queue.Queue(0) (or a negative size) is unbounded, and range() would
        # add no tokens, so the first get_thread() would block forever.
        if max_thread < 1:
            raise ValueError(
                "max_thread must be >= 1, got {0!r}".format(max_thread)
            )
        self.queue = queue.Queue(max_thread)
        for _ in range(max_thread):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take one token, blocking until one is available.

        Returns:
            The ``threading.Thread`` class (not an instance); the caller
            instantiates and starts it.
        """
        return self.queue.get()

    def add_thread(self):
        """Return one token to the pool.

        Blocks if the pool is already full, i.e. if it is called more often
        than ``get_thread``.
        """
        self.queue.put(threading.Thread)
class UploadVideoCover(object):
    """Fill in ``video_pic`` for every ``Video`` row, retrying failures.

    Work items are ``(video, attempts_left)`` tuples kept in ``videoQueue``.
    A failed item is put back on the queue with one attempt fewer; once its
    attempts are used up, the video id is recorded in the error list.
    """

    def __init__(self):
        # Ids of videos that failed on every attempt.
        self._errorList = []
        # Number of attempts granted to each video.
        self.retrytimes = 3
        # Pending (video, attempts_left) work items.
        self.videoQueue = queue.Queue()

    def fetchvideo(self):
        """Queue every ``Video`` row with a full attempt budget.

        Returns:
            The queue of pending ``(video, attempts_left)`` items.
        """
        for item in Video.objects.all():
            self.videoQueue.put((item, self.retrytimes))
        return self.videoQueue

    def process(self, video, pool):
        """Handle one work item, then give its token back to ``pool``.

        Args:
            video: A ``(video, attempts_left)`` tuple taken from the queue.
            pool: Object exposing ``add_thread()``; called exactly once.
        """
        try:
            self.uploadandsavevideo(video)
        finally:
            # Return the token even if something unexpected escapes, so the
            # dispatcher never waits on a token that will not come back.
            pool.add_thread()

    def uploadandsavevideo(self, video):
        """Build the cover for one video and save it on the model.

        Args:
            video: A ``(video, attempts_left)`` tuple. Items with no
                attempts left are ignored.
        """
        videoquery, retrytimes = video
        if retrytimes <= 0:
            return
        try:
            # Presumably generates/uploads the cover and returns its
            # reference -- confirm against utils.UploadVideoPicture.
            video_cover = UploadVideoPicture(videoquery.video_url)
            videoquery.video_pic = video_cover
            videoquery.save()
        except Exception:
            # Catch Exception rather than everything, so that Ctrl-C and
            # SystemExit still stop the command.
            self.catchexcept(videoquery, retrytimes)
        else:
            print("save success", videoquery.id)

    def catchexcept(self, videoquery, retrytimes):
        """Re-queue a failed video, or record it once attempts run out.

        Args:
            videoquery: The video whose processing just failed.
            retrytimes: Attempts the item had left, counting the one that
                just failed.
        """
        remaining = int(retrytimes) - 1
        if remaining > 0:
            self.videoQueue.put((videoquery, remaining))
        else:
            # Recorded only on final failure, so each id appears at most
            # once and videos that succeed on a retry are not reported.
            self._errorList.append(videoquery.id)

    def logerror(self):
        """Log and print the ids of the videos that ultimately failed."""
        if self._errorList:
            info_logger.info('error_video_list:{0}'.format(self._errorList))
            print(self._errorList)
class Command(BaseCommand):
    """Management command that runs ``UploadVideoCover`` over all videos."""

    def handle(self, *args, **kwargs):
        """Dispatch every queued video to a worker thread, then log failures.

        At most four workers run at once: a token is taken from the
        ``ThreadPool`` before each worker starts, and ``process`` returns it
        when the worker finishes.
        """
        print("start")
        uploader = UploadVideoCover()
        pending = uploader.fetchvideo()
        pool = ThreadPool(4)
        workers = []
        while True:
            if pending.empty():
                # Running workers may still re-queue a failed video, so wait
                # for them before deciding that the work is finished.
                for worker in workers:
                    worker.join()
                workers = []
                if pending.empty():
                    break
                continue
            print("队列长度:", pending.qsize())
            # Blocks until one of the four tokens is free.
            thread_cls = pool.get_thread()
            # Forget finished workers so the list does not grow per video.
            workers = [w for w in workers if w.is_alive()]
            # Only this thread consumes the queue, so get() cannot block
            # here. Pass the callable and its arguments separately so the
            # work runs inside the worker thread.
            worker = thread_cls(
                target=uploader.process,
                args=(pending.get(), pool),
            )
            worker.start()
            workers.append(worker)
            # NOTE(review): Django database connections are per-thread;
            # confirm the workers' connections are closed as expected.
        uploader.logerror()