1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import re
import queue
import threading
from django.conf import settings
from django.core.management import BaseCommand
from gm_types.mimas.qa import VIDEO_SOURCE_TYPE
from talos.models.topic.video import VideoCover
from qa.models.answer import Answer, Question
from utils.UploadVideoPicture import UploadVideoPicture
from talos.logger import info_logger
class ThreadPool(object):
    """Bounded pool of worker slots built on a blocking queue.

    Every slot is represented by the ``threading.Thread`` class itself, so
    the queue acts as a counter of free slots: taking an item reserves a
    slot, putting one back releases it.
    """

    def __init__(self, max_thread=20):
        """Create the pool and fill it with ``max_thread`` free slots."""
        self.queue = queue.Queue(max_thread)
        for _ in range(max_thread):
            self.add_thread()

    def get_thread(self):
        """Reserve a slot and return the ``threading.Thread`` class.

        Blocks while the queue is empty, i.e. while every slot is taken.
        """
        slot = self.queue.get()
        return slot

    def add_thread(self):
        """Release one slot back into the pool."""
        self.queue.put(threading.Thread)
class UploadVideoCover(object):
    """Find video URLs inside ``model`` content and store a cover row for each.

    Work items travel through ``videoQueue`` as ``((url, source_id), attempts)``
    tuples, where ``attempts`` is the number of tries the item still has.
    Successfully processed items are buffered in ``querysetlist`` as unsaved
    ``VideoCover`` instances and written with ``bulk_create`` in batches.
    """

    # A flush is triggered once MORE than this many rows are buffered.
    BULK_INSERT_THRESHOLD = 50

    def __init__(self, model):
        """
        :param model: model class whose ``content`` field is scanned; ``Answer``
            selects ``VIDEO_SOURCE_TYPE.ANSWER``, anything else
            ``VIDEO_SOURCE_TYPE.QUESTION``.
        """
        self._errorList = []          # (url, source_id) pairs that used up every attempt
        self.retrytimes = 3           # attempts granted to every newly queued item
        self.videoQueue = queue.Queue()
        self.querysetlist = []        # unsaved VideoCover rows waiting for a flush
        self.model = model
        self.source_type = VIDEO_SOURCE_TYPE.ANSWER if self.model == Answer else VIDEO_SOURCE_TYPE.QUESTION
        # ``process`` is written to be used as a thread target, so the row
        # buffer is guarded. Re-entrant because ``savecoverurl`` takes it too.
        self._lock = threading.RLock()

    def fetchvideo(self):
        """Scan every row with non-null content and queue the video URLs found.

        A URL is any text that starts with ``settings.VIDEO_HOST`` and runs up
        to (not including) the next double quote.

        :return: the filled ``videoQueue``.
        """
        # Escape the host so characters such as "." are matched literally;
        # compile once instead of rebuilding the pattern for every row.
        pattern = re.compile('(' + re.escape(settings.VIDEO_HOST) + '.*?)"')
        rows = self.model.objects.filter(content__isnull=False).values("content", "id")
        video_url = []
        for row in rows:
            video_url.extend((url, row["id"]) for url in pattern.findall(row["content"]))
        return self.put_into_queue(video_url)

    def put_into_queue(self, video_url):
        """Queue every ``(url, source_id)`` pair with a full attempt budget.

        :param video_url: iterable of ``(url, source_id)`` pairs.
        :return: ``videoQueue``.
        """
        for item in video_url:
            self.videoQueue.put((item, self.retrytimes))
        return self.videoQueue

    def process(self, video, pool):
        """Handle one work item, then hand the worker slot back to ``pool``.

        :param video: ``((url, source_id), attempts)`` tuple.
        :param pool: object exposing ``add_thread()``; called exactly once.
        """
        try:
            self.uploadcover(video)
        finally:
            # Always release the slot, otherwise the dispatcher would
            # eventually block forever waiting for a free one.
            pool.add_thread()

    def uploadcover(self, video):
        """Create the cover for one video and buffer the resulting row.

        Items whose attempt budget is not positive are ignored. A failure
        while producing the cover is logged and handed to ``catchexcept``.
        A failure while flushing the buffer is logged only: the rows stay
        buffered and are written by a later flush or by ``final``.

        :param video: ``((url, source_id), attempts)`` tuple.
        """
        video_url = video[0]
        retrytimes = video[1]
        if retrytimes <= 0:
            return

        try:
            print("sql列表长度:", len(self.querysetlist))
            video_cover = UploadVideoPicture(video_url[0])
            cover = VideoCover(
                source_id=video_url[1],
                source_type=self.source_type,
                video_url=video_url[0].replace(settings.VIDEO_HOST, ''),
                video_pic=video_cover,
            )
        except Exception as exc:
            info_logger.info('upload_video_cover_failed:{0} error:{1!r}'.format(video_url, exc))
            self.catchexcept(video_url, retrytimes)
            return

        with self._lock:
            self.querysetlist.append(cover)
        try:
            self.savecoverurl()
        except Exception as exc:
            # The cover itself was produced, so do not retry the video (that
            # would buffer a second row for it); the rows remain buffered.
            info_logger.info('save_video_cover_failed:{0} error:{1!r}'.format(video_url, exc))

    def savecoverurl(self):
        """Write the buffered rows once the buffer exceeds the threshold.

        The buffer is emptied after a successful write so the same rows are
        never inserted twice.
        """
        with self._lock:
            print("sql队列长度:", len(self.querysetlist))
            if len(self.querysetlist) > self.BULK_INSERT_THRESHOLD:
                VideoCover.objects.bulk_create(self.querysetlist)
                del self.querysetlist[:]
                print("save success")

    def catchexcept(self, video_url, retrytimes):
        """Re-queue a failed item, or record it once its attempts are used up.

        :param video_url: ``(url, source_id)`` pair that failed.
        :param retrytimes: attempt budget the item had when it failed.
        """
        remaining = int(retrytimes) - 1
        if remaining > 0:
            self.videoQueue.put((video_url, remaining))
        else:
            self._errorList.append(video_url)

    def logerror(self):
        """Log and print the failed items, if there are any."""
        if self._errorList:
            info_logger.info('error_video_list:{0}'.format(self._errorList))
            print(self._errorList)

    def final(self):
        """Write whatever is still buffered and empty the buffer."""
        with self._lock:
            if self.querysetlist:
                VideoCover.objects.bulk_create(self.querysetlist)
                del self.querysetlist[:]
        print("save success")
def upload_video_cover(model=Answer):
    """Generate and store covers for every video referenced by ``model`` rows.

    Work items are taken from the queue returned by
    ``UploadVideoCover.fetchvideo`` and each one is handed to
    ``UploadVideoCover.process`` on its own worker thread; at most four
    workers run at a time because a ``ThreadPool(4)`` slot must be reserved
    before a worker is started (``process`` gives the slot back).

    :param model: model class to scan, ``Answer`` by default.
    """
    uploadpideocover = UploadVideoCover(model=model)
    pendingQueue = uploadpideocover.fetchvideo()
    pool = ThreadPool(4)
    workers = []
    while True:
        while pendingQueue.qsize():
            print("队列长度:", pendingQueue.qsize())
            thread = pool.get_thread()
            # Pass the callable and its arguments separately: writing
            # ``target=process(...)`` would run the work right here and give
            # the Thread a ``None`` target.
            worker = thread(target=uploadpideocover.process, args=(pendingQueue.get(), pool))
            worker.start()
            # Drop finished workers so the list stays about as small as the pool.
            workers = [w for w in workers if w.is_alive()]
            workers.append(worker)
        # Running workers may still put failed items back on the queue, so
        # wait for them before deciding that the queue is really drained.
        for worker in workers:
            worker.join()
        workers = []
        if not pendingQueue.qsize():
            break
    uploadpideocover.final()
    print("错误列表:", uploadpideocover._errorList)
    uploadpideocover.logerror()
class Command(BaseCommand):
    """Management command that builds video covers for answers, then questions."""

    def handle(self, *args, **kwargs):
        """Run ``upload_video_cover`` for ``Answer`` and then for ``Question``."""
        print("start")
        targets = (("Answer", Answer), ("Question", Question))
        for label, model_cls in targets:
            print("process {0} model".format(label))
            upload_video_cover(model=model_cls)