Commit a2d2e325 authored by ibuler's avatar ibuler

[Update] 优化使用sdk

parent 52b94f9e
......@@ -17,7 +17,7 @@ from .sshd import SSHServer
from .httpd import HttpServer
from .logger import create_logger
from .tasks import TaskHandler
from .recorder import get_command_recorder_class, get_replay_recorder_class
from .recorder import ReplayRecorder, CommandRecorder
from .utils import get_logger, register_app, register_service
......@@ -117,21 +117,18 @@ class Coco:
))
self.config.update(configs)
def get_recorder_class(self):
self.replay_recorder_class = get_replay_recorder_class(self.config)
self.command_recorder_class = get_command_recorder_class(self.config)
@staticmethod
def new_command_recorder():
return CommandRecorder()
def new_command_recorder(self):
return self.command_recorder_class()
def new_replay_recorder(self):
return self.replay_recorder_class()
@staticmethod
def new_replay_recorder():
return ReplayRecorder()
def bootstrap(self):
self.make_logger()
self.service.initial()
self.load_extra_conf_from_server()
self.get_recorder_class()
self.keep_heartbeat()
self.monitor_sessions()
self.monitor_sessions_replay()
......
......@@ -4,7 +4,7 @@
import os
import socket
import uuid
import traceback
from copy import deepcopy
from flask_socketio import SocketIO, Namespace, join_room
from flask import Flask, request, current_app, redirect
......
......@@ -8,6 +8,7 @@ import time
import os
import gzip
import json
from copy import deepcopy
import jms_storage
......@@ -20,60 +21,6 @@ BUF_SIZE = 1024
class ReplayRecorder(metaclass=abc.ABCMeta):
def __init__(self, session=None):
self.session = session
@abc.abstractmethod
def record(self, data):
"""
记录replay数据
:param data: 数据 {
"session": "",
"data": "",
"timestamp": ""
}
:return:
"""
@abc.abstractmethod
def session_start(self, session_id):
print("Session start: {}".format(session_id))
pass
@abc.abstractmethod
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
class CommandRecorder:
def __init__(self, session=None):
self.session = session
def record(self, data):
"""
:param data: 数据 {
"session":
"input":
"output":
"user":
"asset":
"system_user":
"timestamp":
}
:return:
"""
def session_start(self, session_id):
print("Session start: {}".format(session_id))
pass
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
class MultiReplayRecorder(ReplayRecorder):
time_start = None
storage = None
......@@ -111,13 +58,9 @@ class MultiReplayRecorder(ReplayRecorder):
self.upload_replay(session_id)
def get_storage(self):
config = current_app.config["REPLAY_STORAGE"]
if config['TYPE'] == 's3':
self.storage = jms_storage.S3Storage(config)
elif config['TYPE'] == 'oss':
self.storage = jms_storage.OSSStorage(config)
else:
self.storage = jms_storage.JMSReplayStorage(app_service)
config = deepcopy(current_app.config["REPLAY_STORAGE"])
config["SERVICE"] = app_service
self.storage = jms_storage.get_object_storage(config)
def upload_replay(self, session_id, times=3):
if times < 1:
......@@ -146,11 +89,15 @@ class MultiReplayRecorder(ReplayRecorder):
def finish_replay(self, times, session_id):
if times < 1:
logger.error("Failed finished session {}'s replay".format(session_id))
logger.error(
"Failed finished session {}'s replay".format(session_id)
)
return False
if app_service.finish_replay(session_id):
logger.info("Success finish session {}'s replay ".format(session_id))
logger.info(
"Success finish session {}'s replay ".format(session_id)
)
return True
else:
msg = "Failed finish session {}'s replay, try {} times"
......@@ -158,17 +105,18 @@ class MultiReplayRecorder(ReplayRecorder):
return self.finish_replay(times - 1, session_id)
class ServerCommandRecorder(CommandRecorder, metaclass=Singleton):
class CommandRecorder(metaclass=Singleton):
batch_size = 10
timeout = 5
no = 0
storage = None
def __init__(self):
super().__init__()
self.queue = MemoryQueue()
self.stop_evt = threading.Event()
self.push_to_server_async()
self.__class__.no += 1
self.get_storage()
def record(self, data):
if data and data['input']:
......@@ -177,72 +125,22 @@ class ServerCommandRecorder(CommandRecorder, metaclass=Singleton):
data['timestamp'] = int(data['timestamp'])
self.queue.put(data)
def get_storage(self):
config = deepcopy(current_app.config["COMMAND_STORAGE"])
config['SERVICE'] = app_service
self.storage = jms_storage.get_log_storage(config)
def push_to_server_async(self):
def func():
while not self.stop_evt.is_set():
data_set = self.queue.mget(self.batch_size, timeout=self.timeout)
logger.debug("<Session command recorder {}> queue size: {}".format(
self.no, self.queue.qsize())
)
if not data_set:
continue
logger.debug("Send {} commands to server".format(len(data_set)))
ok = app_service.push_session_command(data_set)
if not ok:
self.queue.mput(data_set)
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
def session_start(self, session_id):
pass
def session_end(self, session_id):
pass
# def __del__(self):
# print("GC: Session command storage has been gc")
class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
batch_size = 10
timeout = 5
no = 0
default_hosts = ["http://localhost"]
def __init__(self):
super().__init__()
self.queue = MemoryQueue()
self.stop_evt = threading.Event()
self.push_to_es_async()
self.__class__.no += 1
self.store = jms_storage.ESStorage(
current_app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts)
)
if not self.store.ping():
raise AssertionError("ESCommand storage init error")
def record(self, data):
if data and data['input']:
data['input'] = data['input'][:128]
data['output'] = data['output'][:1024]
data['timestamp'] = int(data['timestamp'])
self.queue.put(data)
def push_to_es_async(self):
def func():
while not self.stop_evt.is_set():
data_set = self.queue.mget(self.batch_size,
timeout=self.timeout)
logger.debug(
"<Session command recorder {}> queue size: {}".format(
self.no, self.queue.qsize())
logger.debug("Session command remain push: {}".format(
self.queue.qsize())
)
if not data_set:
continue
logger.debug("Send {} commands to server".format(len(data_set)))
ok = self.store.bulk_save(data_set)
ok = self.storage.bulk_save(data_set)
if not ok:
self.queue.mput(data_set)
......@@ -251,24 +149,9 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
thread.start()
def session_start(self, session_id):
print("Session start: {}".format(session_id))
pass
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
# def __del__(self):
# print("GC: ES command storage has been gc".format(self))
def get_command_recorder_class(config):
command_storage = config["COMMAND_STORAGE"]
storage_type = command_storage.get('TYPE')
if storage_type == "elasticsearch":
return ESCommandRecorder
else:
return ServerCommandRecorder
def get_replay_recorder_class(config):
return MultiReplayRecorder
......@@ -18,7 +18,7 @@ idna==2.6
itsdangerous==0.24
Jinja2==2.10
jmespath==0.9.3
jms-storage==0.0.14
jms-storage==0.0.15
jumpserver-python-sdk==0.0.42
MarkupSafe==1.0
oss2==2.4.0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment