Commit 808da201 authored by ibuler's avatar ibuler

Merge branch 'es' into dev

parents c1288ecd 0fb42750
......@@ -42,13 +42,13 @@ class Coco:
'LOG_DIR': os.path.join(BASE_DIR, 'logs'),
'SESSION_DIR': os.path.join(BASE_DIR, 'sessions'),
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
'SSH_PASSWORD_AUTH': True,
'SSH_PUBLIC_KEY_AUTH': True,
'PASSWORD_AUTH': True,
'PUBLIC_KEY_AUTH': True,
'HEARTBEAT_INTERVAL': 5,
'MAX_CONNECTIONS': 500,
'ADMINS': '',
'REPLAY_RECORD_ENGINE': 'server', # local, server
'COMMAND_RECORD_ENGINE': 'server', # local, server, elasticsearch(not yet)
'COMMAND_STORAGE': {'TYPE': 'server'}, # server
'REPLAY_RECORD_ENGINE': 'server',
}
def __init__(self, name=None, root_path=None):
......@@ -93,16 +93,17 @@ class Coco:
def make_logger(self):
create_logger(self)
# Todo: load some config from server like replay and common upload
def load_extra_conf_from_server(self):
pass
configs = self.service.load_config_from_server()
self.config.update(configs)
def initial_recorder(self):
self.replay_recorder_class = get_replay_recorder_class(self)
self.command_recorder_class = get_command_recorder_class(self)
def get_recorder_class(self):
self.replay_recorder_class = get_replay_recorder_class(self.config)
self.command_recorder_class = get_command_recorder_class(self.config)
def new_command_recorder(self):
return self.command_recorder_class(self)
recorder = self.command_recorder_class(self)
return recorder
def new_replay_recorder(self):
return self.replay_recorder_class(self)
......@@ -111,7 +112,7 @@ class Coco:
self.make_logger()
self.service.initial()
self.load_extra_conf_from_server()
self.initial_recorder()
self.get_recorder_class()
self.keep_heartbeat()
self.monitor_sessions()
......
......@@ -272,7 +272,7 @@ class InteractiveServer:
def search_and_proxy(self, opt):
self.search_assets(opt)
if len(self.search_result) == 1:
if self.search_result and len(self.search_result) == 1:
self.proxy(self.search_result[0])
else:
self.display_search_result()
......
......@@ -43,9 +43,9 @@ class SSHInterface(paramiko.ServerInterface):
def get_allowed_auths(self, username):
supported = []
if self.app.config["SSH_PASSWORD_AUTH"]:
if self.app.config["PASSWORD_AUTH"]:
supported.append("password")
if self.app.config["SSH_PUBLIC_KEY_AUTH"]:
if self.app.config["PUBLIC_KEY_AUTH"]:
supported.append("publickey")
return ",".join(supported)
......
......@@ -11,6 +11,8 @@ import gzip
import json
import shutil
from jms_es_sdk import ESStore
from .alignment import MemoryQueue
logger = logging.getLogger(__file__)
......@@ -183,17 +185,69 @@ class ServerCommandRecorder(CommandRecorder, metaclass=Singleton):
print("{} has been gc".format(self))
def get_command_recorder_class(app):
command_engine = app.config["COMMAND_RECORD_ENGINE"]
class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
batch_size = 10
timeout = 5
no = 0
if command_engine == "server":
return ServerCommandRecorder
def __init__(self, app):
super().__init__(app)
self.queue = MemoryQueue()
self.stop_evt = threading.Event()
self.push_to_es_async()
self.__class__.no += 1
self.store = ESStore(**app.config["COMMAND_RECORD_OPTIONS"])
if not self.store.ping():
raise AssertionError("ESCommand storage init error")
def record(self, data):
if data and data['input']:
data['input'] = data['input'][:128]
data['output'] = data['output'][:1024]
data['timestamp'] = int(data['timestamp'])
self.queue.put(data)
def push_to_es_async(self):
def func():
while not self.stop_evt.is_set():
data_set = self.queue.mget(self.batch_size,
timeout=self.timeout)
logger.debug(
"<Session command recorder {}> queue size: {}".format(
self.no, self.queue.qsize())
)
if not data_set:
continue
logger.debug("Send {} commands to server".format(len(data_set)))
ok = self.store.bulk_save(data_set)
if not ok:
self.queue.mput(data_set)
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
def session_start(self, session_id):
pass
def session_end(self, session_id):
pass
def __del__(self):
print("{} has been gc".format(self))
def get_command_recorder_class(config):
command_storage = config["COMMAND_STORAGE"]
if command_storage['TYPE'] == "elasticsearch":
return ESCommandRecorder
else:
return ServerCommandRecorder
def get_replay_recorder_class(app):
replay_engine = app.config["REPLAY_RECORD_ENGINE"]
def get_replay_recorder_class(config):
replay_engine = config["REPLAY_RECORD_ENGINE"]
if replay_engine == "server":
return ServerReplayRecorder
else:
......
......@@ -97,7 +97,7 @@ class SSHServer:
def dispatch(self, client):
request_type = client.request.type
if request_type == 'pty':
if request_type == 'pty' or request_type == 'x11':
logger.info("Request type `pty`, dispatch to interactive mode")
InteractiveServer(self.app, client).interact()
elif request_type == 'exec':
......
......@@ -49,10 +49,10 @@ class Config:
# ASSET_LIST_SORT_BY = 'ip'
# 登录是否支持密码认证
# SSH_PASSWORD_AUTH = True
# PASSWORD_AUTH = True
# 登录是否支持秘钥认证
# SSH_PUBLIC_KEY_AUTH = True
# PUBLIC_KEY_AUTH = True
# 和Jumpserver 保持心跳时间间隔
# HEARTBEAT_INTERVAL = 5
......
......@@ -29,3 +29,4 @@ urllib3==1.22
wcwidth==0.1.7
werkzeug==0.12.2
jumpserver-python-sdk==0.0.23
jms-es-sdk==0.5.1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment