# coding=utf-8
"""Update diary titles.

Tag selection logic: the title is built as ``<name>日记`` where ``name`` comes
from the service-tag RPC result for the diary's service (when the diary has a
service), otherwise from the diary's own tag (body part / body part sub item /
item wiki) with the smallest ``tag_id``.
"""

import math
import time
from multiprocessing import Pool, Manager

from django.core.management.base import BaseCommand
from django.db.models import Q, Max
from django import db
from django.conf import settings

from gm_types.gaia import TAG_TYPE

from talos.models.diary import Diary
from utils.rpc import rpc_client


def update_diary_title(queue, limit, service_id_isnull=True):
    """Update the titles of one batch of online diaries.

    The batch is the next ``limit`` diaries whose pk is greater than the
    cursor taken from ``queue``. The advanced cursor is put back on the queue
    before the (slow) per-diary work starts.

    :param queue: queue holding exactly one item, the pk cursor.
    :param limit: maximum number of diaries handled by this call.
    :param service_id_isnull: ``True`` to process diaries without a service,
        ``False`` to process diaries that have one.
    """
    start_id = queue.get()
    # NOTE(review): the label says "has_service_id" but the value printed is
    # ``service_id_isnull``; the message is kept unchanged.
    print("has_service_id:{0}, start_id: {1}".format(service_id_isnull, start_id))

    query = Q(pk__gt=start_id, is_online=True, service_id__isnull=service_id_isnull)
    # Keyset pagination is only correct with a deterministic pk ordering;
    # without it the slice may return arbitrary rows and skip others.
    # Materialise once so every later step sees the same rows.
    diaries = list(Diary.objects.filter(query).order_by('pk')[:limit])
    if not diaries:
        # Hand the cursor back, otherwise the next task would block forever
        # on ``queue.get()``.
        queue.put(start_id)
        return

    # Rows are ordered by pk, so the last one carries the batch maximum.
    max_id = diaries[-1].pk
    queue.put(max_id)
    print("queue put: ", max_id)

    service_tags = {}
    if not service_id_isnull:
        service_ids = list({diary.service_id for diary in diaries})
        service_tags = rpc_client['api/services/tags/by_service_ids'](service_ids=service_ids).unwrap()

    wanted_types = (TAG_TYPE.BODY_PART, TAG_TYPE.BODY_PART_SUB_ITEM, TAG_TYPE.ITEM_WIKI)

    for diary in diaries:
        name = ''
        if diary.service_id:
            # Name taken from the service's tags (RPC result keyed by the
            # stringified service id).
            name = service_tags.get(str(diary.service_id))

        # Fall back to the diary's own tag with the smallest tag_id.
        if not name:
            tags = [tag for tag in diary.tags_new_era if tag["type"] in wanted_types]
            if tags:
                name = min(tags, key=lambda tag: tag['tag_id'])["name"]

        # Only write when there is a name and the title actually changes.
        if name and diary.title != name + '日记':
            diary.title = name + "日记"
            diary.save()


class Command(BaseCommand):

    def handle(self, *args, **options):
        """Update the title column of api_diary.

        Diaries without a service are processed first, then diaries with a
        service. Each phase walks the table in pk order in batches of
        ``per_num`` rows, using its own cursor queue that starts at 0.
        """
        print('------ starting -----')
        start_time = time.time()
        print("start at: ", start_time)

        manager = Manager()
        per_num = 500

        # (service_id_isnull, number of batches): diaries without a service
        # first, then diaries with one.
        phases = []
        for service_id_isnull in (True, False):
            count = Diary.objects.using(settings.SLAVE_DB_NAME).filter(
                is_online=True, service_id__isnull=service_id_isnull).count()
            # Integer ceiling division: also correct under Python 2, where
            # ``count / per_num`` would floor before math.ceil is applied.
            phases.append((service_id_isnull, (count + per_num - 1) // per_num))

        # Close connections before creating the pool so the worker processes
        # open their own.
        db.connections.close_all()
        pool = Pool(processes=4)
        try:
            for service_id_isnull, batches in phases:
                # Each phase scans from pk 0 with its own cursor; sharing one
                # cursor would make the second phase start at the first
                # phase's last pk and skip lower-pk diaries.
                queue = manager.Queue(maxsize=8)
                queue.put(0)  # kick off the phase
                for _ in range(batches):
                    # pool.apply blocks until the task finishes, so batches
                    # run one after another.
                    pool.apply(update_diary_title, (queue, per_num, service_id_isnull))
        finally:
            pool.close()
            pool.join()

        end_time = time.time()
        print("end at: ", end_time)
        print('total use {} s.'.format(end_time - start_time))
        print('Done!')