Commit f59f03ad authored by ibuler's avatar ibuler

[Update] 支持使用环境变量关闭定时任务

parent fe030111
# ~*~ coding: utf-8 ~*~ # ~*~ coding: utf-8 ~*~
import json import json
import re import re
import os
from celery import shared_task from celery import shared_task
from django.core.cache import cache from django.core.cache import cache
...@@ -20,6 +21,7 @@ TIMEOUT = 60 ...@@ -20,6 +21,7 @@ TIMEOUT = 60
logger = get_logger(__file__) logger = get_logger(__file__)
CACHE_MAX_TIME = 60*60*60 CACHE_MAX_TIME = 60*60*60
disk_pattern = re.compile(r'^hd|sd|xvd|vd') disk_pattern = re.compile(r'^hd|sd|xvd|vd')
PERIOD_TASK = os.environ.get("PERIOD_TASK", "on")
@shared_task @shared_task
...@@ -118,6 +120,10 @@ def update_assets_hardware_info_period(): ...@@ -118,6 +120,10 @@ def update_assets_hardware_info_period():
Update asset hardware period task Update asset hardware period task
:return: :return:
""" """
if PERIOD_TASK != "on":
logger.debug("Period task disabled, update assets hardware info pass")
return
from ops.utils import update_or_create_ansible_task from ops.utils import update_or_create_ansible_task
task_name = _("Update assets hardware info period") task_name = _("Update assets hardware info period")
hostname_list = [ hostname_list = [
...@@ -190,6 +196,10 @@ def test_admin_user_connectability_period(): ...@@ -190,6 +196,10 @@ def test_admin_user_connectability_period():
""" """
A period task that update the ansible task period A period task that update the ansible task period
""" """
if PERIOD_TASK != "on":
logger.debug("Period task disabled, test admin user connectability pass")
return
admin_users = AdminUser.objects.all() admin_users = AdminUser.objects.all()
for admin_user in admin_users: for admin_user in admin_users:
task_name = _("Test admin user connectability period: {}".format(admin_user.name)) task_name = _("Test admin user connectability period: {}".format(admin_user.name))
...@@ -287,6 +297,10 @@ def test_system_user_connectability_manual(system_user): ...@@ -287,6 +297,10 @@ def test_system_user_connectability_manual(system_user):
@after_app_ready_start @after_app_ready_start
@after_app_shutdown_clean @after_app_shutdown_clean
def test_system_user_connectability_period(): def test_system_user_connectability_period():
if PERIOD_TASK != "on":
logger.debug("Period task disabled, test system user connectability pass")
return
system_users = SystemUser.objects.all() system_users = SystemUser.objects.all()
for system_user in system_users: for system_user in system_users:
task_name = _("test system user connectability period: {}".format(system_user)) task_name = _("test system user connectability period: {}".format(system_user))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment