1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import time
import queue
import math
from concurrent.futures import ThreadPoolExecutor
from qa.models.answer import UserAnswerQuestion, Answer, Question
from django.core.management import BaseCommand
from django.db.models import Q
from django.conf import settings
# Single-slot cursor queues: each holds the id of the last processed row.
# Workers take the value with get() and put an updated value back, so only
# one worker at a time can be working on a given queue.
answer_queue = queue.Queue()
question_queue = queue.Queue()
# Upper bounds for the scans, evaluated once at import time against the
# database named by settings.SLAVE_DB_NAME. `.last()` yields None when no row
# matches, so these names may be None.
# Last answer that is offline itself or belongs to an offline question.
max_answer_info = Answer.objects.using(settings.SLAVE_DB_NAME).filter(Q(is_online=False) | Q(question__is_online=False)).last()
# Last offline question.
max_question_info = Question.objects.using(settings.SLAVE_DB_NAME).filter(is_online=False).last()
def handle_answer(answer_queue):
    """Mark one batch of UserAnswerQuestion rows offline, keyed by answer id.

    Takes the current cursor (the last processed answer id) from
    ``answer_queue``, selects up to 500 offline answers with a greater id,
    sets ``is_online=False`` on the ``UserAnswerQuestion`` rows that reference
    them, and puts the advanced cursor back on the queue.

    The cursor is always returned to the queue, even when the batch is empty
    or a database call raises; otherwise every later worker would block
    forever on ``get()``.

    Args:
        answer_queue: queue holding exactly one integer cursor.

    Raises:
        Whatever the ORM raises; the cursor is restored unchanged first, so
        the same batch is retried by the next worker.
    """
    # NOTE(review): the UPDATE is issued against settings.SLAVE_DB_NAME, the
    # same alias used for reads -- confirm that writing there is intended.
    # NOTE(review): the upper bound (max_answer_info) also counts answers
    # whose *question* is offline, while the batch filter below only looks at
    # the answer's own flag -- confirm which of the two is intended.
    batch_size = 500
    last_answer_id = answer_queue.get()  # blocks until the cursor is free
    next_answer_id = last_answer_id
    try:
        # Nothing to do: no matching rows at all, or the cursor passed the end.
        if max_answer_info is None or max_answer_info.id <= last_answer_id:
            return
        print("last_answer_id=======", last_answer_id, max_answer_info.id)
        # Explicit ordering so the last element really is the largest id of
        # the batch; order_by must be applied before the slice.
        answer_ids = list(
            Answer.objects.using(settings.SLAVE_DB_NAME)
            .filter(id__gt=last_answer_id, is_online=False)
            .order_by('id')
            .values_list('id', flat=True)[:batch_size]
        )
        if not answer_ids:
            # No offline answers remain past the cursor; jump to the upper
            # bound so the remaining workers exit immediately.
            next_answer_id = max_answer_info.id
            return
        UserAnswerQuestion.objects.using(settings.SLAVE_DB_NAME).filter(
            answer_id__in=answer_ids
        ).update(is_online=False)
        next_answer_id = answer_ids[-1]
    finally:
        answer_queue.put(next_answer_id)
def clean_answer():
    """Run the answer cleanup in batches of 500 on a 4-thread pool.

    Counts the answers that are offline or belong to an offline question,
    seeds ``answer_queue`` with cursor 0, and submits one ``handle_answer``
    task per batch. Worker failures are reported on stdout once the pool has
    finished instead of being silently discarded with their futures.
    """
    batch_size = 500
    answer_count = (
        Answer.objects.using(settings.SLAVE_DB_NAME)
        .filter(Q(is_online=False) | Q(question__is_online=False))
        .count()
    )
    answer_queue.put(0)  # seed the cursor so the first worker can start
    cnt = math.ceil(float(answer_count) / batch_size)
    print(cnt)
    with ThreadPoolExecutor(max_workers=4) as ex:
        futures = [ex.submit(handle_answer, answer_queue) for _ in range(cnt)]
    # The pool has shut down here, so every future is done and exception()
    # returns immediately.
    for future in futures:
        error = future.exception()
        if error is not None:
            print("handle_answer failed:", repr(error))
def handle_question(answer_queue):
    """Mark one batch of UserAnswerQuestion rows offline, keyed by question id.

    Takes the current cursor (the last processed question id) from the queue,
    selects up to 500 offline questions with a greater id, sets
    ``is_online=False`` on the ``UserAnswerQuestion`` rows that reference
    them, and puts the advanced cursor back on the same queue.

    The cursor is always returned to the queue, even when the batch is empty
    or a database call raises; otherwise every later worker would block
    forever on ``get()``.

    Args:
        answer_queue: queue holding exactly one integer *question* cursor.
            The name is historical and kept for backward compatibility;
            ``clean_question`` passes ``question_queue`` here. The queue that
            is read is now also the queue that is written: previously the
            cursor was read from the global ``question_queue`` but returned
            to this parameter on the early-exit path.

    Raises:
        Whatever the ORM raises; the cursor is restored unchanged first, so
        the same batch is retried by the next worker.
    """
    # NOTE(review): the UPDATE is issued against settings.SLAVE_DB_NAME, the
    # same alias used for reads -- confirm that writing there is intended.
    cursor_queue = answer_queue
    batch_size = 500
    last_question_id = cursor_queue.get()  # blocks until the cursor is free
    next_question_id = last_question_id
    try:
        # Nothing to do: no offline question at all, or the cursor passed the end.
        if max_question_info is None or max_question_info.id <= last_question_id:
            return
        print("last_question_id=======", last_question_id)
        # Explicit ordering so the last element really is the largest id of
        # the batch; order_by must be applied before the slice.
        question_ids = list(
            Question.objects.using(settings.SLAVE_DB_NAME)
            .filter(id__gt=last_question_id, is_online=False)
            .order_by('id')
            .values_list('id', flat=True)[:batch_size]
        )
        if not question_ids:
            # No offline questions remain past the cursor; jump to the upper
            # bound so the remaining workers exit immediately.
            next_question_id = max_question_info.id
            return
        UserAnswerQuestion.objects.using(settings.SLAVE_DB_NAME).filter(
            question_id__in=question_ids
        ).update(is_online=False)
        next_question_id = question_ids[-1]
    finally:
        cursor_queue.put(next_question_id)
def clean_question():
    """Run the question cleanup in batches of 500 on a 4-thread pool.

    Counts the offline questions, seeds ``question_queue`` with cursor 0, and
    submits one ``handle_question`` task per batch. Worker failures are
    reported on stdout once the pool has finished instead of being silently
    discarded with their futures.
    """
    batch_size = 500
    question_count = (
        Question.objects.using(settings.SLAVE_DB_NAME)
        .filter(is_online=False)
        .count()
    )
    question_queue.put(0)  # seed the cursor so the first worker can start
    cnt = math.ceil(float(question_count) / batch_size)
    with ThreadPoolExecutor(max_workers=4) as ex:
        futures = [ex.submit(handle_question, question_queue) for _ in range(cnt)]
    # The pool has shut down here, so every future is done and exception()
    # returns immediately.
    for future in futures:
        error = future.exception()
        if error is not None:
            print("handle_question failed:", repr(error))
class Command(BaseCommand):
    """Management command: run the question cleanup, then the answer cleanup."""

    def handle(self, *args, **options):
        """Run both cleanup passes in order and print wall-clock timing."""
        print('------ starting -----')
        began = time.time()
        print("start at: ", began)

        # Questions first, answers second.
        for step in (clean_question, clean_answer):
            step()

        finished = time.time()
        print("end at: ", finished)
        elapsed = finished - began
        print('total use {} s.'.format(elapsed))
        print('Done!')