#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import unicode_literals, print_function, absolute_import import itertools from django.conf import settings class CeleryTaskRouter(object): # Map[QueueName, List[TaskName]] queue_task_map = { 'communal': [ 'communal.tasks.push_task.push_control_task', 'communal.tasks.push_task.aggregation_push_trigger_task', 'communal.tasks.push_task.aggregation_push_timing_task', # 'communal.tasks.irrigation_task.irrigation_tasks', 'qa.tasks.get_ctr_device_push_question.get_push_questions_for_device', 'qa.tasks.get_ctr_device_push_question.update_answers_count_of_question', 'qa.tasks.get_ctr_device_push_question.record_push_content_of_ctr_device', ], 'live': [ 'live.tasks.check_live_status', 'live.tasks.add_live_robot_user_enter', 'live.tasks.add_live_robot_comment', 'live.tasks.add_live_robot', ] } # Map[TaskName, QueueName] task_queue_map = dict(itertools.chain.from_iterable( [(task, queue) for task in task_list] for (queue, task_list) in queue_task_map.items() )) def route_for_task(self, task, args=None, kwargs=None): if settings.DEBUG and ("live.tasks" not in task): return None queue_name_or_none = self.task_queue_map.get(task) return queue_name_or_none