celery_task_router.py 1.44 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, absolute_import

import itertools

from django.conf import settings


class CeleryTaskRouter(object):
    # Map[QueueName, List[TaskName]]
    queue_task_map = {
        'communal': [
            'communal.tasks.push_task.push_control_task',
            'communal.tasks.push_task.aggregation_push_trigger_task',
            'communal.tasks.push_task.aggregation_push_timing_task',
            # 'communal.tasks.irrigation_task.irrigation_tasks',
            'qa.tasks.get_ctr_device_push_question.get_push_questions_for_device',
            'qa.tasks.get_ctr_device_push_question.update_answers_count_of_question',
            'qa.tasks.get_ctr_device_push_question.record_push_content_of_ctr_device',
        ],
        'live': [
            'live.tasks.check_live_status',
            'live.tasks.add_live_robot_user_enter',
            'live.tasks.add_live_robot_comment',
            'live.tasks.add_live_robot',
        ]
    }

    # Map[TaskName, QueueName]
    task_queue_map = dict(itertools.chain.from_iterable(
        [(task, queue) for task in task_list]
        for (queue, task_list) in queue_task_map.items()
    ))

    def route_for_task(self, task, args=None, kwargs=None):
        if settings.DEBUG and ("live.tasks" not in task):
            return None

        queue_name_or_none = self.task_queue_map.get(task)
        return queue_name_or_none