1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# coding=utf-8
"""更新日记本title。
tag 获取逻辑
"""
import math
import time
from multiprocessing import Pool, Manager
from django.core.management.base import BaseCommand
from django.db.models import Q, Max
from django import db
from django.conf import settings
from gm_types.gaia import TAG_TYPE
from talos.models.diary import Diary
from utils.rpc import rpc_client
def update_diary_title(queue, limit, service_id_isnull=True):
    """Rewrite the titles of one batch of online diaries.

    The batch cursor (the largest primary key handed out so far) is taken
    from ``queue``. As soon as the batch has been fetched the new cursor is
    put back, so the next call can fetch its own batch while this one is
    still saving titles.

    Args:
        queue: Queue holding a single integer cursor; diaries with a primary
            key greater than the cursor are candidates for this batch.
        limit: Maximum number of diaries to process in this call.
        service_id_isnull: ``True`` selects diaries without a service,
            ``False`` selects diaries that have one.

    Returns:
        None. Diaries whose title changes are saved.
    """
    start_id = queue.get()
    # The label says "has_service_id", so log the negation of the *isnull* flag.
    print("has_service_id:{0}, start_id: {1}".format(not service_id_isnull, start_id))

    query = Q(pk__gt=start_id, is_online=True, service_id__isnull=service_id_isnull)
    # Keyset pagination is only correct with a deterministic order: without
    # ORDER BY the slice may contain arbitrary rows and its max id can jump
    # past diaries that were never processed. Materialise the batch once so
    # the cursor, the service ids and the loop below all see the same rows.
    diaries = list(Diary.objects.filter(query).order_by("pk")[:limit])
    if not diaries:
        # Hand the cursor back unchanged; otherwise the next consumer would
        # block forever on an empty queue.
        queue.put(start_id)
        return

    # Rows are ordered by pk, so the last one carries the largest key.
    max_id = diaries[-1].pk
    queue.put(max_id)
    print("queue put: ", max_id)

    service_tags = {}
    if not service_id_isnull:
        service_ids = list({diary.service_id for diary in diaries})
        # NOTE(review): presumably returns a mapping of str(service_id) -> tag
        # name; confirm against the RPC implementation.
        service_tags = rpc_client['api/services/tags/by_service_ids'](service_ids=service_ids).unwrap()

    wanted_types = (TAG_TYPE.BODY_PART, TAG_TYPE.BODY_PART_SUB_ITEM, TAG_TYPE.ITEM_WIKI)
    for diary in diaries:
        name = ''
        if diary.service_id:
            # Prefer the tag looked up for the diary's service.
            name = service_tags.get(str(diary.service_id))

        if not name:
            # Fall back to the diary's own tag with the smallest tag_id among
            # the wanted tag types.
            tags = [tag for tag in diary.tags_new_era if tag["type"] in wanted_types]
            if tags:
                name = min(tags, key=lambda tag: tag["tag_id"])["name"]

        if not name:
            continue

        title = name + "日记"
        # Skip the write when the title is already up to date.
        if diary.title != title:
            diary.title = title
            diary.save()
class Command(BaseCommand):
    """Management command that rewrites diary titles in batches."""

    def handle(self, *args, **options):
        """Update the titles of all online diaries using a worker pool.

        Two passes are scheduled: diaries without a service, then diaries
        with one. Each pass is split into batches of ``per_num`` rows and
        every batch becomes one ``update_diary_title`` task.
        """
        print('------ starting -----')
        start_time = time.time()
        print("start at: ", start_time)

        per_num = 500
        manager = Manager()
        tasks = []
        # First the diaries without a service, then the ones with a service.
        for service_id_isnull in (True, False):
            # Each pass gets its own cursor queue seeded with 0. Sharing one
            # queue would make the second pass start from the last id of the
            # first pass and skip every diary with a smaller id.
            cursor_queue = manager.Queue(maxsize=8)
            cursor_queue.put(0)

            count = Diary.objects.using(settings.SLAVE_DB_NAME).filter(
                is_online=True, service_id__isnull=service_id_isnull,
            ).count()
            # Integer ceiling division; independent of "/" semantics.
            batches = (count + per_num - 1) // per_num
            tasks.extend([(cursor_queue, per_num, service_id_isnull)] * batches)

        # Close connections before creating the pool so worker processes do
        # not inherit the parent's open database connections.
        db.connections.close_all()

        pool = Pool(processes=4)
        try:
            # apply() would block on every task and serialise the whole run;
            # apply_async() lets the four workers overlap their batches.
            pending = [pool.apply_async(update_diary_title, task) for task in tasks]
            pool.close()
            for result in pending:
                # get() re-raises any exception raised inside the worker.
                result.get()
        finally:
            # Harmless after a clean run; on failure it stops workers that
            # may still be waiting on a cursor queue.
            pool.terminate()
            pool.join()

        end_time = time.time()
        print("end at: ", end_time)
        print('total use {} s.'.format(end_time - start_time))
        print('Done!')