import queue
import time
from multiprocessing import Process, Queue

from django import db
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Count

from talos.models.tractate import Tractate, TractateExtra, TractateReply

log_path = '/tmp/'

# Number of consumer processes started by the command; also the number of
# stop sentinels that must be queued once the producer has finished.
CONSUMER_COUNT = 8


def product(q, tractate_max_id):
    """Produce ``(tractate_id, reply_count)`` pairs for the consumers.

    Walks the ``Tractate`` ids in ascending batches, counts the online
    replies of every tractate in the batch on the slave database, puts one
    tuple per tractate that has at least one online reply on ``q`` and
    appends a line describing the batch to ``<log_path>count.log``.

    Args:
        q: multiprocessing queue shared with the consumer processes.
        tractate_max_id: id at which the scan stops; batching ends as soon
            as the last processed id reaches this value.
    """
    batch_size = 500
    log_str = 'start_id:{}, reply_count_list: {} \n'
    log_file_path = log_path + "count.log"

    # Keyset pagination on "id > last_id": no row is fetched twice and the
    # cursor strictly advances, so the loop can never spin in place.
    last_id = 0
    while last_id < tractate_max_id:
        tractate_ids = list(
            Tractate.objects.filter(id__gt=last_id)
            .order_by('id')
            .values_list('id', flat=True)[:batch_size]
        )
        if not tractate_ids:
            # Nothing left to scan (e.g. rows were deleted meanwhile).
            break

        # values_list() before annotate() groups by tractate_id; the empty
        # order_by() clears any default ordering so it cannot leak into the
        # GROUP BY clause.
        # NOTE(review): tractates without any online reply produce no row
        # here, so their stored reply_count is left untouched -- confirm
        # whether they should be reset to 0.
        replies = list(
            TractateReply.objects.using(settings.SLAVE_DB_NAME)
            .filter(tractate_id__in=tractate_ids, is_online=True)
            .values_list('tractate_id')
            .annotate(reply_count=Count('tractate_id'))
            .order_by()
        )
        for tractate_id, reply_count in replies:
            q.put((tractate_id, reply_count))

        with open(log_file_path, "a+") as f:
            f.write(log_str.format(tractate_ids[0], replies))

        last_id = tractate_ids[-1]


def consumer(q, idle_timeout=None):
    """Apply queued reply counts to ``TractateExtra`` until told to stop.

    Args:
        q: multiprocessing queue of ``(tractate_id, reply_count)`` tuples.
            A falsy item (``None``) is the stop sentinel.
        idle_timeout: optional number of seconds to wait for an item before
            giving up. The default ``None`` blocks until an item arrives,
            so a slow producer cannot make the consumer quit early.
    """
    while True:
        try:
            info = q.get(timeout=idle_timeout)
        except queue.Empty:
            # Only reachable when an explicit idle_timeout was given.
            break
        if not info:
            break
        tractate_id, reply_count = info
        print(info)
        TractateExtra.objects.filter(tractate_id=tractate_id).update(reply_count=reply_count)


class Command(BaseCommand):
    """
    python django_manage.py tractate_reply
    Recompute (clean up) the stored reply counts of tractates.
    """

    def handle(self, *args, **options):
        """Run one producer and ``CONSUMER_COUNT`` consumer processes."""
        start_time = time.time()

        # Explicit ordering: do not rely on the model's default ordering to
        # find the highest id. ``None`` means the table is empty.
        tractate_max_id = (
            Tractate.objects.order_by('-id').values_list('id', flat=True).first()
        )
        if tractate_max_id is None:
            print('no tractate found, nothing to do')
            return

        # Close inherited connections so every child process opens its own.
        db.connections.close_all()

        q = Queue()
        producer = Process(target=product, args=(q, tractate_max_id))
        consumers = [
            Process(target=consumer, args=(q,)) for _ in range(CONSUMER_COUNT)
        ]

        producer.start()
        for worker in consumers:
            worker.start()

        try:
            producer.join()
        finally:
            # One sentinel per consumer, queued after all real items, so
            # every consumer drains the queue and then exits -- even when
            # the producer died early.
            for _ in consumers:
                q.put(None)

        for worker in consumers:
            worker.join()

        end_time = time.time()
        print('max tractate id :{}, Done, {}'.format(tractate_max_id, int(end_time - start_time)))