Commit 52b94f9e authored by ibuler's avatar ibuler

[Update] 增加录像监控功能,定时上传失败的日志

parent ae5a90b6
...@@ -17,7 +17,7 @@ from .sshd import SSHServer ...@@ -17,7 +17,7 @@ from .sshd import SSHServer
from .httpd import HttpServer from .httpd import HttpServer
from .logger import create_logger from .logger import create_logger
from .tasks import TaskHandler from .tasks import TaskHandler
from .recorder import get_command_recorder_class, ServerReplayRecorder from .recorder import get_command_recorder_class, get_replay_recorder_class
from .utils import get_logger, register_app, register_service from .utils import get_logger, register_app, register_service
...@@ -56,7 +56,6 @@ class Coco: ...@@ -56,7 +56,6 @@ class Coco:
def __init__(self, root_path=None): def __init__(self, root_path=None):
self.root_path = root_path if root_path else BASE_DIR self.root_path = root_path if root_path else BASE_DIR
self.config = self.config_class(self.root_path, defaults=self.default_config)
self.sessions = [] self.sessions = []
self.clients = [] self.clients = []
self.lock = threading.Lock() self.lock = threading.Lock()
...@@ -67,8 +66,15 @@ class Coco: ...@@ -67,8 +66,15 @@ class Coco:
self.replay_recorder_class = None self.replay_recorder_class = None
self.command_recorder_class = None self.command_recorder_class = None
self._task_handler = None self._task_handler = None
self.config = None
self.init_config()
register_app(self) register_app(self)
def init_config(self):
self.config = self.config_class(
self.root_path, defaults=self.default_config
)
@property @property
def name(self): def name(self):
if self.config['NAME']: if self.config['NAME']:
...@@ -112,7 +118,7 @@ class Coco: ...@@ -112,7 +118,7 @@ class Coco:
self.config.update(configs) self.config.update(configs)
def get_recorder_class(self): def get_recorder_class(self):
self.replay_recorder_class = ServerReplayRecorder self.replay_recorder_class = get_replay_recorder_class(self.config)
self.command_recorder_class = get_command_recorder_class(self.config) self.command_recorder_class = get_command_recorder_class(self.config)
def new_command_recorder(self): def new_command_recorder(self):
...@@ -128,6 +134,7 @@ class Coco: ...@@ -128,6 +134,7 @@ class Coco:
self.get_recorder_class() self.get_recorder_class()
self.keep_heartbeat() self.keep_heartbeat()
self.monitor_sessions() self.monitor_sessions()
self.monitor_sessions_replay()
def heartbeat(self): def heartbeat(self):
_sessions = [s.to_json() for s in self.sessions] _sessions = [s.to_json() for s in self.sessions]
...@@ -156,6 +163,26 @@ class Coco: ...@@ -156,6 +163,26 @@ class Coco:
thread = threading.Thread(target=func) thread = threading.Thread(target=func)
thread.start() thread.start()
def monitor_sessions_replay(self):
interval = 10
recorder = self.new_replay_recorder()
log_dir = os.path.join(self.config['LOG_DIR'])
def func():
while not self.stop_evt.is_set():
active_sessions = [str(session.id) for session in self.sessions]
for filename in os.listdir(log_dir):
session_id = filename.split('.')[0]
if len(session_id) != 36:
continue
if session_id not in active_sessions:
recorder.file_path = os.path.join(log_dir, filename)
recorder.upload_replay(session_id, 1)
time.sleep(interval)
thread = threading.Thread(target=func)
thread.start()
def monitor_sessions(self): def monitor_sessions(self):
interval = self.config["HEARTBEAT_INTERVAL"] interval = self.config["HEARTBEAT_INTERVAL"]
......
...@@ -38,6 +38,9 @@ class InteractiveServer: ...@@ -38,6 +38,9 @@ class InteractiveServer:
@search_result.setter @search_result.setter
def search_result(self, value): def search_result(self, value):
if not value:
self._search_result = value
return
value = self.filter_system_users(value) value = self.filter_system_users(value)
self._search_result = value self._search_result = value
...@@ -88,7 +91,7 @@ class InteractiveServer: ...@@ -88,7 +91,7 @@ class InteractiveServer:
result = [] result = []
# 所有的 # 所有的
if q == '': if q in ('', None):
result = self.assets result = self.assets
# 用户输入的是数字,可能想使用id唯一键搜索 # 用户输入的是数字,可能想使用id唯一键搜索
elif q.isdigit() and self.search_result and \ elif q.isdigit() and self.search_result and \
...@@ -242,6 +245,7 @@ class InteractiveServer: ...@@ -242,6 +245,7 @@ class InteractiveServer:
self.search_assets(opt) self.search_assets(opt)
if self.search_result and len(self.search_result) == 1: if self.search_result and len(self.search_result) == 1:
asset = self.search_result[0] asset = self.search_result[0]
self.search_result = None
if asset.platform == "Windows": if asset.platform == "Windows":
self.client.send(warning( self.client.send(warning(
_("终端不支持登录windows, 请使用web terminal访问")) _("终端不支持登录windows, 请使用web terminal访问"))
......
...@@ -249,11 +249,12 @@ class WSProxy: ...@@ -249,11 +249,12 @@ class WSProxy:
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
data = self.child.recv(BUF_SIZE) data = self.child.recv(BUF_SIZE)
except OSError: except (OSError, EOFError):
continue self.close()
break
if len(data) == 0: if len(data) == 0:
self.ws.emit("logout", {"room": self.room_id}, room=self.room_id)
self.close() self.close()
break
data = data.decode(errors="ignore") data = data.decode(errors="ignore")
self.ws.emit("data", {'data': data, 'room': self.room_id}, self.ws.emit("data", {'data': data, 'room': self.room_id},
room=self.room_id) room=self.room_id)
...@@ -266,6 +267,7 @@ class WSProxy: ...@@ -266,6 +267,7 @@ class WSProxy:
thread.start() thread.start()
def close(self): def close(self):
self.ws.emit("logout", {"room": self.room_id}, room=self.room_id)
self.stop_event.set() self.stop_event.set()
try: try:
self.child.shutdown(1) self.child.shutdown(1)
......
...@@ -73,7 +73,7 @@ class CommandRecorder: ...@@ -73,7 +73,7 @@ class CommandRecorder:
pass pass
class ServerReplayRecorder(ReplayRecorder): class MultiReplayRecorder(ReplayRecorder):
time_start = None time_start = None
storage = None storage = None
...@@ -81,6 +81,7 @@ class ServerReplayRecorder(ReplayRecorder): ...@@ -81,6 +81,7 @@ class ServerReplayRecorder(ReplayRecorder):
super().__init__() super().__init__()
self.file = None self.file = None
self.file_path = None self.file_path = None
self.get_storage()
def record(self, data): def record(self, data):
""" """
...@@ -107,48 +108,44 @@ class ServerReplayRecorder(ReplayRecorder): ...@@ -107,48 +108,44 @@ class ServerReplayRecorder(ReplayRecorder):
def session_end(self, session_id): def session_end(self, session_id):
self.file.write('"0":""}') self.file.write('"0":""}')
self.file.close() self.file.close()
if self.upload_replay(session_id): self.upload_replay(session_id)
logger.info("Succeed to push {}'s {}".format(session_id, "record"))
def get_storage(self):
config = current_app.config["REPLAY_STORAGE"]
if config['TYPE'] == 's3':
self.storage = jms_storage.S3Storage(config)
elif config['TYPE'] == 'oss':
self.storage = jms_storage.OSSStorage(config)
else: else:
logger.error("Failed to push {}'s {}".format(session_id, "record")) self.storage = jms_storage.JMSReplayStorage(app_service)
def upload_replay(self, session_id): def upload_replay(self, session_id, times=3):
configs = app_service.load_config_from_server() if times < 1:
logger.debug("upload_replay print config: {}".format(configs)) if self.storage.type == 'jms':
self.storage = jms_storage.init(configs["REPLAY_STORAGE"]) return False
if not self.storage: else:
self.storage = jms_storage.jms(app_service) self.storage = jms_storage.JMSReplayStorage(app_service)
if self.push_file(3, session_id): self.upload_replay(session_id, times=3)
ok, msg = self.push_to_storage(session_id)
if not ok:
msg = 'Failed push replay file: {}, try again {}'.format(msg, times)
logger.warn(msg)
self.upload_replay(session_id, times-1)
else:
msg = 'Success push replay file: {}'.format(session_id)
logger.info(msg)
self.finish_replay(3, session_id)
os.unlink(self.file_path) os.unlink(self.file_path)
return True return True
else:
return False
def push_to_storage(self, session_id): def push_to_storage(self, session_id):
dt = time.strftime('%Y-%m-%d', time.localtime(self.time_start)) dt = time.strftime('%Y-%m-%d', time.localtime(self.time_start))
target = dt + '/' + session_id + '.replay.gz' target = dt + '/' + session_id + '.replay.gz'
return self.storage.upload_file(self.file_path, target) return self.storage.upload(self.file_path, target)
def push_file(self, times, session_id):
if times < 0:
if self.storage.type() == 'jms':
return False
else:
msg = "Failed push session {}'s replay log to storage".format(session_id)
logger.error(msg)
self.storage = jms_storage.jms(app_service)
return self.push_file(3, session_id)
if self.push_to_storage(session_id):
logger.info("Success push session: {}'s replay log to storage ".format(session_id))
return True
else:
msg = "Failed push session {}'s replay log to storage, try {} times".format(session_id, times)
logger.error(msg)
return self.push_file(times - 1, session_id)
def finish_replay(self, times, session_id): def finish_replay(self, times, session_id):
if times < 0: if times < 1:
logger.error("Failed finished session {}'s replay".format(session_id)) logger.error("Failed finished session {}'s replay".format(session_id))
return False return False
...@@ -156,7 +153,8 @@ class ServerReplayRecorder(ReplayRecorder): ...@@ -156,7 +153,8 @@ class ServerReplayRecorder(ReplayRecorder):
logger.info("Success finish session {}'s replay ".format(session_id)) logger.info("Success finish session {}'s replay ".format(session_id))
return True return True
else: else:
logger.error("Failed finish session {}'s replay, try {} times".format(session_id, times)) msg = "Failed finish session {}'s replay, try {} times"
logger.error(msg.format(session_id, times))
return self.finish_replay(times - 1, session_id) return self.finish_replay(times - 1, session_id)
...@@ -219,7 +217,7 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton): ...@@ -219,7 +217,7 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
self.stop_evt = threading.Event() self.stop_evt = threading.Event()
self.push_to_es_async() self.push_to_es_async()
self.__class__.no += 1 self.__class__.no += 1
self.store = jms_storage.ESStore( self.store = jms_storage.ESStorage(
current_app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts) current_app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts)
) )
if not self.store.ping(): if not self.store.ping():
...@@ -271,7 +269,6 @@ def get_command_recorder_class(config): ...@@ -271,7 +269,6 @@ def get_command_recorder_class(config):
else: else:
return ServerCommandRecorder return ServerCommandRecorder
#
# def get_replay_recorder_class(config): def get_replay_recorder_class(config):
# ServerReplayRecorder.client = jms_storage.init(config["REPLAY_STORAGE"]) return MultiReplayRecorder
# return ServerReplayRecorder
...@@ -18,8 +18,7 @@ idna==2.6 ...@@ -18,8 +18,7 @@ idna==2.6
itsdangerous==0.24 itsdangerous==0.24
Jinja2==2.10 Jinja2==2.10
jmespath==0.9.3 jmespath==0.9.3
jms-es-sdk==0.5.2 jms-storage==0.0.14
jms-storage==0.0.12
jumpserver-python-sdk==0.0.42 jumpserver-python-sdk==0.0.42
MarkupSafe==1.0 MarkupSafe==1.0
oss2==2.4.0 oss2==2.4.0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment