Commit 94b942bb authored by ibuler's avatar ibuler

[Feture] 发送records replay

parent c6856f78
......@@ -8,7 +8,7 @@ import paramiko
from .session import Session
from .models import Server
from .record import LocalFileReplayRecorder, LocalFileCommandRecorder
from .record import LocalFileReplayRecorder, LocalFileCommandRecorder, ServerReplayRecorder
from .utils import wrap_with_line_feed as wr
......@@ -33,7 +33,10 @@ class ProxyServer:
session = Session(self.client, self.server)
self.app.add_session(session)
self.watch_win_size_change_async()
replay_recorder = LocalFileReplayRecorder(self.app, session)
if self.app.config["REPLAY_STORE_ENGINE"].lower() == "server":
replay_recorder = ServerReplayRecorder(self.app, session)
else:
replay_recorder = LocalFileReplayRecorder(self.app, session)
session.add_recorder(replay_recorder)
session.record_replay_async()
cmd_recorder = LocalFileCommandRecorder(self.app, session)
......@@ -79,8 +82,6 @@ class ProxyServer:
def get_ssh_server_conn(self, asset, system_user):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print(asset.ip, asset.port, system_user.username, system_user.password,
system_user.private_key)
try:
ssh.connect(asset.ip, port=asset.port,
username=system_user.username,
......
......@@ -99,8 +99,7 @@ class HttpServer:
def run(self):
host = self.app.config["BIND_HOST"]
port = self.app.config["HTTPD_PORT"]
print('Starting websocket server at %(host)s:%(port)s' %
{"host": host, "port": port})
print('Starting websocket server at {}:{}'.format(host, port))
ws = tornado.web.Application(self.routers, **self.settings)
ws.listen(port=port, address=host)
tornado.ioloop.IOLoop.current().start()
......
......@@ -2,6 +2,7 @@
#
import abc
import tarfile
import threading
import time
import os
......@@ -53,25 +54,28 @@ class LocalFileReplayRecorder(ReplayRecorder):
def __init__(self, app, session):
super().__init__(app, session)
self.session_dir = ""
self.data_filename = ""
self.time_filename = ""
self.data_f = None
self.time_f = None
self.prepare_file()
def prepare_file(self):
session_dir = os.path.join(
self.session_dir = os.path.join(
self.app.config["SESSION_DIR"],
self.session.date_created.strftime("%Y-%m-%d")
self.session.date_created.strftime("%Y-%m-%d"),
str(self.session.id)
)
if not os.path.isdir(session_dir):
os.mkdir(session_dir)
if not os.path.isdir(self.session_dir):
os.makedirs(self.session_dir)
filename = os.path.join(session_dir, str(self.session.id))
data_filename = filename + ".rec"
time_filename = filename + ".time"
self.data_filename = os.path.join(self.session_dir, "data.txt")
self.time_filename = os.path.join(self.session_dir, "time.txt")
try:
self.data_f = open(data_filename, "wb")
self.time_f = open(time_filename, "w")
self.data_f = open(self.data_filename, "wb")
self.time_f = open(self.time_filename, "w")
except IOError as e:
logger.debug(e)
self.done()
......@@ -104,14 +108,13 @@ class LocalFileCommandRecorder(CommandRecorder):
def prepare_file(self):
session_dir = os.path.join(
self.app.config["SESSION_DIR"],
self.session.date_created.strftime("%Y-%m-%d")
self.session.date_created.strftime("%Y-%m-%d"),
str(self.session.id)
)
if not os.path.isdir(session_dir):
os.mkdir(session_dir)
filename = os.path.join(session_dir, str(self.session.id))
cmd_filename = filename + ".cmd"
os.makedirs(session_dir)
cmd_filename = os.path.join(session_dir, "cmd.txt")
try:
self.cmd_f = open(cmd_filename, "w")
except IOError as e:
......@@ -138,18 +141,28 @@ class ServerReplayRecorder(LocalFileReplayRecorder):
super().done()
self.push_records()
def archive_replay(self):
filename = os.path.join(self.session_dir, "archive.tar.bz2")
logger.debug("Start archive log: {}".format(filename))
tar = tarfile.open(filename, "w:bz2")
os.chdir(self.session_dir)
time_filename = os.path.basename(self.time_filename)
data_filename = os.path.basename(self.data_filename)
for i in (time_filename, data_filename):
tar.add(i)
tar.close()
return filename
def push_replay_record(self, archive):
logger.debug("Start push replay record to server")
return self.app.service.push_session_replay(archive)
def push_records(self):
def func():
self.push_replay_record()
archive = self.archive_replay()
result = self.push_replay_record(archive)
if not result:
logger.error("Push replay error")
thread = threading.Thread(target=func)
thread.start()
def push_replay_record(self):
pass
......@@ -81,7 +81,7 @@ class Session:
Bridge clients with server
:return:
"""
logger.info("Start bridge session %s" % self.id)
logger.info("Start bridge session {}".format(self.id))
self.sel.register(self.client, selectors.EVENT_READ)
self.sel.register(self.server, selectors.EVENT_READ)
while not self.stop_evt.is_set():
......@@ -109,8 +109,10 @@ class Session:
elif sock in self.watchers:
if len(data) == 0:
logger.info("Watcher {} leave session {}".format(sock, self.id))
logger.info("End session {} bride".format(self.id))
def set_size(self, width, height):
logger.debug("Resize server chan size {}*{}".format(width, height))
self.server.resize_pty(width=width, height=height)
def record_replay_async(self):
......@@ -130,14 +132,18 @@ class Session:
break
for recorder in self.recorders:
recorder.record_replay(now, timedelta, size, data)
logger.debug("Stop event set, exit record replay")
for recorder in self.recorders:
recorder.done()
thread = threading.Thread(target=func)
thread.start()
def close(self):
logger.debug("Session {} closing".format(self.id))
self.stop_evt.set()
self.date_finished = datetime.datetime.now()
for c in self.watchers + self.sharers:
c.close()
self.server.close()
def to_json(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment