Commit e9a6daa7 authored by ibuler's avatar ibuler

[Update] 修改日志记录选项

parent eb810d14
......@@ -9,7 +9,6 @@ from .config import Config
from .sshd import SSHServer
from .httpd import HttpServer
from .logging import create_logger
from . import utils
__version__ = '0.4.0'
......@@ -34,6 +33,7 @@ class Coco:
'LOG_LEVEL': 'INFO',
'LOG_DIR': os.path.join(BASE_DIR, 'logs'),
'SESSION_DIR': os.path.join(BASE_DIR, 'sessions'),
'SESSION_COMMAND_STORE': "server", # elasticsearch
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
'SSH_PASSWORD_AUTH': True,
'SSH_PUBLIC_KEY_AUTH': True,
......@@ -42,32 +42,37 @@ class Coco:
}
def __init__(self, name=None, root_path=None):
self.config = self.config_class(BASE_DIR, defaults=self.default_config)
self.root_path = root_path if root_path else BASE_DIR
self.config = self.config_class(self.root_path, defaults=self.default_config)
self.name = name if name else self.config['NAME']
self.sessions = []
self.clients = []
self.root_path = root_path
self.name = name
self.lock = threading.Lock()
self.stop_evt = threading.Event()
self.service = None
if name is None:
self.name = self.config['NAME']
if root_path is None:
self.root_path = BASE_DIR
@property
def service(self):
return AppService(self)
self.httpd = None
self.sshd = None
self.running = True
@property
def sshd(self):
return SSHServer(self)
@property
def httpd(self):
return HttpServer(self)
def make_logger(self):
create_logger(self)
# Todo: load some config from server like replay and common upload
def load_extra_conf_from_server(self):
pass
def bootstrap(self):
self.make_logger()
self.sshd = SSHServer(self)
self.httpd = HttpServer(self)
self.initial_service()
self.service.initial()
self.load_extra_conf_from_server()
self.heartbeat()
self.monitor_sessions()
......@@ -85,17 +90,20 @@ class Coco:
thread.start()
def monitor_sessions(self):
interval = self.config["HEARTBEAT_INTERVAL"]
def func():
while not self.stop_evt.is_set():
for s in self.sessions:
if s.stop_evt.is_set():
if s.date_finished is None:
self.sessions.remove(s)
continue
delta = datetime.datetime.now() - s.date_finished
if delta > datetime.timedelta(minutes=1):
self.sessions.remove(s)
time.sleep(self.config["HEARTBEAT_INTERVAL"])
if not s.is_finished:
continue
if s.date_finished is None:
self.remove_session(s)
continue
delta = datetime.datetime.now() - s.date_finished
if delta > datetime.timedelta(seconds=interval*5):
self.remove_session(s)
time.sleep(interval)
thread = threading.Thread(target=func)
thread.start()
......@@ -106,7 +114,7 @@ class Coco:
def run_forever(self):
self.bootstrap()
print(time.ctime())
print('Coco version %s, more see https://www.jumpserver.org' % __version__)
print('Coco version {}, more see https://www.jumpserver.org'.format(__version__))
print('Quit the server with CONTROL-C.')
try:
......@@ -114,7 +122,7 @@ class Coco:
self.run_sshd()
if self.config['WS_PORT'] != 0:
self.run_ws()
self.run_httpd()
self.stop_evt.wait()
except KeyboardInterrupt:
......@@ -126,7 +134,7 @@ class Coco:
thread.daemon = True
thread.start()
def run_ws(self):
def run_httpd(self):
thread = threading.Thread(target=self.httpd.run, args=())
thread.daemon = True
thread.start()
......@@ -136,6 +144,7 @@ class Coco:
self.remove_client(client)
time.sleep(1)
self.sshd.shutdown()
self.httpd.shutdown()
logger.info("Grace shutdown the server")
def add_client(self, client):
......@@ -148,12 +157,17 @@ class Coco:
try:
self.clients.remove(client)
logger.info("Client %s leave, total %d now" % (client, len(self.clients)))
client.send("Closed by server")
client.close()
except:
pass
def initial_service(self):
self.service = AppService(self)
self.service.initial()
def add_session(self, session):
with self.lock:
self.sessions.append(session)
def remove_session(self, session):
with self.lock:
self.sessions.remove(session)
......@@ -3,9 +3,9 @@
import socket
import threading
import logging
import time
import paramiko
import time
from .session import Session
from .models import Server
......@@ -29,10 +29,10 @@ class ProxyServer:
self.server = self.get_server_conn(asset, system_user)
if self.server is None:
return
session = Session(self.client, self.server, self.app.config["SESSION_DIR"])
self.app.sessions.append(session)
session = Session(self.client, self.server)
self.app.add_session(session)
self.watch_win_size_change_async()
session.record_async()
session.add_recorder()
session.bridge()
session.stop_evt.set()
......@@ -59,11 +59,22 @@ class ProxyServer:
if not self.validate_permission(asset, system_user):
self.client.send(_('No permission'))
return None
self.get_system_user_auth(system_user)
if True:
server = self.get_ssh_server_conn(asset, system_user)
else:
server = self.get_ssh_server_conn(asset, system_user)
return server
# Todo: Support telnet
def get_telnet_server_conn(self, asset, system_user):
pass
def get_ssh_server_conn(self, asset, system_user):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print(asset.ip, asset.port, system_user.username, system_user.password, system_user.private_key)
print(asset.ip, asset.port, system_user.username, system_user.password,
system_user.private_key)
try:
ssh.connect(asset.ip, port=asset.port,
username=system_user.username,
......@@ -110,3 +121,4 @@ class ProxyServer:
thread = threading.Thread(target=func)
thread.start()
......@@ -104,3 +104,6 @@ class HttpServer:
ws = tornado.web.Application(self.routers, **self.settings)
ws.listen(port=port, address=host)
tornado.ioloop.IOLoop.current().start()
def shutdown(self):
pass
......@@ -4,7 +4,7 @@ import socket
import threading
# Todo remove
from jms.models import Asset, SystemUser, AssetGroup
from jms.models import Asset, AssetGroup
from . import char
from .utils import wrap_with_line_feed as wr, wrap_with_title as title, \
......@@ -48,7 +48,7 @@ class InteractiveServer:
)
self.client.send(banner)
def get_choice(self, prompt='Opt> '):
def get_option(self, prompt='Opt> '):
"""实现了一个ssh input, 提示用户输入, 获取并返回
:return user input string
......@@ -218,7 +218,7 @@ class InteractiveServer:
while True:
self.client.send(wr(_("Choose one to login: "), after=1))
self.display_system_users(system_users)
opt = self.get_choice("ID> ")
opt = self.get_option("ID> ")
if opt.isdigit() and len(system_users) > int(opt):
return system_users[int(opt)]
elif opt in ['q', 'Q']:
......@@ -247,15 +247,13 @@ class InteractiveServer:
forwarder.proxy(asset, system_user)
def replay_session(self, session_id):
session = Session(self.client, None)
session.id = "5a5dbfbe-093f-4bc1-810f-e8401b9e6045"
session.replay()
pass
def activate(self):
def interact(self):
self.display_banner()
while True:
try:
opt = self.get_choice()
opt = self.get_option()
self.dispatch(opt)
except socket.error as e:
logger.error("Socket error %s" % e)
......@@ -263,7 +261,7 @@ class InteractiveServer:
self.close()
def activate_async(self):
thread = threading.Thread(target=self.activate)
thread = threading.Thread(target=self.interact)
thread.daemon = True
thread.start()
......
import json
import queue
import threading
import datetime
......@@ -57,9 +58,8 @@ class Server:
Server object like client, a wrapper object, a connection to the asset,
Because we don't want to using python dynamic feature, such asset
have the chan and system_user attr.
"""
# Todo: Server name is not very proper
# Todo: Server name is not very suitable
def __init__(self, chan, asset, system_user):
self.chan = chan
self.asset = asset
......@@ -69,12 +69,27 @@ class Server:
self.input_data = []
self.output_data = []
self.input = ''
self.output = ''
self._in_input_state = True
self._input_initial = False
self._in_vim_state = False
self.recorders = []
self.queue = queue.Queue()
def add_recorder(self, recorder):
self.recorders.append(recorder)
def remove_recorder(self, recorder):
self.recorders.remove(recorder)
def record_command(self, _input, _output):
while True:
_input, _output = self.queue.get()
for recorder in self.recorders:
t = threading.Thread(target=recorder.record_command,
args=(_input, _output))
t.start()
def fileno(self):
return self.chan.fileno()
......@@ -89,24 +104,21 @@ class Server:
else:
if not self._in_input_state:
print("#" * 30 + " 新周期 " + "#" * 30)
self._parse_input()
self._parse_output()
_input = self._parse_input()
_output = self._parse_output()
self.record_command(_input, _output)
del self.input_data[:]
del self.output_data[:]
self._in_input_state = True
print("Send: %s" % b)
return self.chan.send(b)
def recv(self, size):
data = self.chan.recv(size)
print("Recv: %s" % data)
if self._input_initial:
if self._in_input_state:
self.input_data.append(data)
else:
self.output_data.append(data)
return data
def close(self):
......@@ -122,18 +134,19 @@ class Server:
def _parse_output(self):
parser = utils.TtyIOParser()
print("\tOutput: %s" % parser.parse_output(self.output_data))
return parser.parse_output(self.output_data)
def _parse_input(self):
parser = utils.TtyIOParser()
print("\tInput: %s" % parser.parse_input(self.input_data))
return parser.parse_input(self.input_data)
def __getattr__(self, item):
return getattr(self.chan, item)
def __str__(self):
return "<%s@%s:%s>" % (self.system_user.username,
self.asset.hostname, self.asset.port)
self.asset.hostname,
self.asset.port)
class WSProxy:
......
......@@ -2,8 +2,60 @@
#
import abc
import datetime
import multiprocessing
import threading
import time
import datetime
import socket
import os
import logging
logger = logging.getLogger(__file__)
BUF_SIZE = 1024
class Recorder(metaclass=abc.ABCMeta):
def __init__(self, app, session):
self.app = app
self.session = session
self.replay_queue = multiprocessing.Queue()
self.command_queue = multiprocessing.Queue()
self.stop_evt = multiprocessing.Event()
@abc.abstractmethod
def record_replay(self):
pass
@abc.abstractmethod
def record_command(self, _input, _output):
pass
class FileRecorder(Recorder):
def record_replay(self):
parent, child = socket.socketpair()
self.session.add_watcher(parent)
session_dir = self.app.config["SESSION_DIR"]
with open(os.path.join(session_dir, session.id + ".rec"), 'wb') as dataf, \
open(os.path.join(session_dir, session.id + ".time"), "w") as timef:
dataf.write("Script started on {}\n".format(time.asctime()).encode("utf-8"))
while not self.stop_evt.is_set():
start_t = time.time()
data = child.recv(BUF_SIZE)
end_t = time.time()
size = len(data)
if size == 0:
break
timef.write("%.4f %s\n" % (end_t - start_t, size))
dataf.write(data)
dataf.write("Script done on {}\n".format(time.asctime()).encode("utf-8"))
def record_command(self, _input, _output):
pass
class SessionReplay(metaclass=abc.ABCMeta):
......@@ -17,7 +69,7 @@ class SessionReplay(metaclass=abc.ABCMeta):
pass
@abc.abstractmethod
def replay(self, sock):
def done(self):
pass
......@@ -52,6 +104,9 @@ class FileSessionReplay(SessionReplay):
sock.send(data)
sock.send("Replay session end")
def done(self):
pass
class FileSessionCommand(SessionCommand):
......@@ -63,3 +118,6 @@ class FileSessionCommand(SessionCommand):
self.f.write("$ {}\n".format(cmd))
self.f.write("{}\n\n".format(output))
def done(self):
pass
......@@ -16,17 +16,17 @@ logger = logging.getLogger(__file__)
class Session:
def __init__(self, client, server, session_dir):
def __init__(self, client, server):
self.id = str(uuid.uuid4())
self.client = client # Master of the session, it's a client sock
self.server = server # Server channel
self.session_dir = session_dir # Dir to save session record
self.watchers = [] # Only watch session
self.sharers = [] # Join to the session, read and write
self.stop_evt = threading.Event()
self.replaying = True
self.date_created = datetime.datetime.now()
self.date_finished = None
self.recorder = []
self.stop_evt = threading.Event()
self.sel = selectors.DefaultSelector()
def add_watcher(self, watcher, silent=False):
......@@ -65,7 +65,8 @@ class Session:
def remove_sharer(self, sharer):
logger.info("Session %s remove sharer %s" % (self.id, sharer))
sharer.send("Leave session {} at {}"
.format(self.id, datetime.datetime.now()).encode("utf-8"))
.format(self.id, datetime.datetime.now())
.encode("utf-8"))
self.sel.unregister(sharer)
self.sharers.remove(sharer)
......@@ -91,78 +92,39 @@ class Session:
elif sock == self.client:
if len(data) == 0:
for watcher in self.watchers + self.sharers:
watcher.send("Client {} close the session".format(self.client).encode("utf-8"))
watcher.send("Client {} close the session"
.format(self.client)
.encode("utf-8"))
self.close()
break
self.server.send(data)
elif sock in self.sharers:
if len(data) == 0:
logger.info("Sharer %s leave session %s" % (sock, self.id))
logger.info("Sharer {} leave session {}"
.format(sock, self.id))
self.remove_sharer(sock)
self.server.send(data)
elif sock in self.watchers:
if len(data) == 0:
logger.info("Watcher %s leave session %s" % (sock, self.id))
logger.info("Watcher {} leave session {}"
.format(sock, self.id))
def set_size(self, width, height):
self.server.resize_pty(width=width, height=height)
def add_recorder(self, recorder):
self.recorder.append(recorder)
def remove_recorder(self, recorder):
self.recorder.remove(recorder)
def record(self):
"""
Record the session to a file. Using it replay in the future
:return:
"""
logger.info("Start record session %s" % self.id)
parent, child = socket.socketpair()
self.add_watcher(parent)
record_dir = os.path.join(self.session_dir, self.date_created.strftime("%Y-%m-%d"))
if not os.path.isdir(record_dir):
os.mkdir(record_dir)
with open(os.path.join(record_dir, self.id + ".rec"), 'wb') as dataf, \
open(os.path.join(record_dir, self.id + ".time"), "w") as timef:
dataf.write("Script started on {}\n".format(time.asctime()).encode("utf-8"))
while not self.stop_evt.is_set():
start_t = time.time()
data = child.recv(BUF_SIZE)
end_t = time.time()
size = len(data)
if size == 0:
break
timef.write("%.4f %s\n" % (end_t-start_t, size))
dataf.write(data)
dataf.write("Script done on {}\n".format(time.asctime()).encode("utf-8"))
logger.info("End session record %s" % self.id)
def record_async(self):
thread = threading.Thread(target=self.record)
thread.start()
def replay(self):
"""
Replay the session
:return:
"""
with open(os.path.join(self.session_dir, self.id + ".rec"), 'rb') as dataf, \
open(os.path.join(self.session_dir, self.id + ".time"), "r") as timef:
self.client.send(dataf.readline())
for l in timef:
if not self.replaying:
break
t, size = float(l.split()[0]), int(l.split()[1])
data = dataf.read(size)
time.sleep(t)
self.client.send(data)
self.client.send("Replay session {} end".format(self.id).encode('utf-8'))
self.replaying = False
def replay_download(self):
"""
Using termrecord generate html, then down user download it and share it
Record the session
:return:
"""
pass
for recorder in self.recorder:
recorder.record(self)
def close(self):
self.stop_evt.set()
......
......@@ -17,9 +17,9 @@ BACKLOG = 5
class SSHServer:
def __init__(self, app=None):
def __init__(self, app):
self.app = app
self.stop_event = threading.Event()
self.stop_evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.host_key_path = os.path.join(self.app.root_path, 'keys', 'host_rsa_key')
......@@ -37,12 +37,11 @@ class SSHServer:
def run(self):
host = self.app.config["BIND_HOST"]
port = self.app.config["SSHD_PORT"]
print('Starting ssh server at %(host)s:%(port)s' %
{"host": host, "port": port})
print('Starting ssh server at {}:{}'.format(host, port))
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((host, port))
self.sock.listen(BACKLOG)
while not self.stop_event.is_set():
while not self.stop_evt.is_set():
try:
sock, addr = self.sock.accept()
logger.info("Get ssh request from %s: %s" % (addr[0], addr[1]))
......@@ -65,7 +64,7 @@ class SSHServer:
try:
transport.start_server(server=server)
except paramiko.SSHException:
logger.warning("SSH negotiation failed.")
logger.warning("SSH negotiation failed")
sys.exit(1)
except EOFError:
logger.warning("EOF Error")
......@@ -88,7 +87,7 @@ class SSHServer:
def dispatch(self, client):
request_type = client.request.type
if request_type == 'pty':
InteractiveServer(self.app, client).activate()
InteractiveServer(self.app, client).interact()
elif request_type == 'exec':
pass
elif request_type == 'subsystem':
......@@ -97,4 +96,4 @@ class SSHServer:
client.send("Not support request type: %s" % request_type)
def shutdown(self):
self.stop_event.set()
self.stop_evt.set()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment