1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import queue
from multiprocessing import Queue, Process
import time
from django import db
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Count
from talos.models.tractate import TractateReply, Tractate, TractateExtra
# Directory prefix for the progress log written by product(); the file name
# is appended by plain string concatenation, so keep the trailing slash.
log_path = '/tmp/'
def product(q, tractate_max_id):
    """Producer: enqueue ``(tractate_id, online_reply_count)`` pairs.

    Walks the ``Tractate`` table in ascending id order, 500 ids per batch,
    counts the online replies of each batch on the slave database and puts
    one ``(tractate_id, reply_count)`` tuple on ``q`` per tractate that has
    at least one online reply. One line per batch that produced counts is
    appended to ``log_path + "count.log"``.

    Args:
        q: queue-like object with a ``put`` method (a
            ``multiprocessing.Queue`` when started from the command).
        tractate_max_id: highest tractate id that must be covered. The walk
            stops once a batch has moved past this id or no rows are left.

    Returns:
        None.
    """
    batch_size = 500
    log_str = 'start_id:{}, reply_count_list: {} \n'
    start_tractate_id = 1

    # ``<=`` (not ``<``) so a table whose only/last row has id == max is
    # still processed.
    while start_tractate_id <= tractate_max_id:
        tractate_ids = list(
            Tractate.objects.filter(id__gte=start_tractate_id)
            .order_by('id')
            .values_list('id', flat=True)[:batch_size]
        )
        if not tractate_ids:
            # Rows at the tail were deleted after the max id was read;
            # nothing left to scan (and max() of an empty list would raise).
            break

        # values_list() *before* annotate() groups by tractate_id; the empty
        # order_by() keeps any default model ordering out of the GROUP BY.
        replies = list(
            TractateReply.objects.using(settings.SLAVE_DB_NAME)
            .filter(tractate_id__in=tractate_ids, is_online=True)
            .order_by()
            .values_list('tractate_id')
            .annotate(reply_count=Count('tractate_id'))
        )
        # NOTE(review): tractates without any online reply produce no row
        # here, so their stored reply_count is never reset -- confirm that
        # this is intended.
        for tractate_id, reply_count in replies:
            q.put((tractate_id, reply_count))

        if replies:
            with open(log_path + "count.log", "a+") as f:
                f.write(log_str.format(start_tractate_id, replies))

        # Advance strictly past the last id seen so the cursor always moves
        # forward and the boundary row is not fetched twice.
        start_tractate_id = tractate_ids[-1] + 1
def consumer(q):
    """Consumer: apply queued reply counts to ``TractateExtra`` rows.

    Repeatedly takes an item from ``q`` (waiting at most 2 seconds) and
    writes its count into the ``reply_count`` column of the
    ``TractateExtra`` rows matching the tractate id. The function returns
    when the queue stays empty for the whole timeout or when a falsy item
    is received.

    Args:
        q: queue-like object whose ``get(timeout=...)`` raises
            ``queue.Empty`` on timeout.
    """
    # NOTE(review): a 2 second gap between two puts also ends this worker --
    # confirm the producer's batches are always faster than that.
    while True:
        try:
            item = q.get(timeout=2)
        except queue.Empty:
            return

        if not item:
            # Falsy item: treated as a stop signal.
            return

        target_id, count = item
        print(item)
        matching_rows = TractateExtra.objects.filter(tractate_id=target_id)
        matching_rows.update(reply_count=count)
class Command(BaseCommand):
    """
    python django_manage.py tractate_reply
    Tractate (post) reply-count cleanup.
    """

    # Number of consumer processes started next to the single producer.
    consumer_count = 8

    def handle(self, *args, **options):
        """Run one producer and ``consumer_count`` consumers to completion.

        Reads the highest tractate id, closes the inherited database
        connections, starts the worker processes around a shared
        ``multiprocessing.Queue`` and prints the elapsed whole seconds once
        every worker has exited.
        """
        start_time = time.time()

        # Explicit ordering: last() otherwise depends on the model's default
        # ordering, which need not be by id.
        newest = Tractate.objects.order_by('id').last()
        if newest is None:
            # Empty table: there is no max id and nothing to clean.
            print('no tractate found, nothing to do')
            return
        tractate_max_id = newest.id

        # Drop open connections before the worker processes are created so
        # they are not carried over into the children.
        db.connections.close_all()

        q = Queue()
        producer = Process(target=product, args=(q, tractate_max_id))
        consumers = [
            Process(target=consumer, args=(q, ))
            for _ in range(self.consumer_count)
        ]

        for worker in [producer] + consumers:
            worker.start()

        # Once the producer has exited, everything it queued is ahead of
        # whatever is put next, so one falsy sentinel per consumer lets each
        # of them stop right after the real work is drained.
        producer.join()
        for _ in consumers:
            q.put(None)
        for worker in consumers:
            worker.join()

        end_time = time.time()
        print('max tractate id :{}, Done, {}'.format(tractate_max_id, int(end_time - start_time)))