Unverified Commit 7e7583e4 authored by 老广's avatar 老广 Committed by GitHub

Merge pull request #3321 from jumpserver/bugfix

[Update] 修改celery log
parents ca50c0a9 74b4ff49
...@@ -22,7 +22,8 @@ ...@@ -22,7 +22,8 @@
<script> <script>
var scheme = document.location.protocol === "https:" ? "wss" : "ws"; var scheme = document.location.protocol === "https:" ? "wss" : "ws";
var port = document.location.port ? ":" + document.location.port : ""; var port = document.location.port ? ":" + document.location.port : "";
var url = "/ws/ops/tasks/" + "{{ task_id }}" + "/log/"; var taskId = "{{ task_id }}";
var url = "/ws/ops/tasks/log/";
var wsURL = scheme + "://" + document.location.hostname + port + url; var wsURL = scheme + "://" + document.location.hostname + port + url;
var failOverPort = "{{ ws_port }}"; var failOverPort = "{{ ws_port }}";
var failOverWsURL = scheme + "://" + document.location.hostname + ':' + failOverPort + url; var failOverWsURL = scheme + "://" + document.location.hostname + ':' + failOverPort + url;
...@@ -46,6 +47,10 @@ ...@@ -46,6 +47,10 @@
var data = JSON.parse(e.data); var data = JSON.parse(e.data);
term.write(data.message); term.write(data.message);
}; };
ws.onopen = function() {
var msg = {"task": taskId};
ws.send(JSON.stringify(msg))
};
ws.onerror = function (e) { ws.onerror = function (e) {
ws = new WebSocket(failOverWsURL); ws = new WebSocket(failOverWsURL);
ws.onmessage = function(e) { ws.onmessage = function(e) {
......
...@@ -237,6 +237,7 @@ ...@@ -237,6 +237,7 @@
} }
var term = null; var term = null;
var ws = null;
function initResultTerminal() { function initResultTerminal() {
term = new Terminal({ term = new Terminal({
...@@ -254,7 +255,29 @@ ...@@ -254,7 +255,29 @@
term.open(document.getElementById('term')); term.open(document.getElementById('term'));
var msg = "{% trans 'Select the left asset, select the running system user, execute command in batch' %}" + "\r\n"; var msg = "{% trans 'Select the left asset, select the running system user, execute command in batch' %}" + "\r\n";
fit(term); fit(term);
term.write(msg) term.write(msg);
var scheme = document.location.protocol === "https:" ? "wss" : "ws";
var port = document.location.port ? ":" + document.location.port : "";
var url = "/ws/ops/tasks/log/";
var wsURL = scheme + "://" + document.location.hostname + port + url;
var failOverPort = "{{ ws_port }}";
var failOverWsURL = scheme + "://" + document.location.hostname + ':' + failOverPort + url;
ws = new WebSocket(wsURL);
ws.onerror = function (e) {
ws = new WebSocket(failOverWsURL);
ws.onmessage = function(e) {
var data = JSON.parse(e.data);
term.write(data.message);
};
ws.onerror = function (e) {
term.write("Connect websocket server error")
}
};
ws.onmessage = function(e) {
var data = JSON.parse(e.data);
term.write(data.message);
};
} }
function wrapperError(msg) { function wrapperError(msg) {
...@@ -289,35 +312,12 @@ ...@@ -289,35 +312,12 @@
run_as: run_as, run_as: run_as,
command: command command: command
}; };
var mark = '';
var log_url = null;
var end = false;
var error = false;
var int = null;
var interval = 200;
function writeExecutionOutput() { function writeExecutionOutput(taskId) {
if (!end) { var msg = "{% trans 'Pending' %} ";
$.ajax({ term.write(msg);
url: log_url + '?mark=' + mark, msg = JSON.stringify({task: taskId});
method: "GET", ws.send(msg);
contentType: "application/json; charset=utf-8"
}).done(function (data, textStatue, jqXHR) {
if (jqXHR.status === 203) {
error = true;
term.write('.');
interval = 500;
}
if (jqXHR.status === 200) {
term.write(data.data);
mark = data.mark;
if (data.end) {
end = true;
window.clearInterval(int)
}
}
})
}
} }
requestApi({ requestApi({
...@@ -326,12 +326,8 @@ ...@@ -326,12 +326,8 @@
method: 'POST', method: 'POST',
flash_message: false, flash_message: false,
success: function (resp) { success: function (resp) {
var msg = "{% trans 'Pending' %}"; {#log_url = resp.log_url;#}
term.write(msg + "...\r\n"); writeExecutionOutput(resp.id)
log_url = resp.log_url;
int = setInterval(function () {
writeExecutionOutput()
}, interval);
} }
}); });
return false; return false;
...@@ -340,6 +336,8 @@ ...@@ -340,6 +336,8 @@
var editor; var editor;
$(document).ready(function () { $(document).ready(function () {
systemUserId = $('#system-users-select').val(); systemUserId = $('#system-users-select').val();
$(".select2").select2({ $(".select2").select2({
dropdownAutoWidth: true, dropdownAutoWidth: true,
}).on('select2:select', function (evt) { }).on('select2:select', function (evt) {
......
...@@ -5,5 +5,5 @@ from .. import ws ...@@ -5,5 +5,5 @@ from .. import ws
app_name = 'ops' app_name = 'ops'
urlpatterns = [ urlpatterns = [
path('ws/ops/tasks/<uuid:task_id>/log/', ws.CeleryLogWebsocket, name='task-log-ws'), path('ws/ops/tasks/log/', ws.CeleryLogWebsocket, name='task-log-ws'),
] ]
...@@ -79,7 +79,8 @@ class CommandExecutionStartView(PermissionsMixin, TemplateView): ...@@ -79,7 +79,8 @@ class CommandExecutionStartView(PermissionsMixin, TemplateView):
'app': _('Ops'), 'app': _('Ops'),
'action': _('Command execution'), 'action': _('Command execution'),
'form': self.get_form(), 'form': self.get_form(),
'system_users': system_users 'system_users': system_users,
'ws_port': settings.CONFIG.WS_LISTEN_PORT
} }
kwargs.update(context) kwargs.update(context)
return super().get_context_data(**kwargs) return super().get_context_data(**kwargs)
......
import time import time
import os
import threading import threading
import json
from celery.result import AsyncResult
from .celery.utils import get_celery_task_log_path from .celery.utils import get_celery_task_log_path
from channels.generic.websocket import JsonWebsocketConsumer from channels.generic.websocket import JsonWebsocketConsumer
class CeleryLogWebsocket(JsonWebsocketConsumer): class CeleryLogWebsocket(JsonWebsocketConsumer):
task = ''
task_log_f = None
disconnected = False disconnected = False
def connect(self): def connect(self):
task_id = self.scope['url_route']['kwargs']['task_id'] self.accept()
def receive(self, text_data=None, bytes_data=None, **kwargs):
data = json.loads(text_data)
task_id = data.get("task")
if task_id:
self.handle_task(task_id)
def handle_task(self, task_id):
log_path = get_celery_task_log_path(task_id) log_path = get_celery_task_log_path(task_id)
def func():
task_log_f = None
while not self.disconnected:
if not os.path.exists(log_path):
self.send_json({'message': '.', 'task': task_id})
time.sleep(0.5)
continue
self.send_json({'message': '\r\n'})
try: try:
self.task_log_f = open(log_path) task_log_f = open(log_path)
break
except OSError: except OSError:
self.send({'message': "Task {} log not found".format(task_id)})
self.disconnect(None)
return return
self.accept()
self.send_log_to_client()
def disconnect(self, close_code):
self.disconnected = True
if self.task_log_f and not self.task_log_f.closed:
self.task_log_f.close()
self.close()
def send_log_to_client(self):
def func():
while not self.disconnected: while not self.disconnected:
data = self.task_log_f.read(4096) data = task_log_f.readline()
if data: if data:
data = data.replace('\n', '\r\n') data = data.replace('\n', '\r\n')
self.send_json({'message': data}) self.send_json({'message': data, 'task': task_id})
if data.startswith('Task') and data.find('succeeded'):
break
time.sleep(0.2) time.sleep(0.2)
task_log_f.close()
thread = threading.Thread(target=func) thread = threading.Thread(target=func)
thread.start() thread.start()
def disconnect(self, close_code):
self.disconnected = True
self.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment