Commit a0a71d8d authored by ibuler's avatar ibuler

[Log] 修改一些log显示,推送command log到server

parent 29b066ac
......@@ -40,6 +40,7 @@ class Coco:
'SSH_PUBLIC_KEY_AUTH': True,
'HEARTBEAT_INTERVAL': 5,
'MAX_CONNECTIONS': 500,
'ADMINS': '',
# 'MAX_RECORD_OUTPUT_LENGTH': 4096,
}
......@@ -100,7 +101,6 @@ class Coco:
time.sleep(self.config["HEARTBEAT_INTERVAL"])
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
def monitor_sessions(self):
......@@ -164,15 +164,15 @@ class Coco:
def add_client(self, client):
with self.lock:
self.clients.append(client)
logger.info("New client %s join, total %d now" % (client, len(self.clients)))
logger.info("New client {} join, total {} now".format(client, len(self.clients)))
def remove_client(self, client):
with self.lock:
try:
self.clients.remove(client)
logger.info("Client %s leave, total %d now" % (client, len(self.clients)))
client.send("Closed by server")
logger.info("Client {} leave, total {} now".format(client, len(self.clients)))
client.close()
del client
except:
pass
......@@ -183,7 +183,11 @@ class Coco:
def remove_session(self, session):
with self.lock:
logger.info("Remove session: {}".format(session))
self.sessions.remove(session)
del session.server
del session.client
del session
self.heartbeat()
......@@ -8,8 +8,9 @@ import paramiko
from .session import Session
from .models import Server
from .record import LocalFileReplayRecorder, LocalFileCommandRecorder, ServerReplayRecorder
from .utils import wrap_with_line_feed as wr
from .record import LocalFileReplayRecorder, LocalFileCommandRecorder, \
ServerReplayRecorder, ServerCommandRecorder
from .utils import wrap_with_line_feed as wr, wrap_with_warning as warning
logger = logging.getLogger(__file__)
......@@ -24,31 +25,42 @@ class ProxyServer:
self.request = client.request
self.server = None
self.connecting = True
self.session = None
def proxy(self, asset, system_user):
self.send_connecting_message(asset, system_user)
self.server = self.get_server_conn(asset, system_user)
if self.server is None:
return
session = Session(self.client, self.server)
self.app.add_session(session)
self.session = Session(self.client, self.server)
self.app.add_session(self.session)
self.watch_win_size_change_async()
self.add_recorder()
self.session.bridge()
def add_recorder(self):
"""
上传记录,如果配置的是server,就上传到服务器端,实例化对应的recorder,
将来有计划直接上传到 es和oss
:return:
"""
if self.app.config["REPLAY_STORE_ENGINE"].lower() == "server":
replay_recorder = ServerReplayRecorder(self.app, session)
replay_recorder = ServerReplayRecorder(self.app, self.session)
else:
replay_recorder = LocalFileReplayRecorder(self.app, session)
session.add_recorder(replay_recorder)
session.record_replay_async()
cmd_recorder = LocalFileCommandRecorder(self.app, session)
self.server.add_recorder(cmd_recorder)
replay_recorder = LocalFileReplayRecorder(self.app, self.session)
if self.app.config["COMMAND_STORE_ENGINE"].lower() == "server":
command_recorder = ServerCommandRecorder(self.app, self.session)
else:
command_recorder = LocalFileCommandRecorder(self.app, self.session)
self.session.add_recorder(replay_recorder)
self.session.record_replay_async()
self.server.add_recorder(command_recorder)
self.server.record_command_async()
session.bridge()
session.stop_evt.set()
def validate_permission(self, asset, system_user):
"""
Validate use is have the permission to connect this asset using that
system user
验证用户是否有连接改资产的权限
:return: True or False
"""
return self.app.service.validate_user_asset_permission(
......@@ -57,7 +69,7 @@ class ProxyServer:
def get_system_user_auth(self, system_user):
"""
Get the system user auth ..., using this to connect asset
获取系统用户的认证信息,密码或秘钥
:return: system user have full info
"""
system_user.password, system_user.private_key = \
......@@ -66,7 +78,7 @@ class ProxyServer:
def get_server_conn(self, asset, system_user):
logger.info("Connect to {}".format(asset.hostname))
if not self.validate_permission(asset, system_user):
self.client.send(_('No permission'))
self.client.send(warning(_('No permission')))
return None
self.get_system_user_auth(system_user)
if True:
......@@ -83,13 +95,23 @@ class ProxyServer:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(asset.ip, port=asset.port,
username=system_user.username,
password=system_user.password,
pkey=system_user.private_key,
timeout=TIMEOUT)
except paramiko.AuthenticationException as e:
self.client.send(wr("[Errno 66] {}".format(e)))
ssh.connect(
asset.ip, port=asset.port, username=system_user.username,
password=system_user.password, pkey=system_user.private_key,
timeout=TIMEOUT, compress=True, auth_timeout=10,
look_for_keys=False
)
except paramiko.AuthenticationException:
admins = self.app.config['ADMINS'] or 'administrator'
self.client.send(warning(wr(
"Authenticate with server failed, contact {}".format(admins),
before=1, after=0
)))
key_fingerprint = system_user.private_key.get_hex() if system_user.private_key else None
logger.error("Connect {}@{}:{} auth failed, password: {}, key: {}".format(
system_user.username, asset.ip, asset.port,
system_user.password, key_fingerprint,
))
return None
except socket.error as e:
self.client.send(wr(" {}".format(e)))
......
......@@ -17,6 +17,7 @@ logger = logging.getLogger(__file__)
class InteractiveServer:
_sentinel = object()
def __init__(self, app, client):
self.app = app
......@@ -103,7 +104,7 @@ class InteractiveServer:
def dispatch(self, opt):
if opt is None:
return
return self._sentinel
elif opt.startswith("/"):
self.search_and_display(opt.lstrip("/"))
elif opt in ['p', 'P', '3']:
......@@ -113,7 +114,7 @@ class InteractiveServer:
elif opt.startswith("g") and opt.lstrip("g").isdigit():
self.display_group_assets(int(opt.lstrip("g")))
elif opt in ['q', 'Q', '0']:
self.app.remove_client(self.client)
return self._sentinel
elif opt in ['h', 'H', '9']:
self.display_banner()
else:
......@@ -122,7 +123,6 @@ class InteractiveServer:
def search_assets(self, q):
if self.assets is None:
self.get_user_assets()
result = []
# 所有的
......@@ -178,11 +178,6 @@ class InteractiveServer:
self.display_search_result()
def display_search_result(self):
# if len(self.search_result) == 0:
# self.client.send(warning("Nothing match"))
# return
print("Total assets: ".format(len(self.assets)))
self.search_result = sort_assets(self.search_result, self.app.config["ASSET_LIST_SORT_BY"])
fake_asset = Asset(hostname=_("Hostname"), ip=_("IP"), system_users_join=_("LoginAs"), comment=_("Comment"))
id_max_length = max(len(str(len(self.search_result))), 3)
......@@ -264,9 +259,10 @@ class InteractiveServer:
while True:
try:
opt = self.get_option()
self.dispatch(opt)
except socket.error as e:
logger.error("Socket error %s" % e)
rv = self.dispatch(opt)
if rv is self._sentinel:
break
except socket.error:
break
self.close()
......@@ -276,4 +272,5 @@ class InteractiveServer:
thread.start()
def close(self):
pass
self.app.remove_client(self.client)
logger.info("Exit interactive server")
......@@ -20,6 +20,7 @@ class SSHInterface(paramiko.ServerInterface):
self.app = app
self.request = request
self.event = threading.Event()
self.auth_valid = False
def check_auth_interactive(self, username, submethods):
logger.info("Check auth interactive: %s %s" % (username, submethods))
......@@ -46,11 +47,23 @@ class SSHInterface(paramiko.ServerInterface):
return paramiko.AUTH_FAILED
def check_auth_password(self, username, password):
return self.validate_auth(username, password=password)
valid = self.validate_auth(username, password=password)
if not valid:
logger.warning("Password and public key auth <%s> failed, reject it" % username)
return paramiko.AUTH_FAILED
else:
logger.info("Password auth <%s> success" % username)
return paramiko.AUTH_SUCCESSFUL
def check_auth_publickey(self, username, key):
key = key.get_base64()
return self.validate_auth(username, key=key)
valid = self.validate_auth(username, key=key)
if not valid:
logger.debug("Public key auth <%s> failed, try to password" % username)
return paramiko.AUTH_FAILED
else:
logger.debug("Public key auth <%s> success" % username)
return paramiko.AUTH_SUCCESSFUL
def validate_auth(self, username, password="", key=""):
user, _ = self.app.service.authenticate(
......@@ -60,13 +73,13 @@ class SSHInterface(paramiko.ServerInterface):
if user:
self.request.user = user
return paramiko.AUTH_SUCCESSFUL
return True
else:
return paramiko.AUTH_FAILED
return False
def check_channel_direct_tcpip_request(self, chanid, origin, destination):
logger.debug("Check channel direct tcpip request: %d %s %s" %
(chanid, origin, destination))
(chanid, origin, destination))
self.request.type = 'direct-tcpip'
self.request.meta = {
'chanid': chanid, 'origin': origin,
......
......@@ -28,13 +28,16 @@ def create_logger(app):
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(module)s %(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S')
# datefmt='%Y-%m-%d %H:%M:%S'
# )
main_formatter = logging.Formatter(
fmt='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = StreamHandler()
file_handler = TimedRotatingFileHandler(
filename=log_path, when='D', backupCount=10)
filename=log_path, when='D', backupCount=10
)
for handler in [console_handler, file_handler]:
handler.setFormatter(main_formatter)
......
......@@ -48,12 +48,19 @@ class Client:
def recv(self, size):
return self.chan.recv(size)
def close(self):
logger.info("Client {} close".format(self))
return self.chan.close()
def __getattr__(self, item):
return getattr(self.chan, item)
def __str__(self):
return "<%s from %s:%s>" % (self.user, self.addr[0], self.addr[1])
def __del__(self):
logger.info("GC client object: {}".format(self))
class Server:
"""
......@@ -96,10 +103,18 @@ class Server:
def record_command_async(self):
def func():
logger.info("Start server command record thread: {}".format(self))
for recorder in self.recorders:
recorder.start()
while not self.stop_evt.is_set():
_input, _output = self.command_queue.get()
if _input is None:
break
for recorder in self.recorders:
recorder.record_command(datetime.datetime.now(), _input, _output)
logger.info("Exit server command record thread: {}".format(self))
for recorder in self.recorders:
recorder.done()
thread = threading.Thread(target=func)
thread.start()
......@@ -126,8 +141,8 @@ class Server:
self.command_queue.put((self._input, self._output))
del self.input_data[:]
del self.output_data[:]
self._input = ""
self._output = ""
# self._input = ""
# self._output = ""
self._in_input_state = True
return self.chan.send(b)
......@@ -141,8 +156,11 @@ class Server:
return data
def close(self):
logger.info("Closed server {}".format(self))
self.chan.close()
return self.chan.transport.close()
self.stop_evt.set()
self.chan.transport.close()
self.command_queue.put((None, None))
@staticmethod
def _have_enter_char(s):
......@@ -163,9 +181,10 @@ class Server:
return getattr(self.chan, item)
def __str__(self):
return "<%s@%s:%s>" % (self.system_user.username,
self.asset.hostname,
self.asset.port)
return "<To: {}>".format(str(self.asset))
def __del__(self):
logger.info("GC server object: {}".format(self))
class WSProxy:
......
......@@ -7,6 +7,7 @@ import threading
import time
import os
import logging
import base64
logger = logging.getLogger(__file__)
......@@ -86,12 +87,12 @@ class LocalFileReplayRecorder(ReplayRecorder):
self.data_f.write(data)
def start(self):
logger.debug("Session {} start".format(self.session.id))
self.data_f.write("Session {} started on {}\n".format(self.session.id, time.asctime()).encode("utf-8"))
logger.info("Session record start: {}".format(self.session))
self.data_f.write("Session {} started on {}\n".format(self.session, time.asctime()).encode("utf-8"))
def done(self):
logger.debug("Session {} record done".format(self.session.id))
self.data_f.write("Session {} done on {}\n".format(self.session.id, time.asctime()).encode("utf-8"))
logger.debug("Session record done: {}".format(self.session))
self.data_f.write("Session {} done on {}\n".format(self.session, time.asctime()).encode("utf-8"))
for f in (self.data_f, self.time_f):
try:
f.close()
......@@ -102,21 +103,23 @@ class LocalFileReplayRecorder(ReplayRecorder):
class LocalFileCommandRecorder(CommandRecorder):
def __init__(self, app, session):
super().__init__(app, session)
self.cmd_filename = ""
self.cmd_f = None
self.session_dir = ""
self.prepare_file()
def prepare_file(self):
session_dir = os.path.join(
self.session_dir = os.path.join(
self.app.config["SESSION_DIR"],
self.session.date_created.strftime("%Y-%m-%d"),
str(self.session.id)
)
if not os.path.isdir(session_dir):
os.makedirs(session_dir)
if not os.path.isdir(self.session_dir):
os.makedirs(self.session_dir)
cmd_filename = os.path.join(session_dir, "cmd.txt")
self.cmd_filename = os.path.join(self.session_dir, "command.txt")
try:
self.cmd_f = open(cmd_filename, "w")
self.cmd_f = open(self.cmd_filename, "wb")
except IOError as e:
logger.debug(e)
self.done()
......@@ -132,17 +135,20 @@ class LocalFileCommandRecorder(CommandRecorder):
pass
def done(self):
pass
try:
self.cmd_f.close()
except:
pass
class ServerReplayRecorder(LocalFileReplayRecorder):
def done(self):
super().done()
self.push_records()
self.push_record()
def archive_replay(self):
filename = os.path.join(self.session_dir, "archive.tar.bz2")
def archive_record(self):
filename = os.path.join(self.session_dir, "replay.tar.bz2")
logger.debug("Start archive log: {}".format(filename))
tar = tarfile.open(filename, "w:bz2")
os.chdir(self.session_dir)
......@@ -153,16 +159,72 @@ class ServerReplayRecorder(LocalFileReplayRecorder):
tar.close()
return filename
def push_replay_record(self, archive):
def push_archive_record(self, archive):
logger.debug("Start push replay record to server")
return self.app.service.push_session_replay(archive, str(self.session.id))
def push_records(self):
def push_record(self):
logger.info("Start push replay record to server")
def func():
archive = self.archive_record()
for i in range(1, 5):
result = self.push_archive_record(archive)
if not result:
logger.error("Push replay error, try again")
time.sleep(5)
continue
else:
break
thread = threading.Thread(target=func)
thread.start()
class ServerCommandRecorder(LocalFileCommandRecorder):
def record_command(self, now, _input, _output):
logger.debug("File recorder command: ({},{})".format(_input, _output))
self.cmd_f.write("{} {} {}\n".format(
int(now.timestamp()),
base64.b64encode(_input.encode("utf-8")).decode('utf-8'),
base64.b64encode(_output.encode("utf-8")).decode('utf-8'),
).encode('utf-8'))
def start(self):
pass
def done(self):
super().done()
self.push_record()
def archive_record(self):
filename = os.path.join(self.session_dir, "command.tar.bz2")
logger.debug("Start archive command record: {}".format(filename))
tar = tarfile.open(filename, "w:bz2")
os.chdir(self.session_dir)
cmd_filename = os.path.basename(self.cmd_filename)
tar.add(cmd_filename)
tar.close()
return filename
def push_archive_record(self, archive):
logger.debug("Start push command record to server")
return self.app.service.push_session_replay(archive, str(self.session.id))
def push_record(self):
logger.info("Start push command record to server")
def func():
archive = self.archive_replay()
result = self.push_replay_record(archive)
if not result:
logger.error("Push replay error")
archive = self.archive_record()
for i in range(1, 5):
result = self.push_archive_record(archive)
if not result:
logger.error("Push command record error, try again")
time.sleep(5)
continue
else:
break
thread = threading.Thread(target=func)
thread.start()
......@@ -37,14 +37,14 @@ class Session:
:param silent: If true not send welcome message
:return:
"""
logger.info("Session %s add watcher %s" % (self, watcher))
logger.info("Session add watcher: {} -> {} ".format(self.id, watcher))
if not silent:
watcher.send("Welcome to watch session {}\r\n".format(self.id).encode("utf-8"))
self.sel.register(watcher, selectors.EVENT_READ)
self.watchers.append(watcher)
def remove_watcher(self, watcher):
logger.info("Session %s remove watcher %s" % (self, watcher))
logger.info("Session %s remove watcher %s" % (self.id, watcher))
self.sel.unregister(watcher)
self.watchers.remove(watcher)
......@@ -57,7 +57,7 @@ class Session:
"""
logger.info("Session %s add share %s" % (self.id, sharer))
if not silent:
sharer.send("Welcome to join session {}\r\n"
sharer.send("Welcome to join session: {}\r\n"
.format(self.id).encode("utf-8"))
self.sel.register(sharer, selectors.EVENT_READ)
self.sharers.append(sharer)
......@@ -81,7 +81,7 @@ class Session:
Bridge clients with server
:return:
"""
logger.info("Start bridge session {}".format(self.id))
logger.info("Start bridge session: {}".format(self.id))
self.sel.register(self.client, selectors.EVENT_READ)
self.sel.register(self.server, selectors.EVENT_READ)
while not self.stop_evt.is_set():
......@@ -90,26 +90,33 @@ class Session:
data = sock.recv(BUF_SIZE)
if sock == self.server:
if len(data) == 0:
msg = "Server close the connection: {}".format(self.server)
logger.info(msg)
for watcher in [self.client] + self.watchers + self.sharers:
watcher.send(msg.encode('utf-8'))
self.close()
break
for watcher in [self.client] + self.watchers + self.sharers:
watcher.send(data)
elif sock == self.client:
if len(data) == 0:
msg = "Client close the connection: {}".format(self.client)
logger.info(msg)
for watcher in self.watchers + self.sharers:
watcher.send("Client {} close the session".format(self.client).encode("utf-8"))
watcher.send(msg.encode("utf-8"))
self.close()
break
self.server.send(data)
elif sock in self.sharers:
if len(data) == 0:
logger.info("Sharer {} leave session {}".format(sock, self.id))
logger.info("Sharer {} leave the session {}".format(sock, self.id))
self.remove_sharer(sock)
self.server.send(data)
elif sock in self.watchers:
if len(data) == 0:
logger.info("Watcher {} leave session {}".format(sock, self.id))
logger.info("End session {} bride".format(self.id))
self.watchers.remove(sock)
logger.info("Watcher {} leave the session {}".format(sock, self.id))
logger.info("Session stop event set: {}".format(self.id))
def set_size(self, width, height):
logger.debug("Resize server chan size {}*{}".format(width, height))
......@@ -119,6 +126,7 @@ class Session:
def func():
parent, child = socket.socketpair()
self.add_watcher(parent)
logger.info("Start record replay thread: {}".format(self.id))
for recorder in self.recorders:
recorder.start()
while not self.stop_evt.is_set():
......@@ -132,19 +140,19 @@ class Session:
break
for recorder in self.recorders:
recorder.record_replay(now, timedelta, size, data)
logger.debug("Stop event set, exit record replay")
logger.info("Exit record replay thread: {}".format(self.id))
for recorder in self.recorders:
recorder.done()
thread = threading.Thread(target=func)
thread.start()
def close(self):
logger.debug("Session {} closing".format(self.id))
logger.info("Close the session: {} ".format(self.id))
self.stop_evt.set()
self.date_finished = datetime.datetime.now()
self.server.close()
for c in self.watchers + self.sharers:
c.close()
self.server.close()
def to_json(self):
return {
......@@ -164,6 +172,9 @@ class Session:
def __repr__(self):
return self.id
def __del__(self):
logger.info("Session {} object has been GC")
......
......@@ -17,6 +17,7 @@ BACKLOG = 5
class SSHServer:
def __init__(self, app):
self.app = app
self.stop_evt = threading.Event()
......@@ -44,12 +45,12 @@ class SSHServer:
while not self.stop_evt.is_set():
try:
sock, addr = self.sock.accept()
logger.info("Get ssh request from %s: %s" % (addr[0], addr[1]))
logger.info("Get ssh request from {}: {}".format(addr[0], addr[1]))
thread = threading.Thread(target=self.handle, args=(sock, addr))
thread.daemon = True
thread.start()
except Exception as e:
logger.error("SSH server error: %s" % e)
logger.error("Start SSH server error: {}".format(e))
def handle(self, sock, addr):
transport = paramiko.Transport(sock, gss_kex=False)
......@@ -65,20 +66,20 @@ class SSHServer:
transport.start_server(server=server)
except paramiko.SSHException:
logger.warning("SSH negotiation failed")
sys.exit(1)
return
except EOFError:
logger.warning("EOF Error")
sys.exit(1)
logger.warning("Handle EOF Error")
return
chan = transport.accept(10)
if chan is None:
logger.warning("No ssh channel get")
sys.exit(1)
return
server.event.wait(5)
if not server.event.is_set():
logger.warning("Client not request a valid request")
sys.exit(2)
logger.warning("Client not request a valid request, exiting")
return
client = Client(chan, request)
self.app.add_client(client)
......@@ -87,6 +88,7 @@ class SSHServer:
def dispatch(self, client):
request_type = client.request.type
if request_type == 'pty':
logger.info("Request type `pty`, dispatch to interactive mode")
InteractiveServer(self.app, client).interact()
elif request_type == 'exec':
pass
......
......@@ -220,7 +220,7 @@ def wrap_with_line_feed(s, before=0, after=1):
def wrap_with_color(text, color='white', background=None,
bolder=False, underline=False):
bolder_ = '1'
underline_ = '4'
_underline = '4'
color_map = {
'black': '30',
'red': '31',
......@@ -246,7 +246,7 @@ def wrap_with_color(text, color='white', background=None,
if bolder:
wrap_with.append(bolder_)
if underline:
wrap_with.append(underline_)
wrap_with.append(_underline)
if background:
wrap_with.append(background_map.get(background, ''))
wrap_with.append(color_map.get(color, ''))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment