# coding=utf-8 """更新日记本评论数。""" import math import time from multiprocessing import Pool, Manager from django.core.management.base import BaseCommand from django.db.models import Q, Max from django import db from django.conf import settings from talos.models.diary import Diary def update_diary_reply_num(queue, limit): """更新limit条日记本信息。 其中我们的开始地址从queue中获取,每次进程工作的时候会将当前的最大日记本放入到相关的 """ start_id = queue.get() print(start_id) diarys = Diary.objects.filter(Q(pk__gt=start_id))[: limit] max_id = diarys.aggregate(max_id=Max('id')) queue.put(max_id["max_id"]) if not diarys: return for diary in diarys: reply_num = sum(list(diary.topics.filter(is_online=True).values_list('reply_num', flat=True))) if diary.reply_num == reply_num: continue diary.reply_num = reply_num diary.save(update_fields=["reply_num"]) class Command(BaseCommand): def handle(self, *args, **options): """更新 api_diary 中的评论数(reply_num) 。""" print('------ starting -----') start_time = time.time() print("start at: ", start_time) queue = Manager().Queue(maxsize=4) queue.put(0) # 触发程序开始 args_list = [] per_num = 200 count = Diary.objects.filter(is_online=True).count() cnt = int(math.ceil(count/per_num)) for _ in range(cnt): args_list.append((queue, per_num)) db.connections.close_all() pool = Pool(processes=4) pool.starmap(update_diary_reply_num, args_list) pool.close() pool.join() end_time = time.time() print("end at: ", end_time) print('total use {} s.'.format(end_time - start_time)) print('Done!')