1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# coding=utf-8
"""更新日记本评论到日记贴。"""
import copy
import math
import time
from multiprocessing import Pool, Manager
from django.core.management.base import BaseCommand
from django.db.models import Q, Max
from django import db
from django.conf import settings
from talos.models.diary import Diary
from talos.models.topic import TopicReply
# Only diaries that are currently online are processed.
filter_diary_query = Q(is_online=True)

# Only online replies that are not yet attached to a topic (``problem``)
# are candidates for being moved.
filter_topicreply_query = Q(
    is_online=True,
    problem__isnull=True,
)
def _select_topic_for_reply(topics, reply_date):
    """Pick the topic that a diary-level reply should be attached to.

    ``topics`` must be a non-empty list ordered by ``created_time``
    ascending.  The reply goes to the first topic whose window
    ``[topic.created_time, next_topic.created_time)`` contains
    ``reply_date``.  When no window matches (the reply is at or after the
    last topic's creation time, is older than the first topic, or there is
    only one topic) the last topic is returned.
    """
    for current, following in zip(topics, topics[1:]):
        if current.created_time <= reply_date < following.created_time:
            return current
    return topics[-1]


def update_diary_reply2topic(queue, limit):
    """Move the diary-level replies of one batch of diaries onto topics.

    Workers coordinate through ``queue``, which carries a single value: the
    primary key after which the next batch starts.  Each call takes that
    cursor, loads up to ``limit`` online diaries with a larger primary key,
    and immediately publishes the new cursor so another worker can start on
    the following batch.

    For every diary in the batch, each unattached online reply is assigned
    to a topic (see ``_select_topic_for_reply``), its ``diary_id`` is
    cleared, and the chosen topic's ``reply_num`` is incremented.

    Args:
        queue: shared queue holding the primary-key cursor.
        limit: maximum number of diaries handled by this call.
    """
    start_id = queue.get()
    next_start_id = start_id
    try:
        # Order by pk before slicing: without an explicit order the slice
        # may be an arbitrary subset, and advancing the cursor to its
        # largest id would then skip diaries.  Materialising the batch once
        # also guarantees the cursor matches exactly the rows processed.
        diaries = list(
            Diary.objects.using(settings.SLAVE_DB_NAME)
            .filter(Q(pk__gt=start_id) & filter_diary_query)
            .order_by("pk")[:limit]
        )
        if diaries:
            next_start_id = diaries[-1].pk
    finally:
        # Always hand a valid cursor back.  If this worker failed, or found
        # nothing, the unchanged cursor is re-published; otherwise the other
        # workers would block forever on ``queue.get()`` or receive ``None``.
        queue.put(next_start_id)

    if not diaries:
        return

    for diary in diaries:
        # NOTE(review): the diary was read via SLAVE_DB_NAME and its topics
        # are loaded through it; confirm the project's DB router sends the
        # ``topic.save()`` below to the writable database.
        topics = list(diary.topics.order_by("created_time"))
        if not topics:
            continue

        replies = TopicReply.objects.filter(
            filter_topicreply_query & Q(diary_id=diary.id)
        )

        moved_replies = []
        touched_topics = {}  # pk -> topic, so each topic is saved only once
        for reply in replies:
            topic = _select_topic_for_reply(topics, reply.reply_date)
            reply.problem_id = topic.id
            reply.diary_id = None
            moved_replies.append(reply)
            # The same topic instances are reused for every reply of this
            # diary, so the counter accumulates in memory before saving.
            topic.reply_num += 1
            touched_topics[topic.pk] = topic

        # Persist only after the reply queryset has been fully consumed.
        for reply in moved_replies:
            reply.save(update_fields=['problem_id', 'diary_id'])
        for topic in touched_topics.values():
            topic.save(update_fields=['reply_num'])

    print(start_id)
class Command(BaseCommand):
    """Management command that moves diary-level replies onto diary topics."""

    help = "Move diary-level replies onto the matching diary topics."

    def handle(self, *args, **options):
        """Process all online diaries in batches using a pool of workers.

        The number of batches is derived from the count of online diaries.
        Workers share a one-slot queue holding the primary-key cursor of
        the next batch; it is seeded with ``0``.
        """
        print('------ starting -----')
        start_time = time.time()
        print("start at: ", start_time)

        per_num = 200

        # The context manager shuts the manager's server process down even
        # if a worker raises.
        with Manager() as manager:
            queue = manager.Queue(maxsize=1)
            queue.put(0)  # seed the cursor so the first worker can start

            count = Diary.objects.filter(filter_diary_query).count()
            cnt = int(math.ceil(count / per_num))
            args_list = [(queue, per_num) for _ in range(cnt)]

            # Close the parent's DB connections before the worker processes
            # are created, so each worker opens its own.
            db.connections.close_all()

            pool = Pool(processes=4)
            try:
                pool.starmap(update_diary_reply2topic, args_list)
            finally:
                # Release the worker processes even when a task failed.
                pool.close()
                pool.join()

        end_time = time.time()
        print("end at: ", end_time)
        print('total use {} s.'.format(end_time - start_time))
        print('Done!')