Commit 74b4ff49 authored by ibuler's avatar ibuler

[Update] 修改celery log

parent 9ee9be33
......@@ -22,7 +22,8 @@
<script>
var scheme = document.location.protocol === "https:" ? "wss" : "ws";
var port = document.location.port ? ":" + document.location.port : "";
var url = "/ws/ops/tasks/" + "{{ task_id }}" + "/log/";
var taskId = "{{ task_id }}";
var url = "/ws/ops/tasks/log/";
var wsURL = scheme + "://" + document.location.hostname + port + url;
var failOverPort = "{{ ws_port }}";
var failOverWsURL = scheme + "://" + document.location.hostname + ':' + failOverPort + url;
......@@ -46,6 +47,10 @@
var data = JSON.parse(e.data);
term.write(data.message);
};
ws.onopen = function() {
var msg = {"task": taskId};
ws.send(JSON.stringify(msg))
};
ws.onerror = function (e) {
ws = new WebSocket(failOverWsURL);
ws.onmessage = function(e) {
......
......@@ -237,6 +237,7 @@
}
var term = null;
var ws = null;
function initResultTerminal() {
term = new Terminal({
......@@ -254,7 +255,29 @@
term.open(document.getElementById('term'));
var msg = "{% trans 'Select the left asset, select the running system user, execute command in batch' %}" + "\r\n";
fit(term);
term.write(msg)
term.write(msg);
var scheme = document.location.protocol === "https:" ? "wss" : "ws";
var port = document.location.port ? ":" + document.location.port : "";
var url = "/ws/ops/tasks/log/";
var wsURL = scheme + "://" + document.location.hostname + port + url;
var failOverPort = "{{ ws_port }}";
var failOverWsURL = scheme + "://" + document.location.hostname + ':' + failOverPort + url;
ws = new WebSocket(wsURL);
ws.onerror = function (e) {
ws = new WebSocket(failOverWsURL);
ws.onmessage = function(e) {
var data = JSON.parse(e.data);
term.write(data.message);
};
ws.onerror = function (e) {
term.write("Connect websocket server error")
}
};
ws.onmessage = function(e) {
var data = JSON.parse(e.data);
term.write(data.message);
};
}
function wrapperError(msg) {
......@@ -289,35 +312,12 @@
run_as: run_as,
command: command
};
var mark = '';
var log_url = null;
var end = false;
var error = false;
var int = null;
var interval = 200;
function writeExecutionOutput() {
if (!end) {
$.ajax({
url: log_url + '?mark=' + mark,
method: "GET",
contentType: "application/json; charset=utf-8"
}).done(function (data, textStatue, jqXHR) {
if (jqXHR.status === 203) {
error = true;
term.write('.');
interval = 500;
}
if (jqXHR.status === 200) {
term.write(data.data);
mark = data.mark;
if (data.end) {
end = true;
window.clearInterval(int)
}
}
})
}
function writeExecutionOutput(taskId) {
var msg = "{% trans 'Pending' %} ";
term.write(msg);
msg = JSON.stringify({task: taskId});
ws.send(msg);
}
requestApi({
......@@ -326,12 +326,8 @@
method: 'POST',
flash_message: false,
success: function (resp) {
var msg = "{% trans 'Pending' %}";
term.write(msg + "...\r\n");
log_url = resp.log_url;
int = setInterval(function () {
writeExecutionOutput()
}, interval);
{#log_url = resp.log_url;#}
writeExecutionOutput(resp.id)
}
});
return false;
......@@ -340,6 +336,8 @@
var editor;
$(document).ready(function () {
systemUserId = $('#system-users-select').val();
$(".select2").select2({
dropdownAutoWidth: true,
}).on('select2:select', function (evt) {
......
......@@ -5,5 +5,5 @@ from .. import ws
app_name = 'ops'
urlpatterns = [
path('ws/ops/tasks/<uuid:task_id>/log/', ws.CeleryLogWebsocket, name='task-log-ws'),
path('ws/ops/tasks/log/', ws.CeleryLogWebsocket, name='task-log-ws'),
]
......@@ -79,7 +79,8 @@ class CommandExecutionStartView(PermissionsMixin, TemplateView):
'app': _('Ops'),
'action': _('Command execution'),
'form': self.get_form(),
'system_users': system_users
'system_users': system_users,
'ws_port': settings.CONFIG.WS_LISTEN_PORT
}
kwargs.update(context)
return super().get_context_data(**kwargs)
......
import time
import os
import threading
import json
from celery.result import AsyncResult
from .celery.utils import get_celery_task_log_path
from channels.generic.websocket import JsonWebsocketConsumer
class CeleryLogWebsocket(JsonWebsocketConsumer):
task = ''
task_log_f = None
disconnected = False
def connect(self):
task_id = self.scope['url_route']['kwargs']['task_id']
log_path = get_celery_task_log_path(task_id)
try:
self.task_log_f = open(log_path)
except OSError:
self.send({'message': "Task {} log not found".format(task_id)})
self.disconnect(None)
return
self.accept()
self.send_log_to_client()
def disconnect(self, close_code):
self.disconnected = True
if self.task_log_f and not self.task_log_f.closed:
self.task_log_f.close()
self.close()
def receive(self, text_data=None, bytes_data=None, **kwargs):
data = json.loads(text_data)
task_id = data.get("task")
if task_id:
self.handle_task(task_id)
def handle_task(self, task_id):
log_path = get_celery_task_log_path(task_id)
def send_log_to_client(self):
def func():
task_log_f = None
while not self.disconnected:
data = self.task_log_f.read(4096)
if not os.path.exists(log_path):
self.send_json({'message': '.', 'task': task_id})
time.sleep(0.5)
continue
self.send_json({'message': '\r\n'})
try:
task_log_f = open(log_path)
break
except OSError:
return
while not self.disconnected:
data = task_log_f.readline()
if data:
data = data.replace('\n', '\r\n')
self.send_json({'message': data})
self.send_json({'message': data, 'task': task_id})
if data.startswith('Task') and data.find('succeeded'):
break
time.sleep(0.2)
task_log_f.close()
thread = threading.Thread(target=func)
thread.start()
def disconnect(self, close_code):
self.disconnected = True
self.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment