Commit 30efec1b authored by ibuler

[Update] Modify the task running mechanism

parent 4893c466
@@ -26,3 +26,6 @@ jumpserver.iml
 tmp/*
 sessions/*
 media
+celerybeat.pid
+django.db
+celerybeat-schedule.db
@@ -10,6 +10,7 @@ from common.utils import get_object_or_none, capacity_convert, \
     sum_capacity, encrypt_password, get_logger
 from .models import SystemUser, AdminUser, Asset
 from . import const
+from ops.decorators import register_as_period_task
 
 FORKS = 10
@@ -71,12 +72,12 @@ def update_assets_hardware_info(assets, task_name=None):
     :param task_name: task_name running
     :return: result summary ['contacted': {}, 'dark': {}]
     """
-    from ops.utils import create_or_update_task
+    from ops.utils import create_or_update_ansible_task
     if task_name is None:
         task_name = const.UPDATE_ASSETS_HARDWARE_TASK_NAME
     tasks = const.UPDATE_ASSETS_HARDWARE_TASKS
     hostname_list = [asset.hostname for asset in assets]
-    task = create_or_update_task(
+    task = create_or_update_ansible_task(
         task_name, hosts=hostname_list, tasks=tasks, pattern='all',
         options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
     )
@@ -88,11 +89,13 @@ def update_assets_hardware_info(assets, task_name=None):
 @shared_task
+@register_as_period_task(interval=60*60*60*24)
 def update_assets_hardware_period():
     """
     Update asset hardware period task
     :return:
     """
+    from ops.utils import create_or_update_ansible_task
     task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME
     if cache.get(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1:
         msg = "Task {} is running or before long, passed this time".format(
@@ -115,7 +118,7 @@ def test_admin_user_connectability(admin_user, force=False):
     :param force: Force update
     :return:
     """
-    from ops.utils import create_or_update_task
+    from ops.utils import create_or_update_ansible_task
     task_name = const.TEST_ADMIN_USER_CONN_TASK_NAME.format(admin_user.name)
     lock_key = const.TEST_ADMIN_USER_CONN_LOCK_KEY.format(admin_user.name)
@@ -127,7 +130,7 @@ def test_admin_user_connectability(admin_user, force=False):
     assets = admin_user.get_related_assets()
     hosts = [asset.hostname for asset in assets]
     tasks = const.TEST_ADMIN_USER_CONN_TASKS
-    task = create_or_update_task(
+    task = create_or_update_ansible_task(
         task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
         options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
     )
@@ -166,12 +169,12 @@ def test_admin_user_connectability_period():
 @shared_task
 def test_admin_user_connectability_manual(asset, task_name=None):
-    from ops.utils import create_or_update_task
+    from ops.utils import create_or_update_ansible_task
     if task_name is None:
         task_name = const.TEST_ASSET_CONN_TASK_NAME
     hosts = [asset.hostname]
     tasks = const.TEST_ADMIN_USER_CONN_TASKS
-    task = create_or_update_task(
+    task = create_or_update_ansible_task(
         task_name, tasks=tasks, hosts=hosts, run_as_admin=True,
         created_by='System', options=const.TASK_OPTIONS, pattern='all',
     )
@@ -193,7 +196,7 @@ def test_system_user_connectability(system_user, force=False):
     :param force
     :return:
     """
-    from ops.utils import create_or_update_task
+    from ops.utils import create_or_update_ansible_task
     lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY.format(system_user.name)
     task_name = const.TEST_SYSTEM_USER_CONN_TASK_NAME.format(system_user.name)
     if cache.get(lock_key, 0) == 1 and not force:
@@ -202,7 +205,7 @@ def test_system_user_connectability(system_user, force=False):
     assets = system_user.get_clusters_assets()
     hosts = [asset.hostname for asset in assets]
     tasks = const.TEST_SYSTEM_USER_CONN_TASKS
-    task = create_or_update_task(
+    task = create_or_update_ansible_task(
         task_name, hosts=hosts, tasks=tasks, pattern='all',
         options=const.TASK_OPTIONS,
         run_as=system_user.name, created_by="System",
@@ -269,7 +272,7 @@ def get_push_system_user_tasks(system_user):
 @shared_task
 def push_system_user(system_user, assets, task_name=None):
-    from ops.utils import create_or_update_task
+    from ops.utils import create_or_update_ansible_task
     if system_user.auto_push and assets:
         if task_name is None:
@@ -278,7 +281,7 @@ def push_system_user(system_user, assets, task_name=None):
         hosts = [asset.hostname for asset in assets]
         tasks = get_push_system_user_tasks(system_user)
-        task = create_or_update_task(
+        task = create_or_update_ansible_task(
             task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
             options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
         )
@@ -334,7 +337,7 @@ def push_system_user_period():
 @shared_task
 def push_asset_system_users(asset, system_users=None, task_name=None):
-    from ops.utils import create_or_update_task
+    from ops.utils import create_or_update_ansible_task
     if task_name is None:
         task_name = "PUSH-ASSET-SYSTEM-USER-{}".format(asset.hostname)
@@ -348,7 +351,7 @@ def push_asset_system_users(asset, system_users=None, task_name=None):
     hosts = [asset.hostname]
-    task = create_or_update_task(
+    task = create_or_update_ansible_task(
         task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
         options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
     )
...
@@ -13,10 +13,5 @@ app = Celery('jumpserver')
 # Using a string here means the worker will not have to
 # pickle the object when using Windows.
-app.config_from_object('django.conf:settings')
+app.config_from_object('django.conf:settings', namespace='CELERY')
 app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
-app.conf.update(
-    CELERYBEAT_SCHEDULE={
-    }
-)
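With namespace='CELERY' the app only picks up Django settings that carry the CELERY_ prefix and maps them onto Celery 4's lowercase option names, which is why the Celery settings further down are renamed (for example BROKER_URL becomes CELERY_BROKER_URL). A minimal sketch of that mapping, not part of this commit:

from celery import Celery

app = Celery('demo')  # hypothetical app, for illustration only
app.config_from_object('django.conf:settings', namespace='CELERY')
# CELERY_BROKER_URL in settings.py is read as app.conf.broker_url,
# CELERY_WORKER_LOG_FORMAT as app.conf.worker_log_format, and so on.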
@@ -130,7 +130,6 @@ MESSAGE_STORAGE = 'django.contrib.messages.storage.cookie.CookieStorage'
 #     }
 # }
-print(CONFIG.DB_ENGINE)
 DATABASES = {
     'default': {
         'ENGINE': 'django.db.backends.{}'.format(CONFIG.DB_ENGINE),
@@ -243,7 +242,8 @@ LOGGING = {
 # https://docs.djangoproject.com/en/1.10/topics/i18n/
 LANGUAGE_CODE = 'en-us'
-TIME_ZONE = 'Asia/Shanghai'
+TIME_ZONE = 'UTC'
+# TIME_ZONE = 'Asia/Shanghai'
 USE_I18N = True
@@ -258,6 +258,8 @@ LOCALE_PATHS = [os.path.join(BASE_DIR, 'locale'), ]
 # https://docs.djangoproject.com/en/1.10/howto/static-files/
 STATIC_URL = '/static/'
+STATIC_ROOT = os.path.join(BASE_DIR, "static")
 STATICFILES_DIRS = (
     os.path.join(BASE_DIR, "static"),
@@ -323,18 +325,18 @@ if CONFIG.AUTH_LDAP:
     AUTH_LDAP_USER_ATTR_MAP = CONFIG.AUTH_LDAP_USER_ATTR_MAP
 
 # Celery using redis as broker
-BROKER_URL = 'redis://:%(password)s@%(host)s:%(port)s/3' % {
+CELERY_BROKER_URL = 'redis://:%(password)s@%(host)s:%(port)s/3' % {
     'password': CONFIG.REDIS_PASSWORD if CONFIG.REDIS_PASSWORD else '',
     'host': CONFIG.REDIS_HOST or '127.0.0.1',
     'port': CONFIG.REDIS_PORT or 6379,
 }
 CELERY_TASK_SERIALIZER = 'pickle'
 CELERY_RESULT_SERIALIZER = 'pickle'
-CELERY_RESULT_BACKEND = BROKER_URL
+CELERY_RESULT_BACKEND = CELERY_BROKER_URL
 CELERY_ACCEPT_CONTENT = ['json', 'pickle']
-CELERY_TASK_RESULT_EXPIRES = 3600
-CELERYD_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
-CELERYD_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
+CELERY_RESULT_EXPIRES = 3600
+CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
+CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
 CELERY_TIMEZONE = TIME_ZONE
 
 # TERMINAL_HEATBEAT_INTERVAL = CONFIG.TERMINAL_HEATBEAT_INTERVAL or 30
...
@@ -4,6 +4,7 @@ from __future__ import unicode_literals
 from django.conf.urls import url, include
 from django.conf import settings
 from django.conf.urls.static import static
+from django.views.static import serve as static_serve
 from rest_framework.schemas import get_schema_view
 from rest_framework_swagger.renderers import SwaggerUIRenderer, OpenAPIRenderer
@@ -33,8 +34,8 @@ urlpatterns = [
 if settings.DEBUG:
-    urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
     urlpatterns += [
         url(r'^docs/', schema_view, name="docs"),
-    ]
+    ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) \
+        + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
@@ -28,6 +28,7 @@ class AdHocResultCallback(CallbackModule):
         host = res._host.get_name()
         task_name = res.task_name
         task_result = res._result
+        print(task_result)
         if self.results_raw[t].get(host):
             self.results_raw[t][host][task_name] = task_result
@@ -50,6 +51,7 @@ class AdHocResultCallback(CallbackModule):
             contacted.remove(host)
 
     def v2_runner_on_failed(self, result, ignore_errors=False):
+        print("#######RUN FAILED" * 19)
         self.gather_result("failed", result)
         super().v2_runner_on_failed(result, ignore_errors=ignore_errors)
...
+# -*- coding: utf-8 -*-
+#
+from functools import wraps
+
+TASK_PREFIX = "TOOT"
+CALLBACK_PREFIX = "COC"
+
+
+def register_as_period_task(crontab=None, interval=None):
+    """
+    :param crontab: "* * * * *"
+    :param interval: 60*60*60
+    :return:
+    """
+    from .utils import create_or_update_celery_periodic_tasks
+    if crontab is None and interval is None:
+        raise SyntaxError("Must set crontab or interval one")
+
+    def decorate(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            tasks = {
+                func.__name__: {
+                    'task': func.__name__,
+                    'args': args,
+                    'kwargs': kwargs,
+                    'interval': interval,
+                    'crontab': crontab,
+                    'enabled': True,
+                }
+            }
+            create_or_update_celery_periodic_tasks(tasks)
+            return func(*args, **kwargs)
+        return wrapper
+    return decorate
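A usage sketch for the new decorator, mirroring how the asset tasks above apply it: the beat entry is written by create_or_update_celery_periodic_tasks the first time the wrapped task actually runs (not at import time), and it is keyed by the function name. The task below is hypothetical, for illustration only.

from celery import shared_task
from ops.decorators import register_as_period_task


@shared_task
@register_as_period_task(interval=60 * 60)   # register/refresh an hourly beat entry on first run
def sync_something():                        # hypothetical task name
    print("syncing ...")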
 # ~*~ coding: utf-8 ~*~
-import logging
 import json
 import uuid
@@ -8,16 +7,16 @@ import time
 from django.db import models
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
-from django.core import serializers
 from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask
 
-from common.utils import signer
+from common.utils import signer, get_logger
 from .ansible import AdHocRunner, AnsibleError
+from .inventory import JMSInventory
 
 __all__ = ["Task", "AdHoc", "AdHocRunHistory"]
 
-logger = logging.getLogger(__name__)
+logger = get_logger(__file__)
 
 
 class Task(models.Model):
@@ -27,15 +26,10 @@ class Task(models.Model):
     """
     id = models.UUIDField(default=uuid.uuid4, primary_key=True)
     name = models.CharField(max_length=128, unique=True, verbose_name=_('Name'))
-    interval = models.ForeignKey(
-        IntervalSchedule, on_delete=models.CASCADE,
-        null=True, blank=True, verbose_name=_('Interval'),
-    )
-    crontab = models.ForeignKey(
-        CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True,
-        verbose_name=_('Crontab'), help_text=_('Use one of Interval/Crontab'),
-    )
+    interval = models.IntegerField(verbose_name=_("Interval"), null=True, blank=True, help_text=_("Units: seconds"))
+    crontab = models.CharField(verbose_name=_("Crontab"), null=True, blank=True, max_length=128, help_text=_("5 * * * *"))
     is_periodic = models.BooleanField(default=False)
+    callback = models.CharField(max_length=128, blank=True, null=True, verbose_name=_("Callback"))  # Callback must be a registered celery task
     is_deleted = models.BooleanField(default=False)
     comment = models.TextField(blank=True, verbose_name=_("Comment"))
     created_by = models.CharField(max_length=128, blank=True, null=True, default='')
@@ -88,23 +82,48 @@ class Task(models.Model):
     def save(self, force_insert=False, force_update=False, using=None,
              update_fields=None):
-        instance = super().save(
+        from .utils import create_or_update_celery_periodic_tasks, \
+            disable_celery_periodic_task
+        from .tasks import run_ansible_task
+        super().save(
             force_insert=force_insert, force_update=force_update,
             using=using, update_fields=update_fields,
         )
-        if instance.is_periodic:
-            PeriodicTask.objects.update_or_create(
-                interval=instance.interval,
-                crontab=instance.crontab,
-                name=self.name,
-                task='ops.run_task',
-                args=serializers.serialize('json', [instance]),
-            )
+        if self.is_periodic:
+            interval = None
+            crontab = None
+
+            if self.interval:
+                interval = self.interval
+            elif self.crontab:
+                crontab = self.crontab
+
+            tasks = {
+                self.name: {
+                    "task": run_ansible_task.name,
+                    "interval": interval,
+                    "crontab": crontab,
+                    "args": (str(self.id),),
+                    "kwargs": {"callback": self.callback},
+                    "enabled": True,
+                }
+            }
+            create_or_update_celery_periodic_tasks(tasks)
         else:
-            PeriodicTask.objects.filter(name=self.name).delete()
-
-        return instance
+            disable_celery_periodic_task(self.name)
+
+    def delete(self, using=None, keep_parents=False):
+        from .utils import delete_celery_periodic_task
+        super().delete(using=using, keep_parents=keep_parents)
+        delete_celery_periodic_task(self.name)
+
+    @property
+    def schedule(self):
+        try:
+            return PeriodicTask.objects.get(name=self.name)
+        except PeriodicTask.DoesNotExist:
+            return None
 
     def __str__(self):
         return self.name
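A sketch of the new lifecycle the save()/delete() hooks above implement (values are hypothetical, shown only for illustration): saving a periodic Task writes a django_celery_beat PeriodicTask named after it and pointing at run_ansible_task, and deleting the Task removes that entry.

from ops.models import Task

t = Task(name="demo-task", interval=3600, is_periodic=True)   # hypothetical task
t.save()              # registers a PeriodicTask that runs run_ansible_task(str(t.id))
print(t.schedule)     # the django_celery_beat row created by save(), or None
t.is_periodic = False
t.save()              # disables the beat entry instead of deleting it
t.delete()            # removes both the Task and its PeriodicTask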
@@ -156,6 +175,23 @@ class AdHoc(models.Model):
     def hosts(self, item):
         self._hosts = json.dumps(item)
 
+    @property
+    def inventory(self):
+        if self.become:
+            become_info = {
+                'become': {
+                    self.become
+                }
+            }
+        else:
+            become_info = None
+
+        inventory = JMSInventory(
+            self.hosts, run_as_admin=self.run_as_admin,
+            run_as=self.run_as, become_info=become_info
+        )
+        return inventory
+
     @property
     def become(self):
         if self._become:
@@ -173,30 +209,30 @@
         history = AdHocRunHistory(adhoc=self, task=self.task)
         time_start = time.time()
         try:
-            result = self._run_only()
+            raw, summary = self._run_only()
             history.is_finished = True
-            if result.results_summary.get('dark'):
+            if summary.get('dark'):
                 history.is_success = False
             else:
                 history.is_success = True
-            history.result = result.results_raw
-            history.summary = result.results_summary
-            return result
+            history.result = raw
+            history.summary = summary
+            return raw, summary
+        except:
+            return {}, {}
         finally:
             history.date_finished = timezone.now()
             history.timedelta = time.time() - time_start
             history.save()
 
     def _run_only(self):
-        from .utils import get_adhoc_inventory
-        inventory = get_adhoc_inventory(self)
-        runner = AdHocRunner(inventory)
+        runner = AdHocRunner(self.inventory)
         for k, v in self.options.items():
             runner.set_option(k, v)
         try:
             result = runner.run(self.tasks, self.pattern, self.task.name)
-            return result
+            return result.results_raw, result.results_summary
         except AnsibleError as e:
             logger.error("Failed run adhoc {}, {}".format(self.task.name, e))
...
 # coding: utf-8
-from celery import shared_task
-from django.core import serializers
+from celery import shared_task, subtask
+
+from common.utils import get_logger, get_object_or_none
+from .models import Task
+
+logger = get_logger(__file__)
 
 
 def rerun_task():
@@ -8,6 +12,31 @@ def rerun_task():
 @shared_task
-def run_task(tasks_json):
-    for task in serializers.deserialize('json', tasks_json):
-        task.object.run()
+def run_ansible_task(task_id, callback=None, **kwargs):
+    """
+    :param task_id: is the tasks serialized data
+    :param callback: callback function name
+    :return:
+    """
+    task = get_object_or_none(Task, id=task_id)
+    if task:
+        result = task.object.run()
+        if callback is not None:
+            subtask(callback).delay(result)
+        return result
+    else:
+        logger.error("No task found")
+
+
+@shared_task
+def hello(name, callback=None):
+    print("Hello {}".format(name))
+    if callback is not None:
+        subtask(callback).delay("Guahongwei")
+
+
+@shared_task
+def hello_callback(result):
+    print(result)
+    print("Hello callback")
 # ~*~ coding: utf-8 ~*~
-import time
-
-from django.utils import timezone
-from django.db import transaction
-from django_celery_beat.models import PeriodicTask, IntervalSchedule
-from common.utils import get_logger, get_object_or_none, get_short_uuid_str
-from .ansible import AdHocRunner, CommandResultCallback
-from .inventory import JMSInventory
-from .ansible.exceptions import AnsibleError
-from .models import AdHocRunHistory, Task, AdHoc
+import json
+
+from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
+from common.utils import get_logger, get_object_or_none
+from .models import Task, AdHoc
 
 logger = get_logger(__file__)
-def record_adhoc(func):
-    def _deco(adhoc, **options):
-        record = AdHocRunHistory(adhoc=adhoc, task=adhoc.task)
-        time_start = time.time()
-        try:
-            result = func(adhoc, **options)
-            record.is_finished = True
-            if result.results_summary.get('dark'):
-                record.is_success = False
-            else:
-                record.is_success = True
-            record.result = result.results_raw
-            record.summary = result.results_summary
-            return result
-        finally:
-            record.date_finished = timezone.now()
-            record.timedelta = time.time() - time_start
-            record.save()
-    return _deco
-
-
-def get_adhoc_inventory(adhoc):
-    if adhoc.become:
-        become_info = {
-            'become': {
-                adhoc.become
-            }
-        }
-    else:
-        become_info = None
-
-    inventory = JMSInventory(
-        adhoc.hosts, run_as_admin=adhoc.run_as_admin,
-        run_as=adhoc.run_as, become_info=become_info
-    )
-    return inventory
-
-
-def get_inventory(hostname_list, run_as_admin=False, run_as=None, become_info=None):
-    return JMSInventory(
-        hostname_list, run_as_admin=run_as_admin,
-        run_as=run_as, become_info=become_info
-    )
-
-
-def get_adhoc_runner(hostname_list, run_as_admin=False, run_as=None, become_info=None):
-    inventory = get_inventory(
-        hostname_list, run_as_admin=run_as_admin,
-        run_as=run_as, become_info=become_info
-    )
-    runner = AdHocRunner(inventory)
-    return runner
-
-
-@record_adhoc
-def run_adhoc_object(adhoc, **options):
-    """
-    :param adhoc: Instance of AdHoc
-    :param options: ansible support option, like forks ...
-    :return:
-    """
-    name = adhoc.task.name
-    inventory = get_adhoc_inventory(adhoc)
-    runner = AdHocRunner(inventory)
-    for k, v in options.items():
-        runner.set_option(k, v)
-    try:
-        result = runner.run(adhoc.tasks, adhoc.pattern, name)
-        return result
-    except AnsibleError as e:
-        logger.error("Failed run adhoc {}, {}".format(name, e))
-        raise
-
-
-def run_adhoc(hostname_list, pattern, tasks, name=None,
-              run_as_admin=False, run_as=None, become_info=None):
-    if name is None:
-        name = "Adhoc-task-{}-{}".format(
-            get_short_uuid_str(),
-            timezone.now().strftime("%Y-%m-%d %H:%M:%S"),
-        )
-    inventory = get_inventory(
-        hostname_list, run_as_admin=run_as_admin,
-        run_as=run_as, become_info=become_info
-    )
-    runner = AdHocRunner(inventory)
-    return runner.run(tasks, pattern, play_name=name)
+def get_task_by_id(task_id):
+    return get_object_or_none(Task, id=task_id)
-def create_or_update_task(
+def create_or_update_ansible_task(
         task_name, hosts, tasks, pattern='all', options=None,
         run_as_admin=False, run_as="", become_info=None,
-        created_by=None
+        created_by=None, interval=None, crontab=None,
+        is_periodic=False, callback=None,
 ):
-    print(options)
-    print(task_name)
     task = get_object_or_none(Task, name=task_name)
 
     if task is None:
-        task = Task(name=task_name, created_by=created_by)
+        task = Task(
+            name=task_name, interval=interval,
+            crontab=crontab, is_periodic=is_periodic,
+            callback=callback, created_by=created_by
+        )
         task.save()
-    adhoc = task.get_latest_adhoc()
+    adhoc = task.latest_adhoc
     new_adhoc = AdHoc(task=task, pattern=pattern,
                       run_as_admin=run_as_admin,
                       run_as=run_as)
@@ -128,23 +41,67 @@ def create_or_update_task(
     if not adhoc or adhoc != new_adhoc:
         new_adhoc.save()
         task.latest_adhoc = new_adhoc
-    print("Return task")
     return task
-def create_periodic_tasks(tasks):
-    for name, detail in tasks.items():
-        schedule, _ = IntervalSchedule.objects.get_or_create(
-            every=detail['schedule'],
-            period=IntervalSchedule.SECONDS,
-        )
-        task = PeriodicTask.objects.create(
-            interval=schedule,
-            name=name,
-            task=detail['task'],
-            args=json.dumps(detail.get('args', [])),
-            kwargs=json.dumps(detail.get('kwargs', {})),
-        )
-        print("Create periodic task: {}".format(task))
+def create_or_update_celery_periodic_tasks(tasks):
+    """
+    :param tasks: {
+        'add-every-monday-morning': {
+            'task': 'tasks.add'  # A registered celery task,
+            'interval': 30,
+            'crontab': "30 7 * * *",
+            'args': (16, 16),
+            'kwargs': {},
+            'enabled': False,
+        },
+    }
+    :return:
+    """
+    # Todo: check task valid, task and callback must be a celery task
+    for name, detail in tasks.items():
+        interval = None
+        crontab = None
+        if isinstance(detail.get("interval"), int):
+            interval, _ = IntervalSchedule.objects.get_or_create(
+                every=detail['interval'],
+                period=IntervalSchedule.SECONDS,
+            )
+        elif isinstance(detail.get("crontab"), str):
+            try:
+                minute, hour, day, month, week = detail["crontab"].split()
+            except ValueError:
+                raise SyntaxError("crontab is not valid")
+            crontab, _ = CrontabSchedule.objects.get_or_create(
+                minute=minute, hour=hour, day_of_week=week,
+                day_of_month=day, month_of_year=month,
+            )
+        else:
+            raise SyntaxError("Schedule is not valid")
+
+        defaults = dict(
+            interval=interval,
+            crontab=crontab,
+            name=name,
+            task=detail['task'],
+            args=json.dumps(detail.get('args', [])),
+            kwargs=json.dumps(detail.get('kwargs', {})),
+            enabled=detail['enabled']
+        )
+
+        task = PeriodicTask.objects.update_or_create(
+            defaults=defaults, name=name,
+        )
+        logger.info("Create periodic task: {}".format(task))
+    return task
+
+
+def disable_celery_periodic_task(task_name):
+    PeriodicTask.objects.filter(name=task_name).update(enabled=False)
+
+
+def delete_celery_periodic_task(task_name):
+    PeriodicTask.objects.filter(name=task_name).delete()
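A sketch of calling the new helper directly; the entry name and task path below are illustrative, the referenced task must be a registered celery task, and the crontab string follows the minute-hour-day-month-weekday order parsed above.

from ops.utils import create_or_update_celery_periodic_tasks

create_or_update_celery_periodic_tasks({
    'hello-every-morning': {           # hypothetical beat entry name
        'task': 'ops.tasks.hello',     # assumed registered task path
        'crontab': '30 7 * * *',       # 07:30 every day
        'args': ('world',),
        'kwargs': {},
        'enabled': True,
    }
})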
@@ -3,7 +3,7 @@ ansible==2.4.2.0
 asn1crypto==0.24.0
 bcrypt==3.1.4
 billiard==3.5.0.3
-celery==4.0.2
+celery==4.1.0
 certifi==2017.11.5
 cffi==1.11.2
 chardet==3.0.4
...
@@ -17,6 +17,7 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 APPS_DIR = os.path.join(BASE_DIR, 'apps')
 HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1'
 HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080
+DEBUG = CONFIG.DEBUG
 LOG_LEVEL = CONFIG.LOG_LEVEL
 WORKERS = 4
@@ -25,13 +26,16 @@ def start_gunicorn():
     print("- Start Gunicorn WSGI HTTP Server")
     os.chdir(APPS_DIR)
     cmd = "gunicorn jumpserver.wsgi -b {}:{} -w {}".format(HTTP_HOST, HTTP_PORT, WORKERS)
+    if DEBUG:
+        cmd += " --reload"
     subprocess.call(cmd, shell=True)
 
 
 def start_celery():
     print("- Start Celery as Distributed Task Queue")
     os.chdir(APPS_DIR)
-    # os.environ.setdefault('PYTHONOPTIMIZE', '1')
+    # Todo: Must set this environment, if not no ansible result return
+    os.environ.setdefault('PYTHONOPTIMIZE', '1')
     cmd = 'celery -A common worker -l {}'.format(LOG_LEVEL.lower())
     subprocess.call(cmd, shell=True)
@@ -39,9 +43,9 @@ def start_celery():
 def start_beat():
     print("- Start Beat as Periodic Task Scheduler")
     os.chdir(APPS_DIR)
-    # os.environ.setdefault('PYTHONOPTIMIZE', '1')
-    schduler = "django_celery_beat.schedulers:DatabaseScheduler"
-    cmd = 'celery -A common beat -l {} --scheduler {}'.format(LOG_LEVEL, schduler)
+    os.environ.setdefault('PYTHONOPTIMIZE', '1')
+    scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
+    cmd = 'celery -A common beat -l {} --scheduler {} --max-interval 30 '.format(LOG_LEVEL, scheduler)
     subprocess.call(cmd, shell=True)
...