1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# coding=utf-8
"""更新日记本评论数。"""
import math
import time
from multiprocessing import Pool, Manager
from django.core.management.base import BaseCommand
from django.db.models import Q, Max
from django import db
from django.conf import settings
from talos.models.diary import Diary
def update_diary_reply_num(queue, limit):
    """Recompute ``reply_num`` for the next batch of at most ``limit`` diaries.

    The batch start is a cursor shared through ``queue``: this function takes
    the last processed diary id from the queue, selects the next ``limit``
    diaries with a greater primary key, and puts the largest id of that batch
    back on the queue for the next caller.

    Each diary's ``reply_num`` is set to the sum of ``reply_num`` over its
    topics with ``is_online=True``; rows whose value is already correct are
    not saved.

    Args:
        queue: Queue holding a single cursor value (the last processed id).
        limit: Maximum number of diaries to process in this call.
    """
    start_id = queue.get()
    print(start_id)

    # Whatever happens below, a cursor value must go back on the queue;
    # otherwise every later caller blocks forever in queue.get().
    next_start_id = start_id
    try:
        # Explicit pk ordering makes the slice deterministic, so max_id really
        # is the upper bound of everything at or below it.
        diarys = Diary.objects.filter(Q(pk__gt=start_id)).order_by('pk')[:limit]
        max_id = diarys.aggregate(max_id=Max('id'))["max_id"]
        # An empty batch aggregates to None; keep the previous cursor rather
        # than handing None to the next caller's pk__gt filter.
        if max_id is not None:
            next_start_id = max_id
    finally:
        queue.put(next_start_id)

    if max_id is None:
        # Nothing left past start_id.
        return

    for diary in diarys:
        reply_num = sum(
            diary.topics.filter(is_online=True).values_list('reply_num', flat=True)
        )
        if diary.reply_num == reply_num:
            continue
        diary.reply_num = reply_num
        # Write only the changed column.
        diary.save(update_fields=["reply_num"])
class Command(BaseCommand):
    """Management command that recomputes ``reply_num`` for diaries in parallel."""

    def handle(self, *args, **options):
        """Update the reply count (``reply_num``) of diaries using a worker pool.

        Work is split into batches of ``per_num`` diaries. Every batch is one
        call to ``update_diary_reply_num``; the calls share a queue that
        carries the id cursor from one batch to the next.
        """
        print('------ starting -----')
        start_time = time.time()
        print("start at: ", start_time)

        per_num = 200

        # The context manager shuts the manager process down even on failure.
        with Manager() as manager:
            queue = manager.Queue(maxsize=4)
            queue.put(0)  # seed the cursor so the first batch can start

            # The workers walk all diaries by primary key, so the number of
            # batches must be derived from the total row count; counting only
            # online diaries would schedule too few batches and leave the
            # highest ids untouched.
            count = Diary.objects.count()
            cnt = int(math.ceil(count / per_num))
            args_list = [(queue, per_num)] * cnt

            # Close connections before creating the pool so worker processes
            # open their own instead of sharing the parent's.
            db.connections.close_all()

            # Leaving the block terminates the pool, which matters when
            # starmap raises; on success the workers are closed and joined
            # normally first.
            with Pool(processes=4) as pool:
                pool.starmap(update_diary_reply_num, args_list)
                pool.close()
                pool.join()

        end_time = time.time()
        print("end at: ", end_time)
        print('total use {} s.'.format(end_time - start_time))
        print('Done!')