Commit 951ac252 authored by ibuler's avatar ibuler

[Merge] 更新ansible功能

parents e4c2affb 24c4e1df
......@@ -79,5 +79,5 @@ class AdminUserTestConnectiveApi(generics.RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
admin_user = self.get_object()
test_admin_user_connectability_manual.delay(admin_user)
return Response({"msg": "Task created"})
\ No newline at end of file
task = test_admin_user_connectability_manual.delay(admin_user)
return Response({"task": task.id})
......@@ -87,12 +87,8 @@ class AssetRefreshHardwareApi(generics.RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
asset_id = kwargs.get('pk')
asset = get_object_or_404(Asset, pk=asset_id)
summary = update_asset_hardware_info_manual(asset)[1]
logger.debug("Refresh summary: {}".format(summary))
if summary.get('dark'):
return Response(summary['dark'].values(), status=501)
else:
return Response({"msg": "ok"})
task = update_asset_hardware_info_manual.delay(asset)
return Response({"task": task.id})
class AssetAdminUserTestApi(generics.RetrieveAPIView):
......@@ -105,8 +101,5 @@ class AssetAdminUserTestApi(generics.RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
asset_id = kwargs.get('pk')
asset = get_object_or_404(Asset, pk=asset_id)
ok, msg = test_asset_connectability_manual(asset)
if ok:
return Response({"msg": "pong"})
else:
return Response({"error": msg}, status=502)
\ No newline at end of file
task = test_asset_connectability_manual.delay(asset)
return Response({"task": task.id})
......@@ -130,10 +130,9 @@ class RefreshNodeHardwareInfoApi(APIView):
node_id = kwargs.get('pk')
node = get_object_or_404(self.model, id=node_id)
assets = node.assets.all()
# task_name = _("Refresh node assets hardware info: {}".format(node.name))
task_name = _("更新节点资产硬件信息: {}".format(node.name))
update_assets_hardware_info_util.delay(assets, task_name=task_name)
return Response({"msg": "Task created"})
task = update_assets_hardware_info_util.delay(assets, task_name=task_name)
return Response({"task": task.id})
class TestNodeConnectiveApi(APIView):
......@@ -145,6 +144,6 @@ class TestNodeConnectiveApi(APIView):
node = get_object_or_404(self.model, id=node_id)
assets = node.assets.all()
task_name = _("测试节点下资产是否可连接: {}".format(node.name))
test_asset_connectability_util.delay(assets, task_name=task_name)
return Response({"msg": "Task created"})
task = test_asset_connectability_util.delay(assets, task_name=task_name)
return Response({"task": task.id})
......@@ -58,8 +58,8 @@ class SystemUserPushApi(generics.RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
system_user = self.get_object()
push_system_user_to_assets_manual.delay(system_user)
return Response({"msg": "Task created"})
task = push_system_user_to_assets_manual.delay(system_user)
return Response({"task": task.id})
class SystemUserTestConnectiveApi(generics.RetrieveAPIView):
......@@ -71,5 +71,5 @@ class SystemUserTestConnectiveApi(generics.RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
system_user = self.get_object()
test_system_user_connectability_manual.delay(system_user)
return Response({"msg": "Task created"})
\ No newline at end of file
task = test_system_user_connectability_manual.delay(system_user)
return Response({"task": task.id})
......@@ -96,7 +96,7 @@ class Asset(models.Model):
return False, warning
def is_unixlike(self):
if self.platform not in ("Windows", "Other"):
if self.platform not in ("Windows",):
return True
else:
return False
......@@ -132,6 +132,15 @@ class Asset(models.Model):
info["gateways"] = [d.id for d in self.domain.gateway_set.all()]
return info
def get_auth_info(self):
if self.admin_user:
return {
'username': self.admin_user.username,
'password': self.admin_user.password,
'private_key': self.admin_user.private_key_file,
'become': self.admin_user.become_info,
}
def _to_secret_json(self):
"""
Ansible use it create inventory, First using asset user,
......@@ -175,4 +184,3 @@ class Asset(models.Model):
except IntegrityError:
print('Error continue')
continue
......@@ -3,15 +3,15 @@ import json
import re
import os
import paramiko
from celery import shared_task
from django.core.cache import cache
from django.utils.translation import ugettext as _
from common.utils import get_object_or_none, capacity_convert, \
sum_capacity, encrypt_password, get_logger
from common.celery import register_as_period_task, after_app_shutdown_clean, \
after_app_ready_start, app as celery_app
from ops.celery.utils import register_as_period_task, after_app_shutdown_clean, \
after_app_ready_start
from ops.celery import app as celery_app
from .models import SystemUser, AdminUser, Asset
from . import const
......@@ -215,7 +215,7 @@ def test_admin_user_connectability_period():
def test_admin_user_connectability_manual(admin_user):
# task_name = _("Test admin user connectability: {}").format(admin_user.name)
task_name = _("测试管理行号可连接性: {}").format(admin_user.name)
return test_admin_user_connectability_util.delay(admin_user, task_name)
return test_admin_user_connectability_util(admin_user, task_name)
@shared_task
......@@ -395,11 +395,12 @@ def get_node_push_system_user_task_name(system_user, node):
)
@shared_task
def push_system_user_to_node(system_user, node):
logger.info("Start push system user node: {} => {}".format(system_user.name, node.value))
assets = node.get_all_assets()
task_name = get_node_push_system_user_task_name(system_user, node)
push_system_user_util.delay([system_user], assets, task_name)
push_system_user_util([system_user], assets, task_name)
@shared_task
......
......@@ -121,14 +121,16 @@ $(document).ready(function () {
})
.on('click', '.btn-test-connective', function () {
var the_url = "{% url 'api-assets:admin-user-connective' pk=admin_user.id %}";
var error = function (data) {
alert(data)
var success = function (data) {
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
};
APIUpdateAttr({
url: the_url,
error: error,
method: 'GET',
success_message: "{% trans 'Task has been send, seen left asset status' %}"
success: success,
flash_message: false
});
})
</script>
......
......@@ -269,16 +269,15 @@ function updateAssetNodes(nodes) {
function refreshAssetHardware() {
var the_url = "{% url 'api-assets:asset-refresh' pk=asset.id %}";
var success = function (data) {
location.reload();
};
var error = function (data) {
alert(data)
var success = function(data) {
console.log(data);
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
};
APIUpdateAttr({
url: the_url,
success: success,
error: error,
method: 'GET'
});
}
......@@ -344,19 +343,20 @@ $(document).ready(function () {
var redirect_url = "{% url 'assets:asset-list' %}";
objectDelete($this, name, the_url, redirect_url);
}).on('click', '#btn_refresh_asset', function () {
alert('关闭alert, 等待完成, 自动刷新页面');
refreshAssetHardware()
}).on('click', '#btn-test-is-alive', function () {
var the_url = "{% url 'api-assets:asset-alive-test' pk=asset.id %}";
var error = function (data) {
alert(data)
var success = function(data) {
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
};
alert('关闭alert, 等待完成');
APIUpdateAttr({
url: the_url,
error: error,
method: 'GET',
success_message: "{% trans "Reachable" %}"
success: success
});
})
......
......@@ -497,14 +497,17 @@ $(document).ready(function(){
}
var the_url = url.replace("{{ DEFAULT_PK }}", current_node.id);
function success() {
function success(data) {
rMenu.css({"visibility" : "hidden"});
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
}
APIUpdateAttr({
url: the_url,
method: "GET",
success_message: "更新硬件信息任务下发成功",
success: success
success: success,
flash_message: false
});
})
......@@ -519,14 +522,17 @@ $(document).ready(function(){
}
var the_url = url.replace("{{ DEFAULT_PK }}", current_node.id);
function success() {
function success(data) {
rMenu.css({"visibility" : "hidden"});
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
}
APIUpdateAttr({
url: the_url,
method: "GET",
success_message: "测试可连接性任务下发成功",
success: success
success: success,
flash_message: false
});
})
.on('click', '.btn_asset_delete', function () {
......
......@@ -293,26 +293,30 @@ $(document).ready(function () {
})
.on('click', '.btn-push', function () {
var the_url = "{% url 'api-assets:system-user-push' pk=system_user.id %}";
var error = function (data) {
alert(data)
var success = function (data) {
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
};
APIUpdateAttr({
url: the_url,
error: error,
method: 'GET',
success_message: "{% trans "Task has been send, Go to ops task list seen result" %}"
success: success,
flash_message: false
});
})
.on('click', '.btn-test-connective', function () {
var the_url = "{% url 'api-assets:system-user-connective' pk=system_user.id %}";
var error = function (data) {
alert(data)
var success = function (data) {
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
};
APIUpdateAttr({
url: the_url,
error: error,
method: 'GET',
success_message: "{% trans "Task has been send, seen left assets status" %}"
success: success,
flash_message: false
});
})
</script>
......
......@@ -2,4 +2,4 @@ from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
......@@ -2,14 +2,13 @@
#
import json
from rest_framework.views import APIView
from rest_framework.views import Response
from rest_framework.views import Response, APIView
from ldap3 import Server, Connection
from django.core.mail import get_connection, send_mail
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from .permissions import IsSuperUser, IsAppUser
from .permissions import IsSuperUser
from .serializers import MailTestSerializer, LDAPTestSerializer
......@@ -105,3 +104,6 @@ class DjangoSettingsAPI(APIView):
if i.isupper():
configs[i] = str(getattr(settings, i))
return Response(configs)
# -*- coding: utf-8 -*-
#
from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_lazy as _
create_success_msg = _("<b>%(name)s</b> was created successfully")
update_success_msg = _("<b>%(name)s</b> was updated successfully")
\ No newline at end of file
update_success_msg = _("<b>%(name)s</b> was updated successfully")
FILE_END_GUARD = ">>> Content End <<<"
celery_task_pre_key = "CELERY_"
......@@ -79,3 +79,4 @@ class Setting(models.Model):
class Meta:
db_table = "settings"
from django.core.mail import send_mail
from django.conf import settings
from .celery import app
from celery import shared_task
from .utils import get_logger
logger = get_logger(__file__)
@app.task
@shared_task
def send_mail_async(*args, **kwargs):
""" Using celery to send email async
......
# -*- coding: utf-8 -*-
#
import re
import sys
from collections import OrderedDict
from six import string_types
import base64
......@@ -360,3 +361,20 @@ def get_signer():
signer = Signer(settings.SECRET_KEY)
return signer
class TeeObj:
origin_stdout = sys.stdout
def __init__(self, file_obj):
self.file_obj = file_obj
def write(self, msg):
self.origin_stdout.write(msg)
self.file_obj.write(msg.replace('*', ''))
def flush(self):
self.origin_stdout.flush()
self.file_obj.flush()
def close(self):
self.file_obj.close()
from django.views.generic import TemplateView
from django.shortcuts import render, redirect
from django.core.cache import cache
from django.views.generic import TemplateView, View, DetailView
from django.shortcuts import render, redirect, Http404, reverse
from django.contrib import messages
from django.utils.translation import ugettext as _
from django.conf import settings
from .forms import EmailSettingForm, LDAPSettingForm, BasicSettingForm, \
TerminalSettingForm
from .models import Setting
from .mixins import AdminUserRequiredMixin
from .signals import ldap_auth_enable
......@@ -120,3 +121,4 @@ class TerminalSettingView(AdminUserRequiredMixin, TemplateView):
context.update({"form": form})
return render(request, self.template_name, context)
This diff is collapsed.
from .celery import app as celery_app
# ~*~ coding: utf-8 ~*~
import sys
from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule
from .display import TeeObj
class AdHocResultCallback(CallbackModule):
"""
Task result Callback
"""
def __init__(self, display=None, options=None):
def __init__(self, display=None, options=None, file_obj=None):
# result_raw example: {
# "ok": {"hostname": {"task_name": {},...},..},
# "failed": {"hostname": {"task_name": {}..}, ..},
......@@ -22,6 +26,8 @@ class AdHocResultCallback(CallbackModule):
self.results_raw = dict(ok={}, failed={}, unreachable={}, skipped={})
self.results_summary = dict(contacted=[], dark={})
super().__init__()
if file_obj is not None:
sys.stdout = TeeObj(file_obj)
def gather_result(self, t, res):
self._clean_results(res._result, res._task.action)
......
# -*- coding: utf-8 -*-
#
import sys
class TeeObj:
origin_stdout = sys.stdout
def __init__(self, file_obj):
self.file_obj = file_obj
def write(self, msg):
self.origin_stdout.write(msg)
self.file_obj.write(msg.replace('*', ''))
def flush(self):
self.origin_stdout.flush()
self.file_obj.flush()
......@@ -29,7 +29,6 @@ class BaseHost(Host):
}
"groups": [],
"vars": {},
"other_ansbile_vars":
}
"""
self.host_data = host_data
......@@ -79,7 +78,7 @@ class BaseInventory(InventoryManager):
variable_manager_class = VariableManager
host_manager_class = BaseHost
def __init__(self, host_list=None):
def __init__(self, host_list=None, group_list=None):
"""
用于生成动态构建Ansible Inventory. super().__init__ 会自动调用
host_list: [{
......@@ -98,11 +97,14 @@ class BaseInventory(InventoryManager):
"vars": {},
},
]
group_list: [
{"name: "", children: [""]},
]
:param host_list:
:param group_list
"""
if host_list is None:
host_list = []
self.host_list = host_list
self.host_list = host_list or []
self.group_list = group_list or []
assert isinstance(host_list, list)
self.loader = self.loader_class()
self.variable_manager = self.variable_manager_class()
......@@ -114,25 +116,40 @@ class BaseInventory(InventoryManager):
def get_group(self, name):
return self._inventory.groups.get(name, None)
def parse_sources(self, cache=False):
group_all = self.get_group('all')
ungrouped = self.get_group('ungrouped')
def get_or_create_group(self, name):
group = self.get_group(name)
if not group:
self.add_group(name)
return self.get_or_create_group(name)
else:
return group
def parse_groups(self):
for g in self.group_list:
parent = self.get_or_create_group(g.get("name"))
children = [self.get_or_create_group(n) for n in g.get('children', [])]
for child in children:
parent.add_child_group(child)
def parse_hosts(self):
group_all = self.get_or_create_group('all')
ungrouped = self.get_or_create_group('ungrouped')
for host_data in self.host_list:
host = self.host_manager_class(host_data=host_data)
self.hosts[host_data['hostname']] = host
groups_data = host_data.get('groups')
if groups_data:
for group_name in groups_data:
group = self.get_group(group_name)
if group is None:
self.add_group(group_name)
group = self.get_group(group_name)
group = self.get_or_create_group(group_name)
group.add_host(host)
else:
ungrouped.add_host(host)
group_all.add_host(host)
def parse_sources(self, cache=False):
self.parse_groups()
self.parse_hosts()
def get_matched_hosts(self, pattern):
return self.get_hosts(pattern)
......
......@@ -9,6 +9,7 @@ from ansible.parsing.dataloader import DataLoader
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.playbook.play import Play
import ansible.constants as C
from ansible.utils.display import Display
from .callback import AdHocResultCallback, PlaybookResultCallBack, \
CommandResultCallback
......@@ -21,6 +22,13 @@ C.HOST_KEY_CHECKING = False
logger = get_logger(__name__)
class CustomDisplay(Display):
def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False):
pass
display = CustomDisplay()
Options = namedtuple('Options', [
'listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
'module_path', 'forks', 'remote_user', 'private_key_file', 'timeout',
......@@ -123,20 +131,22 @@ class AdHocRunner:
ADHoc Runner接口
"""
results_callback_class = AdHocResultCallback
results_callback = None
loader_class = DataLoader
variable_manager_class = VariableManager
options = get_default_options()
default_options = get_default_options()
def __init__(self, inventory, options=None):
if options:
self.options = options
self.options = self.update_options(options)
self.inventory = inventory
self.loader = DataLoader()
self.variable_manager = VariableManager(
loader=self.loader, inventory=self.inventory
)
def get_result_callback(self, file_obj=None):
return self.__class__.results_callback_class(file_obj=file_obj)
@staticmethod
def check_module_args(module_name, module_args=''):
if module_name in C.MODULE_REQUIRE_ARGS and not module_args:
......@@ -160,19 +170,24 @@ class AdHocRunner:
cleaned_tasks.append(task)
return cleaned_tasks
def set_option(self, k, v):
kwargs = {k: v}
self.options = self.options._replace(**kwargs)
def update_options(self, options):
if options and isinstance(options, dict):
options = self.__class__.default_options._replace(**options)
else:
options = self.__class__.default_options
return options
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no'):
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no', file_obj=None):
"""
:param tasks: [{'action': {'module': 'shell', 'args': 'ls'}, ...}, ]
:param pattern: all, *, or others
:param play_name: The play name
:param gather_facts:
:param file_obj: logging to file_obj
:return:
"""
self.check_pattern(pattern)
results_callback = self.results_callback_class()
self.results_callback = self.get_result_callback(file_obj)
cleaned_tasks = self.clean_tasks(tasks)
play_source = dict(
......@@ -193,16 +208,16 @@ class AdHocRunner:
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
stdout_callback=results_callback,
stdout_callback=self.results_callback,
passwords=self.options.passwords,
)
logger.debug("Get inventory matched hosts: {}".format(
print("Get matched hosts: {}".format(
self.inventory.get_matched_hosts(pattern)
))
try:
tqm.run(play)
return results_callback
return self.results_callback
except Exception as e:
raise AnsibleError(e)
finally:
......
# ~*~ coding: utf-8 ~*~
import uuid
import os
from django.core.cache import cache
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext as _
from rest_framework import viewsets, generics
from rest_framework.views import Response
from .hands import IsSuperUser
from .models import Task, AdHoc, AdHocRunHistory
from .serializers import TaskSerializer, AdHocSerializer, AdHocRunHistorySerializer
from .models import Task, AdHoc, AdHocRunHistory, CeleryTask
from .serializers import TaskSerializer, AdHocSerializer, \
AdHocRunHistorySerializer
from .tasks import run_ansible_task
......@@ -24,8 +28,8 @@ class TaskRun(generics.RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
task = self.get_object()
run_ansible_task.delay(str(task.id))
return Response({"msg": "start"})
t = run_ansible_task.delay(str(task.id))
return Response({"task": t.id})
class AdHocViewSet(viewsets.ModelViewSet):
......@@ -58,3 +62,30 @@ class AdHocRunHistorySet(viewsets.ModelViewSet):
adhoc = get_object_or_404(AdHoc, id=adhoc_id)
self.queryset = self.queryset.filter(adhoc=adhoc)
return self.queryset
class CeleryTaskLogApi(generics.RetrieveAPIView):
permission_classes = (IsSuperUser,)
buff_size = 1024 * 10
end = False
queryset = CeleryTask.objects.all()
def get(self, request, *args, **kwargs):
mark = request.query_params.get("mark") or str(uuid.uuid4())
task = super().get_object()
log_path = task.full_log_path
if not log_path or not os.path.isfile(log_path):
return Response({"data": _("Waiting ...")}, status=203)
with open(log_path, 'r') as f:
offset = cache.get(mark, 0)
f.seek(offset)
data = f.read(self.buff_size).replace('\n', '\r\n')
mark = str(uuid.uuid4())
cache.set(mark, f.tell(), 5)
if data == '' and task.is_finished():
self.end = True
return Response({"data": data, 'end': self.end, 'mark': mark})
......@@ -5,3 +5,7 @@ from django.apps import AppConfig
class OpsConfig(AppConfig):
name = 'ops'
def ready(self):
super().ready()
from .celery import signal_handler
# -*- coding: utf-8 -*-
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')
from django.conf import settings
app = Celery('jumpserver')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
# -*- coding: utf-8 -*-
#
# -*- coding: utf-8 -*-
#
import os
import datetime
import sys
from django.conf import settings
from django.utils import timezone
from django.core.cache import cache
from django.db import transaction
from celery import subtask
from celery.signals import worker_ready, worker_shutdown, task_prerun, \
task_postrun, after_task_publish
from django_celery_beat.models import PeriodicTask
from common.utils import get_logger, TeeObj, get_object_or_none
from common.const import celery_task_pre_key
from .utils import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
from ..models import CeleryTask
logger = get_logger(__file__)
@worker_ready.connect
def on_app_ready(sender=None, headers=None, body=None, **kwargs):
if cache.get("CELERY_APP_READY", 0) == 1:
return
cache.set("CELERY_APP_READY", 1, 10)
logger.debug("App ready signal recv")
tasks = get_after_app_ready_tasks()
logger.debug("Start need start task: [{}]".format(
", ".join(tasks))
)
for task in tasks:
subtask(task).delay()
@worker_shutdown.connect
def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
return
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
tasks = get_after_app_shutdown_clean_tasks()
logger.debug("App shutdown signal recv")
logger.debug("Clean need cleaned period tasks: [{}]".format(
', '.join(tasks))
)
PeriodicTask.objects.filter(name__in=tasks).delete()
@after_task_publish.connect
def after_task_publish_signal_handler(sender, headers=None, **kwargs):
CeleryTask.objects.create(
id=headers["id"], status=CeleryTask.WAITING, name=headers["task"]
)
@task_prerun.connect
def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
t = get_object_or_none(CeleryTask, id=task_id)
if t is None:
logger.warn("Not get the task: {}".format(task_id))
return
now = datetime.datetime.now().strftime("%Y-%m-%d")
log_path = os.path.join(now, task_id + '.log')
full_path = os.path.join(CeleryTask.LOG_DIR, log_path)
if not os.path.exists(os.path.dirname(full_path)):
os.makedirs(os.path.dirname(full_path))
with transaction.atomic():
t.date_start = timezone.now()
t.status = CeleryTask.RUNNING
t.log_path = log_path
t.save()
f = open(full_path, 'w')
tee = TeeObj(f)
sys.stdout = tee
task.log_f = tee
@task_postrun.connect
def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs):
t = get_object_or_none(CeleryTask, id=task_id)
if t is None:
logger.warn("Not get the task: {}".format(task_id))
return
with transaction.atomic():
t.status = CeleryTask.FINISHED
t.date_finished = timezone.now()
t.save()
task.log_f.flush()
sys.stdout = task.log_f.origin_stdout
task.log_f.close()
# ~*~ coding: utf-8 ~*~
import os
# -*- coding: utf-8 -*-
#
import json
from functools import wraps
from celery import Celery, subtask
from celery.signals import worker_ready, worker_shutdown
from django.db.utils import ProgrammingError, OperationalError
from django.core.cache import cache
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
from .utils import get_logger
logger = get_logger(__file__)
def add_register_period_task(name):
key = "__REGISTER_PERIODIC_TASKS"
value = cache.get(key, [])
value.append(name)
cache.set(key, value)
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')
from django.conf import settings
from django.core.cache import cache
def get_register_period_tasks():
key = "__REGISTER_PERIODIC_TASKS"
return cache.get(key, [])
def add_after_app_shutdown_clean_task(name):
key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
value = cache.get(key, [])
value.append(name)
cache.set(key, value)
def get_after_app_shutdown_clean_tasks():
key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS"
return cache.get(key, [])
app = Celery('jumpserver')
def add_after_app_ready_task(name):
key = "__AFTER_APP_READY_RUN_TASKS"
value = cache.get(key, [])
value.append(name)
cache.set(key, value)
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
def get_after_app_ready_tasks():
key = "__AFTER_APP_READY_RUN_TASKS"
return cache.get(key, [])
def create_or_update_celery_periodic_tasks(tasks):
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
"""
:param tasks: {
'add-every-monday-morning': {
......@@ -106,11 +123,6 @@ def delete_celery_periodic_task(task_name):
PeriodicTask.objects.filter(name=task_name).delete()
__REGISTER_PERIODIC_TASKS = []
__AFTER_APP_SHUTDOWN_CLEAN_TASKS = []
__AFTER_APP_READY_RUN_TASKS = []
def register_as_period_task(crontab=None, interval=None):
"""
Warning: Task must be have not any args and kwargs
......@@ -128,7 +140,7 @@ def register_as_period_task(crontab=None, interval=None):
# Because when this decorator run, the task was not created,
# So we can't use func.name
name = '{func.__module__}.{func.__name__}'.format(func=func)
if name not in __REGISTER_PERIODIC_TASKS:
if name not in get_register_period_tasks():
create_or_update_celery_periodic_tasks({
name: {
'task': name,
......@@ -138,7 +150,7 @@ def register_as_period_task(crontab=None, interval=None):
'enabled': True,
}
})
__REGISTER_PERIODIC_TASKS.append(name)
add_register_period_task(name)
@wraps(func)
def wrapper(*args, **kwargs):
......@@ -151,13 +163,12 @@ def after_app_ready_start(func):
# Because when this decorator run, the task was not created,
# So we can't use func.name
name = '{func.__module__}.{func.__name__}'.format(func=func)
if name not in __AFTER_APP_READY_RUN_TASKS:
__AFTER_APP_READY_RUN_TASKS.append(name)
if name not in get_after_app_ready_tasks():
add_after_app_ready_task(name)
@wraps(func)
def decorate(*args, **kwargs):
return func(*args, **kwargs)
return decorate
......@@ -165,37 +176,10 @@ def after_app_shutdown_clean(func):
# Because when this decorator run, the task was not created,
# So we can't use func.name
name = '{func.__module__}.{func.__name__}'.format(func=func)
if name not in __AFTER_APP_READY_RUN_TASKS:
__AFTER_APP_SHUTDOWN_CLEAN_TASKS.append(name)
if name not in get_after_app_shutdown_clean_tasks():
add_after_app_shutdown_clean_task(name)
@wraps(func)
def decorate(*args, **kwargs):
return func(*args, **kwargs)
return decorate
@worker_ready.connect
def on_app_ready(sender=None, headers=None, body=None, **kwargs):
if cache.get("CELERY_APP_READY", 0) == 1:
return
cache.set("CELERY_APP_READY", 1, 10)
logger.debug("App ready signal recv")
logger.debug("Start need start task: [{}]".format(
", ".join(__AFTER_APP_READY_RUN_TASKS))
)
for task in __AFTER_APP_READY_RUN_TASKS:
subtask(task).delay()
@worker_shutdown.connect
def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
return
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
from django_celery_beat.models import PeriodicTask
logger.debug("App shutdown signal recv")
logger.debug("Clean need cleaned period tasks: [{}]".format(
', '.join(__AFTER_APP_SHUTDOWN_CLEAN_TASKS))
)
PeriodicTask.objects.filter(name__in=__AFTER_APP_SHUTDOWN_CLEAN_TASKS).delete()
......@@ -9,29 +9,18 @@ __all__ = [
]
def make_proxy_command(asset):
gateway = asset.domain.random_gateway()
proxy_command = [
"ssh", "-p", str(gateway.port),
"{}@{}".format(gateway.username, gateway.ip),
"-W", "%h:%p", "-q",
]
if gateway.password:
proxy_command.insert(0, "sshpass -p {}".format(gateway.password))
if gateway.private_key:
proxy_command.append("-i {}".format(gateway.private_key_file))
return {"ansible_ssh_common_args": "'-o ProxyCommand={}'".format(" ".join(proxy_command))}
class JMSInventory(BaseInventory):
"""
JMS Inventory is the manager with jumpserver assets, so you can
write you own manager, construct you inventory
"""
def __init__(self, hostname_list, run_as_admin=False, run_as=None, become_info=None):
"""
:param hostname_list: ["test1", ]
:param run_as_admin: True 是否使用管理用户去执行, 每台服务器的管理用户可能不同
:param run_as: 是否统一使用某个系统用户去执行
:param become_info: 是否become成某个用户去执行
"""
self.hostname_list = hostname_list
self.using_admin = run_as_admin
self.run_as = run_as
......@@ -41,23 +30,14 @@ class JMSInventory(BaseInventory):
host_list = []
for asset in assets:
vars = {}
if run_as_admin:
info = asset._to_secret_json()
else:
info = asset.to_json()
info["vars"] = vars
if asset.domain and asset.domain.has_gateway():
vars.update(make_proxy_command(asset))
info.update(vars)
info = self.convert_to_ansible(asset, run_as_admin=run_as_admin)
host_list.append(info)
if run_as:
run_user_info = self.get_run_user_info()
for host in host_list:
host.update(run_user_info)
if become_info:
for host in host_list:
host.update(become_info)
......@@ -67,9 +47,57 @@ class JMSInventory(BaseInventory):
assets = get_assets_by_hostname_list(self.hostname_list)
return assets
def convert_to_ansible(self, asset, run_as_admin=False):
info = {
'id': asset.id,
'hostname': asset.hostname,
'ip': asset.ip,
'port': asset.port,
'vars': dict(),
'groups': [],
}
if asset.domain and asset.domain.has_gateway():
info["vars"].update(self.make_proxy_command(asset))
if run_as_admin:
info.update(asset.get_auth_info())
for node in asset.nodes.all():
info["groups"].append(node.value)
for label in asset.labels.all():
info["vars"].update({
label.name: label.value
})
info["groups"].append("{}:{}".format(label.name, label.value))
if asset.domain:
info["vars"].update({
"domain": asset.domain.name,
})
info["groups"].append("domain_"+asset.domain.name)
return info
def get_run_user_info(self):
system_user = get_system_user_by_name(self.run_as)
if not system_user:
return {}
else:
return system_user._to_secret_json()
@staticmethod
def make_proxy_command(asset):
gateway = asset.domain.random_gateway()
proxy_command_list = [
"ssh", "-p", str(gateway.port),
"{}@{}".format(gateway.username, gateway.ip),
"-W", "%h:%p", "-q",
]
if gateway.password:
proxy_command_list.insert(
0, "sshpass -p {}".format(gateway.password)
)
if gateway.private_key:
proxy_command_list.append("-i {}".format(gateway.private_key_file))
proxy_command = "'-o ProxyCommand={}'".format(
" ".join(proxy_command_list)
)
return {"ansible_ssh_common_args": proxy_command}
# -*- coding: utf-8 -*-
#
from .adhoc import *
from .celery import *
\ No newline at end of file
......@@ -2,18 +2,23 @@
import json
import uuid
import os
import time
import datetime
from celery import current_task
from django.db import models
from django.conf import settings
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask
from django_celery_beat.models import PeriodicTask
from common.utils import get_signer, get_logger
from common.celery import delete_celery_periodic_task, create_or_update_celery_periodic_tasks, \
disable_celery_periodic_task
from .ansible import AdHocRunner, AnsibleError
from .inventory import JMSInventory
from ..celery.utils import delete_celery_periodic_task, \
create_or_update_celery_periodic_tasks, \
disable_celery_periodic_task
from ..ansible import AdHocRunner, AnsibleError
from ..inventory import JMSInventory
__all__ = ["Task", "AdHoc", "AdHocRunHistory"]
......@@ -85,7 +90,7 @@ class Task(models.Model):
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
from .tasks import run_ansible_task
from ..tasks import run_ansible_task
super().save(
force_insert=force_insert, force_update=force_update,
using=using, update_fields=update_fields,
......@@ -206,10 +211,18 @@ class AdHoc(models.Model):
return self._run_only()
def _run_and_record(self):
history = AdHocRunHistory(adhoc=self, task=self.task)
try:
hid = current_task.request.id
except AttributeError:
hid = str(uuid.uuid4())
history = AdHocRunHistory(id=hid, adhoc=self, task=self.task)
time_start = time.time()
try:
date_start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("{} Start task: {}\r\n".format(date_start, self.task.name))
raw, summary = self._run_only()
date_end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("\r\n{} Task finished".format(date_end))
history.is_finished = True
if summary.get('dark'):
history.is_success = False
......@@ -221,17 +234,20 @@ class AdHoc(models.Model):
except Exception as e:
return {}, {"dark": {"all": str(e)}, "contacted": []}
finally:
# f.close()
history.date_finished = timezone.now()
history.timedelta = time.time() - time_start
history.save()
def _run_only(self):
runner = AdHocRunner(self.inventory)
for k, v in self.options.items():
runner.set_option(k, v)
def _run_only(self, file_obj=None):
runner = AdHocRunner(self.inventory, options=self.options)
try:
result = runner.run(self.tasks, self.pattern, self.task.name)
result = runner.run(
self.tasks,
self.pattern,
self.task.name,
file_obj=file_obj,
)
return result.results_raw, result.results_summary
except AnsibleError as e:
logger.warn("Failed run adhoc {}, {}".format(self.task.name, e))
......@@ -316,6 +332,14 @@ class AdHocRunHistory(models.Model):
def short_id(self):
return str(self.id).split('-')[-1]
@property
def log_path(self):
dt = datetime.datetime.now().strftime('%Y-%m-%d')
log_dir = os.path.join(settings.PROJECT_DIR, 'data', 'ansible', dt)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
return os.path.join(log_dir, str(self.id) + '.log')
@property
def result(self):
if self._result:
......
# -*- coding: utf-8 -*-
#
import uuid
import os
from django.conf import settings
from django.db import models
class CeleryTask(models.Model):
WAITING = "waiting"
RUNNING = "running"
FINISHED = "finished"
LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery')
STATUS_CHOICES = (
(WAITING, WAITING),
(RUNNING, RUNNING),
(FINISHED, FINISHED),
)
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
name = models.CharField(max_length=1024)
status = models.CharField(max_length=128, choices=STATUS_CHOICES)
log_path = models.CharField(max_length=256, blank=True, null=True)
date_published = models.DateTimeField(auto_now_add=True)
date_start = models.DateTimeField(null=True)
date_finished = models.DateTimeField(null=True)
def __str__(self):
return "{}: {}".format(self.name, self.id)
def is_finished(self):
return self.status == self.FINISHED
@property
def full_log_path(self):
return os.path.join(self.LOG_DIR, self.log_path)
......@@ -12,14 +12,13 @@ def rerun_task():
@shared_task
def run_ansible_task(task_id, callback=None, **kwargs):
def run_ansible_task(tid, callback=None, **kwargs):
"""
:param task_id: is the tasks serialized data
:param tid: is the tasks serialized data
:param callback: callback function name
:return:
"""
task = get_object_or_none(Task, id=task_id)
task = get_object_or_none(Task, id=tid)
if task:
result = task.run()
if callback is not None:
......
......@@ -82,7 +82,8 @@ function initTable() {
select: [],
columnDefs: [
{targets: 1, createdCell: function (td, cellData, rowData) {
$(td).html(cellData);
var d = new Date(cellData);
$(td).html(d);
}},
{targets: 2, createdCell: function (td, cellData) {
var total = "<span>" + cellData.total + "</span>";
......
......@@ -18,6 +18,9 @@
<li class="active">
<a href="{% url 'ops:adhoc-history-detail' pk=object.pk %}" class="text-center"><i class="fa fa-laptop"></i> {% trans 'Run history detail' %} </a>
</li>
<li>
<a class="text-center celery-task-log" onclick="window.open('{% url 'ops:celery-task-log' pk=object.pk %}','', 'width=800,height=600')"><i class="fa fa-laptop"></i> {% trans 'Output' %} </a>
</li>
</ul>
</div>
<div class="tab-content">
......
{% load static %}
<head>
<title>term.js</title>
<script src="{% static 'js/jquery-2.1.1.js' %}"></script>
<style>
html {
background: #000;
}
h1 {
margin-bottom: 20px;
font: 20px/1.5 sans-serif;
}
.terminal {
float: left;
font-family: 'Monaco', 'Consolas', "DejaVu Sans Mono", "Liberation Mono", monospace;
font-size: 12px;
color: #f0f0f0;
background-color: #555;
padding: 20px 20px 20px;
}
.terminal-cursor {
color: #000;
background: #f0f0f0;
}
</style>
</head>
<div class="container">
<div id="term">
</div>
</div>
<script src="{% static 'js/term.js' %}"></script>
<script>
var rowHeight = 1;
var colWidth = 1;
var mark = '';
var url = "{% url 'api-ops:celery-task-log' pk=object.id %}";
var term;
var end = false;
var error = false;
var interval = 200;
function calWinSize() {
var t = $('.terminal');
rowHeight = 1.00 * t.height() / 24;
colWidth = 1.00 * t.width() / 80;
}
function resize() {
var rows = Math.floor(window.innerHeight / rowHeight) - 2;
var cols = Math.floor(window.innerWidth / colWidth) - 10;
term.resize(cols, rows);
}
function requestAndWrite() {
if (!end) {
$.ajax({
url: url + '?mark=' + mark,
method: "GET",
contentType: "application/json; charset=utf-8"
}).done(function(data, textStatue, jqXHR) {
if (jqXHR.status === 203) {
error = true;
term.write('.');
interval = 500;
}
if (jqXHR.status === 200){
term.write(data.data);
mark = data.mark;
if (data.end){
end = true
}
}
})
}
}
$(document).ready(function () {
term = new Terminal({
cols: 80,
rows: 24,
useStyle: true,
screenKeys: false,
convertEol: false,
cursorBlink: false
});
term.open();
term.on('data', function (data) {
term.write(data.replace('\r', '\r\n'))
});
calWinSize();
resize();
$('.terminal').detach().appendTo('#term');
setInterval(function () {
requestAndWrite()
}, interval)
});
</script>
......@@ -24,6 +24,9 @@
<li>
<a href="{% url 'ops:task-history' pk=object.pk %}" class="text-center"><i class="fa fa-laptop"></i> {% trans 'Run history' %} </a>
</li>
<li>
<a class="text-center celery-task-log" onclick="window.open('{% url 'ops:celery-task-log' pk=object.latest_history.pk %}','', 'width=800,height=600')"><i class="fa fa-laptop"></i> {% trans 'Last run output' %} </a>
</li>
</ul>
</div>
<div class="tab-content">
......@@ -105,6 +108,10 @@
$(td).html(cellData.user)
}
}},
{targets: 6, createdCell: function (td, cellData) {
var d = new Date(cellData);
$(td).html(d.toLocaleString())
}},
{targets: 7, createdCell: function (td, cellData, rowData) {
var detail_btn = '<a class="btn btn-xs btn-primary m-l-xs btn-run" href="{% url 'ops:adhoc-detail' pk=DEFAULT_PK %}">{% trans "Detail" %}</a>'.replace('{{ DEFAULT_PK }}', cellData);
if (cellData) {
......
......@@ -24,6 +24,9 @@
<li>
<a href="{% url 'ops:task-history' pk=object.pk %}" class="text-center"><i class="fa fa-laptop"></i> {% trans 'Run history' %} </a>
</li>
<li>
<a class="text-center celery-task-log" onclick="window.open('{% url 'ops:celery-task-log' pk=object.latest_history.pk %}','', 'width=800,height=600')"><i class="fa fa-laptop"></i> {% trans 'Last run output' %} </a>
</li>
</ul>
</div>
<div class="tab-content">
......@@ -160,6 +163,5 @@
</div>
</div>
</div>
{% include 'users/_user_update_pk_modal.html' %}
{% endblock %}
......@@ -24,13 +24,16 @@
<li class="active">
<a href="{% url 'ops:task-history' pk=object.pk %}" class="text-center"><i class="fa fa-laptop"></i> {% trans 'Run history' %} </a>
</li>
<li>
<a class="text-center celery-task-log" onclick="window.open('{% url 'ops:celery-task-log' pk=object.latest_history.pk %}','', 'width=800,height=600')"><i class="fa fa-laptop"></i> {% trans 'Last run output' %} </a>
</li>
</ul>
</div>
<div class="tab-content">
<div class="col-sm-12" style="padding-left: 0">
<div class="ibox float-e-margins">
<div class="ibox-title">
<span style="float: left">{% trans 'History of ' %} <b>{{ object.task.name }}:{{ object.short_id }}</b></span>
<span style="float: left">{% trans 'History of ' %} <b>{{ object.name }}:{{ object.short_id }}</b></span>
<div class="ibox-tools">
<a class="collapse-link">
<i class="fa fa-chevron-up"></i>
......@@ -85,7 +88,8 @@ function initTable() {
select: [],
columnDefs: [
{targets: 1, createdCell: function (td, cellData, rowData) {
$(td).html(cellData);
var d = new Date(cellData);
$(td).html(d.toLocaleString());
}},
{targets: 2, createdCell: function (td, cellData) {
var total = "<span>" + cellData.total + "</span>";
......
{% extends '_base_list.html' %}
{% load i18n %}
{% load static %}
{% block content_left_head %}
<link href="{% static 'css/plugins/datepicker/datepicker3.css' %}" rel="stylesheet">
<div class="uc pull-left m-r-5"><a class="btn btn-sm btn-primary btn-create-asset"> {% trans "Create task" %} </a></div>
{% endblock %}
......@@ -111,9 +112,10 @@ $(document).ready(function() {
var error = function (data) {
alert(data)
};
var success = function () {
alert("任务开始执行,重定向到任务详情页面,多刷新几次查看结果")
window.location = "{% url 'ops:task-detail' pk=DEFAULT_PK %}".replace('{{ DEFAULT_PK }}', uid);
var success = function(data) {
var task_id = data.task;
var url = '{% url "ops:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id);
window.open(url, '', 'width=800,height=600')
};
APIUpdateAttr({
url: the_url,
......
......@@ -15,6 +15,7 @@ router.register(r'v1/history', api.AdHocRunHistorySet, 'history')
urlpatterns = [
url(r'^v1/tasks/(?P<pk>[0-9a-zA-Z\-]{36})/run/$', api.TaskRun.as_view(), name='task-run'),
url(r'^v1/celery/task/(?P<pk>[0-9a-zA-Z\-]{36})/log/$', api.CeleryTaskLogApi.as_view(), name='celery-task-log'),
]
urlpatterns += router.urls
......@@ -18,4 +18,5 @@ urlpatterns = [
url(r'^adhoc/(?P<pk>[0-9a-zA-Z\-]{36})/$', views.AdHocDetailView.as_view(), name='adhoc-detail'),
url(r'^adhoc/(?P<pk>[0-9a-zA-Z\-]{36})/history/$', views.AdHocHistoryView.as_view(), name='adhoc-history'),
url(r'^adhoc/history/(?P<pk>[0-9a-zA-Z\-]{36})/$', views.AdHocHistoryDetailView.as_view(), name='adhoc-history-detail'),
url(r'^celery/task/(?P<pk>[0-9a-zA-Z\-]{36})/log/$', views.CeleryTaskLogView.as_view(), name='celery-task-log'),
]
......@@ -2,10 +2,10 @@
from django.utils.translation import ugettext as _
from django.conf import settings
from django.views.generic import ListView, DetailView
from django.views.generic import ListView, DetailView, TemplateView
from common.mixins import DatetimeSearchMixin
from .models import Task, AdHoc, AdHocRunHistory
from .models import Task, AdHoc, AdHocRunHistory, CeleryTask
from .hands import AdminUserRequiredMixin
......@@ -118,4 +118,9 @@ class AdHocHistoryDetailView(AdminUserRequiredMixin, DetailView):
'action': _('Run history detail'),
}
kwargs.update(context)
return super().get_context_data(**kwargs)
\ No newline at end of file
return super().get_context_data(**kwargs)
class CeleryTaskLogView(AdminUserRequiredMixin, DetailView):
template_name = 'ops/celery_task_log.html'
model = CeleryTask
......@@ -157,7 +157,7 @@ function APIUpdateAttr(props) {
props = props || {};
var success_message = props.success_message || '更新成功!';
var fail_message = props.fail_message || '更新时发生未知错误.';
var flash_message = true;
var flash_message = props.flash_message || true;
if (props.flash_message === false){
flash_message = false;
}
......@@ -170,7 +170,9 @@ function APIUpdateAttr(props) {
dataType: props.data_type || "json"
}).done(function(data, textStatue, jqXHR) {
if (flash_message) {
toastr.success(success_message);
if (send_message) {
toastr.success(success_message);
}
}
if (typeof props.success === 'function') {
return props.success(data);
......
......@@ -5,8 +5,6 @@ from django.core.cache import cache
from django.db.utils import ProgrammingError, OperationalError
from common.utils import get_logger
from common.celery import after_app_ready_start, register_as_period_task, \
after_app_shutdown_clean
from .const import ASSETS_CACHE_KEY, USERS_CACHE_KEY, SYSTEM_USER_CACHE_KEY
RUNNING = False
......
......@@ -6,7 +6,7 @@ import datetime
from celery import shared_task
from django.utils import timezone
from common.celery import register_as_period_task, after_app_ready_start, \
from ops.celery.utils import register_as_period_task, after_app_ready_start, \
after_app_shutdown_clean
from .models import Status, Session
......
......@@ -155,7 +155,7 @@ def start_celery():
cmd = [
'celery', 'worker',
'-A', 'common',
'-A', 'ops',
'-l', LOG_LEVEL.lower(),
'--pidfile', pid_file,
'-c', str(WORKERS),
......@@ -182,7 +182,7 @@ def start_beat():
scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
cmd = [
'celery', 'beat',
'-A', 'common',
'-A', 'ops',
'--pidfile', pid_file,
'-l', LOG_LEVEL,
'--scheduler', scheduler,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment