import time import queue import math from concurrent.futures import ThreadPoolExecutor from qa.models.answer import UserAnswerQuestion, Answer, Question from django.core.management import BaseCommand from django.db.models import Q from django.conf import settings answer_queue = queue.Queue() question_queue = queue.Queue() max_answer_info = Answer.objects.using(settings.SLAVE_DB_NAME).filter(Q(is_online=False) | Q(question__is_online=False)).last() max_question_info = Question.objects.using(settings.SLAVE_DB_NAME).filter(is_online=False).last() def handle_answer(answer_queue): last_answer_id = answer_queue.get() if max_answer_info.id <= last_answer_id: answer_queue.put(last_answer_id) return print("last_answer_id=======", last_answer_id, max_answer_info.id) per_num = 500 answers = Answer.objects.using(settings.SLAVE_DB_NAME).filter(id__gt=last_answer_id, is_online=False)[:per_num]\ .values_list('id', flat=True) UserAnswerQuestion.objects.using(settings.SLAVE_DB_NAME).filter(answer_id__in=list(answers)).update(is_online=False) last_answer_id = list(answers)[-1] answer_queue.put(last_answer_id) def clean_answer(): answer_count = Answer.objects.using(settings.SLAVE_DB_NAME).filter(Q(is_online=False) | Q(question__is_online=False)).count() answer_queue.put(0) # 触发程序开始 per_num = 500 cnt = math.ceil(float(answer_count) / per_num) print(cnt) with ThreadPoolExecutor(max_workers=4) as ex: for i in range(cnt): ex.submit(handle_answer, answer_queue) #handle_answer(answer_queue) def handle_question(answer_queue): last_question_id = question_queue.get() if max_question_info.id <= last_question_id: answer_queue.put(last_question_id) return per_num = 500 print("last_question_id=======", last_question_id) questions = Question.objects.using(settings.SLAVE_DB_NAME).filter(id__gt=last_question_id, is_online=False)[:per_num]\ .values_list('id', flat=True) UserAnswerQuestion.objects.using(settings.SLAVE_DB_NAME).filter(question_id__in=list(questions)).update(is_online=False) last_question_id = list(questions)[-1] question_queue.put(last_question_id) def clean_question(): question_count = Question.objects.using(settings.SLAVE_DB_NAME).filter(is_online=False).count() question_queue.put(0) # 触发程序开始 per_num = 500 cnt = math.ceil(float(question_count) / per_num) with ThreadPoolExecutor(max_workers=4) as ex: for i in range(cnt): ex.submit(handle_question, question_queue) class Command(BaseCommand): def handle(self, *args, **options): print('------ starting -----') start_time = time.time() print("start at: ", start_time) clean_question() clean_answer() end_time = time.time() print("end at: ", end_time) print('total use {} s.'.format(end_time - start_time)) print('Done!')