#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Management command that copies doctor info onto users' online answers.

Doctor records are fetched page by page from the
``api/doctor/iterator_for_inner`` RPC endpoint and pushed onto a bounded
queue; a small pool of worker threads applies each record to the matching
``Answer`` rows.
"""

import queue
import threading

from django.core.management import BaseCommand

from qa.models import Answer
from utils.rpc import rpc_client

# Number of worker threads consuming the queue.
num_worker_threads = 3


def _async_func(user_id, info):
    """
    Update the online answers of ``user_id`` with the given doctor info.

    Data comes from the ``api/doctor/iterator_for_inner`` endpoint; do NOT
    remove the ``doctors`` field from that endpoint's response.

    :param user_id: id of the user whose answers are updated (cast to int).
    :param info: field values passed to ``update()``, e.g.
        {"doctor_id": "", "doctor_title": ""}
    :return: None
    """
    Answer.objects.filter(user=int(user_id), is_online=True).update(**info)


def worker():
    """
    Consume ``(user_id, info)`` items from the queue until a ``None`` sentinel.

    A failed update is recorded in /tmp/update_answer_doctor_info.log and the
    worker moves on to the next item.
    """
    while True:
        item = q.get()
        if item is None:
            # Shutdown sentinel enqueued by Command.handle.
            break
        user_id, info = item
        try:
            _async_func(user_id, info)
        except Exception:
            # Best effort: note the failing user and keep processing.
            with open("/tmp/update_answer_doctor_info.log", "a+") as f:
                f.write("update failure! user_id:{0}\n".format(user_id))
        finally:
            # Must run on failure too; otherwise the queue's unfinished-task
            # count never reaches zero and q.join() blocks forever.
            q.task_done()


# Bounded queue: q.put() blocks the producer while 500 items are pending.
q = queue.Queue(maxsize=500)

# NOTE: the worker threads are started when this module is imported.
thread_list = []
for _ in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    thread_list.append(t)


class Command(BaseCommand):
    """Sync doctor id/title from the doctor RPC endpoint onto online answers."""

    def handle(self, *args, **kwargs):
        """
        Page through the doctor iterator endpoint and enqueue every record.

        Stops at the first page whose ``doctors`` mapping is empty, waits for
        the queue to drain, then shuts the worker threads down.
        """
        print("start")
        start_num = 0
        limit = 1000
        try:
            while True:
                response = rpc_client["api/doctor/iterator_for_inner"](
                    start_num=start_num, count=limit
                ).unwrap()
                start_num += limit
                doctors = response.get("doctors", {})
                if not doctors:
                    break
                for user_id, info in doctors.items():
                    q.put((user_id, info))
            # Wait until every enqueued record has been processed.
            q.join()
        finally:
            # Always release the workers, even when the RPC call raised:
            # they are non-daemon threads blocked in q.get() and would
            # otherwise keep the process from exiting.
            for _ in range(num_worker_threads):
                q.put(None)
            for t in thread_list:
                t.join()
        print("end")