Commit b15b5efa authored by ibuler's avatar ibuler

[Feature] 将recorde拆出来完成

parent e9a6daa7
......@@ -49,18 +49,27 @@ class Coco:
self.clients = []
self.lock = threading.Lock()
self.stop_evt = threading.Event()
self._service = None
self._sshd = None
self._httpd = None
@property
def service(self):
return AppService(self)
if self._service is None:
self._service = AppService(self)
return self._service
@property
def sshd(self):
return SSHServer(self)
if self._sshd is None:
self._sshd = SSHServer(self)
return self._sshd
@property
def httpd(self):
return HttpServer(self)
if self._httpd is None:
self._httpd = HttpServer(self)
return self._httpd
def make_logger(self):
create_logger(self)
......@@ -73,10 +82,10 @@ class Coco:
self.make_logger()
self.service.initial()
self.load_extra_conf_from_server()
self.heartbeat()
self.keep_heartbeat()
self.monitor_sessions()
def heartbeat(self):
def keep_heartbeat(self):
def func():
while not self.stop_evt.is_set():
_sessions = [s.to_json() for s in self.sessions]
......@@ -95,7 +104,7 @@ class Coco:
def func():
while not self.stop_evt.is_set():
for s in self.sessions:
if not s.is_finished:
if not s.stop_evt.is_set():
continue
if s.date_finished is None:
self.remove_session(s)
......
# coding: utf-8
import socket
import threading
import logging
......@@ -9,11 +8,13 @@ import paramiko
from .session import Session
from .models import Server
from .record import FileRecorder
from .utils import wrap_with_line_feed as wr
logger = logging.getLogger(__file__)
TIMEOUT = 8
BUF_SIZE = 4096
class ProxyServer:
......@@ -32,7 +33,11 @@ class ProxyServer:
session = Session(self.client, self.server)
self.app.add_session(session)
self.watch_win_size_change_async()
session.add_recorder()
recorder = FileRecorder(self.app, session)
session.add_recorder(recorder)
session.record_async()
self.server.add_recorder(recorder)
self.server.record_command_async()
session.bridge()
session.stop_evt.set()
......@@ -55,7 +60,7 @@ class ProxyServer:
self.app.service.get_system_user_auth_info(system_user)
def get_server_conn(self, asset, system_user):
logger.info("Connect to %s" % asset.hostname)
logger.info("Connect to {}".format(asset.hostname))
if not self.validate_permission(asset, system_user):
self.client.send(_('No permission'))
return None
......
......@@ -39,7 +39,7 @@ class BaseWehSocketHandler:
class InteractiveWehSocketHandler(BaseWehSocketHandler, tornado.websocket.WebSocketHandler):
@tornado.web.authenticated
def open(self):
InteractiveServer(self.app, self.client).activate_async()
InteractiveServer(self.app, self.client).interact_async()
def on_message(self, message):
try:
......
......@@ -3,7 +3,6 @@ import logging
import socket
import threading
# Todo remove
from jms.models import Asset, AssetGroup
from . import char
......@@ -12,7 +11,6 @@ from .utils import wrap_with_line_feed as wr, wrap_with_title as title, \
is_obj_attr_has, is_obj_attr_eq, sort_assets, TtyIOParser, \
ugettext as _
from .forward import ProxyServer
from .session import Session
logger = logging.getLogger(__file__)
......@@ -172,9 +170,10 @@ class InteractiveServer:
self.display_search_result()
def display_search_result(self):
if len(self.search_result) == 0:
self.client.send(warning("Nothing match"))
return
# if len(self.search_result) == 0:
# self.client.send(warning("Nothing match"))
# return
print("Total assets: ".format(len(self.assets)))
self.search_result = sort_assets(self.search_result, self.app.config["ASSET_LIST_SORT_BY"])
fake_asset = Asset(hostname=_("Hostname"), ip=_("IP"), system_users_join=_("LoginAs"), comment=_("Comment"))
......@@ -189,7 +188,9 @@ class InteractiveServer:
self.client.send(wr(title(header.format(fake_asset, "ID"))))
for index, asset in enumerate(self.search_result, 1):
self.client.send(wr(line.format(asset, index)))
self.client.send(wr(_("Total: {}").format(len(self.search_result)), before=1))
self.client.send(wr(_("Total: {} Matched: {}").format(
len(self.assets), len(self.search_result)), before=1)
)
def search_and_display(self, q):
self.search_assets(q)
......@@ -204,6 +205,7 @@ class InteractiveServer:
def get_user_assets(self):
self.assets = self.app.service.get_user_assets(self.client.user)
logger.debug("Get user {} assets total: {}".format(self.client.user, len(self.assets)))
def get_user_assets_async(self):
thread = threading.Thread(target=self.get_user_assets)
......@@ -260,7 +262,7 @@ class InteractiveServer:
break
self.close()
def activate_async(self):
def interact_async(self):
thread = threading.Thread(target=self.interact)
thread.daemon = True
thread.start()
......
......@@ -2,11 +2,13 @@ import json
import queue
import threading
import datetime
import logging
from . import char
from . import utils
BUF_SIZE = 4096
logger = logging.getLogger(__file__)
class Request:
......@@ -66,6 +68,7 @@ class Server:
self.system_user = system_user
self.send_bytes = 0
self.recv_bytes = 0
self.stop_evt = threading.Event()
self.input_data = []
self.output_data = []
......@@ -74,7 +77,10 @@ class Server:
self._in_vim_state = False
self.recorders = []
self.queue = queue.Queue()
self.filters = []
self._input = ""
self._output = ""
self.command_queue = queue.Queue()
def add_recorder(self, recorder):
self.recorders.append(recorder)
......@@ -82,13 +88,21 @@ class Server:
def remove_recorder(self, recorder):
self.recorders.remove(recorder)
def record_command(self, _input, _output):
while True:
_input, _output = self.queue.get()
for recorder in self.recorders:
t = threading.Thread(target=recorder.record_command,
args=(_input, _output))
t.start()
def add_filter(self, _filter):
self.filters.append(_filter)
def remove_filter(self, _filter):
self.filters.remove(_filter)
def record_command_async(self):
def func():
while not self.stop_evt.is_set():
_input, _output = self.command_queue.get()
logger.debug("Record command: ({},{})".format(_input, _output))
for recorder in self.recorders:
recorder.record_command(datetime.datetime.now(), _input, _output)
thread = threading.Thread(target=func)
thread.start()
def fileno(self):
return self.chan.fileno()
......@@ -101,14 +115,18 @@ class Server:
if self._have_enter_char(b):
self._in_input_state = False
self._input = self._parse_input()
else:
if not self._in_input_state:
print("#" * 30 + " 新周期 " + "#" * 30)
_input = self._parse_input()
_output = self._parse_output()
self.record_command(_input, _output)
self._output = self._parse_output()
print(self._input)
print(self._output)
self.command_queue.put((self._input, self._output))
del self.input_data[:]
del self.output_data[:]
self._input = ""
self._output = ""
self._in_input_state = True
return self.chan.send(b)
......
......@@ -2,11 +2,7 @@
#
import abc
import multiprocessing
import threading
import time
import datetime
import socket
import os
import logging
......@@ -20,52 +16,17 @@ class Recorder(metaclass=abc.ABCMeta):
def __init__(self, app, session):
self.app = app
self.session = session
self.replay_queue = multiprocessing.Queue()
self.command_queue = multiprocessing.Queue()
self.stop_evt = multiprocessing.Event()
@abc.abstractmethod
def record_replay(self):
def record_replay(self, now, timedelta, size, data):
pass
@abc.abstractmethod
def record_command(self, _input, _output):
pass
class FileRecorder(Recorder):
def record_replay(self):
parent, child = socket.socketpair()
self.session.add_watcher(parent)
session_dir = self.app.config["SESSION_DIR"]
with open(os.path.join(session_dir, session.id + ".rec"), 'wb') as dataf, \
open(os.path.join(session_dir, session.id + ".time"), "w") as timef:
dataf.write("Script started on {}\n".format(time.asctime()).encode("utf-8"))
while not self.stop_evt.is_set():
start_t = time.time()
data = child.recv(BUF_SIZE)
end_t = time.time()
size = len(data)
if size == 0:
break
timef.write("%.4f %s\n" % (end_t - start_t, size))
dataf.write(data)
dataf.write("Script done on {}\n".format(time.asctime()).encode("utf-8"))
def record_command(self, _input, _output):
pass
class SessionReplay(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write_meta(self, meta):
def record_command(self, now, _input, _output):
pass
@abc.abstractmethod
def write_data(self, data):
def start(self):
pass
@abc.abstractmethod
......@@ -73,51 +34,107 @@ class SessionReplay(metaclass=abc.ABCMeta):
pass
class SessionCommand(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write(self, cmd, output):
pass
class FileSessionReplay(SessionReplay):
def __init__(self, dataf, metaf):
self.dataf = dataf
self.metaf = metaf
self.playing = True
def write_data(self, data):
self.dataf.write(data)
def write_meta(self, meta):
self.metaf.write(meta)
class FileRecorder(Recorder):
def replay(self, sock):
sock.send(self.dataf.readline())
for l in self.metaf:
if not self.playing:
break
t, size = float(l.split()[0]), int(l.split()[1])
data = self.dataf.read(size)
time.sleep(t)
sock.send(data)
sock.send("Replay session end")
@property
def session_dir(self):
session_dir = os.path.join(
self.app.config["SESSION_DIR"],
self.session.date_created.strftime("%Y-%m-%d")
)
if not os.path.isdir(session_dir):
os.mkdir(session_dir)
return session_dir
@property
def data_f(self):
filename = os.path.join(self.session_dir, str(self.session.id) + ".rec")
try:
f = open(filename, 'wb')
except IOError:
logger.error("Failed open file {} in recorder".format(filename))
raise
return f
@property
def time_f(self):
filename = os.path.join(self.session_dir, str(self.session.id) + ".time")
try:
f = open(filename, 'w')
except IOError:
logger.error("Failed open file {} in recorder".format(filename))
raise
return f
@property
def cmd_f(self):
filename = os.path.join(self.session_dir, str(self.session.id) + ".cmd")
try:
f = open(filename, "w")
except IOError:
logger.error("Failed open file {} in recorder".format(filename))
raise
return f
def record_replay(self, now, timedelta, size, data):
self.time_f.write("%.4f %s\n" % (timedelta, size))
self.data_f.write(data)
def record_command(self, now, _input, _output):
self.cmd_f.write("{}\n".format(now.strftime("%Y-%m-%d %H:%M:%S")))
self.cmd_f.write("$ {}\n".format(_input))
self.cmd_f.write("{}\n\n".format(_output))
def start(self):
self.data_f.write("Session started on {}\n".format(time.asctime()).encode("utf-8"))
def done(self):
pass
class FileSessionCommand(SessionCommand):
def __init__(self, f):
self.f = f
self.data_f.write("Session done on {}\n".format(time.asctime()).encode("utf-8"))
for f in [self.data_f, self.time_f, self.cmd_f]:
try:
f.close()
except IOError:
pass
def write(self, cmd, output):
self.f.write("{}\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
self.f.write("$ {}\n".format(cmd))
self.f.write("{}\n\n".format(output))
def done(self):
pass
# class FileSessionReplay(SessionReplay):
#
# def __init__(self, dataf, metaf):
# self.dataf = dataf
# self.metaf = metaf
# self.playing = True
#
# def write_data(self, data):
# self.dataf.write(data)
#
# def write_meta(self, meta):
# self.metaf.write(meta)
#
# def replay(self, sock):
# sock.send(self.dataf.readline())
# for l in self.metaf:
# if not self.playing:
# break
# t, size = float(l.split()[0]), int(l.split()[1])
# data = self.dataf.read(size)
# time.sleep(t)
# sock.send(data)
# sock.send("Replay session end")
#
# def done(self):
# pass
#
#
# class FileSessionCommand(SessionCommand):
#
# def __init__(self, f):
# self.f = f
#
# def write(self, cmd, output):
# self.f.write("{}\n".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
# self.f.write("$ {}\n".format(cmd))
# self.f.write("{}\n\n".format(output))
#
# def done(self):
# pass
......@@ -25,7 +25,7 @@ class Session:
self.replaying = True
self.date_created = datetime.datetime.now()
self.date_finished = None
self.recorder = []
self.recorders = []
self.stop_evt = threading.Event()
self.sel = selectors.DefaultSelector()
......@@ -70,11 +70,16 @@ class Session:
self.sel.unregister(sharer)
self.sharers.remove(sharer)
def add_recorder(self, recorder):
self.recorders.append(recorder)
def remove_recorder(self, recorder):
self.recorders.remove(recorder)
def bridge(self):
"""
Bridge clients with server
:return:
"""
logger.info("Start bridge session %s" % self.id)
self.sel.register(self.client, selectors.EVENT_READ)
......@@ -92,39 +97,42 @@ class Session:
elif sock == self.client:
if len(data) == 0:
for watcher in self.watchers + self.sharers:
watcher.send("Client {} close the session"
.format(self.client)
.encode("utf-8"))
watcher.send("Client {} close the session".format(self.client).encode("utf-8"))
self.close()
break
self.server.send(data)
elif sock in self.sharers:
if len(data) == 0:
logger.info("Sharer {} leave session {}"
.format(sock, self.id))
logger.info("Sharer {} leave session {}".format(sock, self.id))
self.remove_sharer(sock)
self.server.send(data)
elif sock in self.watchers:
if len(data) == 0:
logger.info("Watcher {} leave session {}"
.format(sock, self.id))
logger.info("Watcher {} leave session {}".format(sock, self.id))
def set_size(self, width, height):
self.server.resize_pty(width=width, height=height)
def add_recorder(self, recorder):
self.recorder.append(recorder)
def remove_recorder(self, recorder):
self.recorder.remove(recorder)
def record(self):
"""
Record the session
:return:
"""
for recorder in self.recorder:
recorder.record(self)
def record_async(self):
def func():
parent, child = socket.socketpair()
for recorder in self.recorders:
recorder.start()
while not self.stop_evt.is_set():
start_t = time.time()
data = child.recv(BUF_SIZE)
end_t = time.time()
size = len(data)
now = datetime.datetime.now()
timedelta = '{.4f}'.format(end_t - start_t)
if size == 0:
break
for recorder in self.recorders:
recorder.record_replay(now, timedelta, size, data)
for recorder in self.recorders:
recorder.done()
thread = threading.Thread(target=func)
thread.start()
def close(self):
self.stop_evt.set()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment