feat: jms_storage 0.0.9 test

parent 35f530f3
......@@ -9,9 +9,8 @@ import os
import gzip
import json
import shutil
import boto3 # AWS S3 sdk
from jms_es_sdk import ESStore
import jms_storage
from .utils import get_logger
from .alignment import MemoryQueue
......@@ -93,6 +92,7 @@ class ServerReplayRecorder(ReplayRecorder):
def __init__(self, app):
super().__init__(app)
self.file = None
self.client = None
def record(self, data):
"""
......@@ -125,14 +125,16 @@ class ServerReplayRecorder(ReplayRecorder):
with open(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay'), 'rb') as f_in, \
gzip.open(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
if self.push_to_server(session_id):
if self.upload_replay(session_id):
logger.info("Succeed to push {}'s {}".format(session_id, "record"))
else:
logger.error("Failed to push {}'s {}".format(session_id, "record"))
self.push_to_server(session_id)
self.upload_replay(session_id)
def push_to_server(self, session_id):
if self.upload_replay(3, session_id):
def upload_replay(self, session_id):
if not self.client:
self.client = jms_storage.jms(self.app.service)
if self.push_storage(3, session_id):
if self.finish_replay(3, session_id):
return True
else:
......@@ -140,21 +142,27 @@ class ServerReplayRecorder(ReplayRecorder):
else:
return False
def push_local(self, session_id):
return self.app.service.push_session_replay(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'),
session_id)
def push_to_storage(self, session_id):
return self.client.upload_file(
os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'),
time.strftime('%Y-%m-%d', time.localtime(self.starttime)) + '/' + session_id + '.replay.gz')
def upload_replay(self, times, session_id):
def push_storage(self, times, session_id):
if times > 0:
if self.push_local(session_id):
logger.info("success push session: {}'s replay log ".format(session_id))
if self.push_to_storage(session_id):
logger.info("success push session: {}'s replay log to storage ".format(session_id))
return True
else:
logger.error("failed report session {}'s replay log, try {} times".format(session_id, times))
return self.upload_replay(times - 1, session_id)
logger.error(
"failed report session {}'s replay log to storage, try {} times".format(session_id, times))
return self.push_storage(times - 1, session_id)
else:
logger.error("failed report session {}'s replay log".format(session_id))
return False
logger.error("failed report session {}'s replay log storage, try to push to local".format(session_id))
if self.client.type() == 'jms':
return False
else:
self.client = jms_storage.jms(self.app.service)
return self.push_storage(3, session_id)
def finish_replay(self, times, session_id):
if times > 0:
......@@ -232,7 +240,7 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
self.stop_evt = threading.Event()
self.push_to_es_async()
self.__class__.no += 1
self.store = ESStore(app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts))
self.store = jms_storage.ESStore(app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts))
if not self.store.ping():
raise AssertionError("ESCommand storage init error")
......@@ -273,58 +281,6 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
print("{} has been gc".format(self))
class S3ReplayRecorder(ServerReplayRecorder):
def __init__(self, app):
super().__init__(app)
self.bucket = app.config["REPLAY_STORAGE"].get("BUCKET", "jumpserver")
self.REGION = app.config["REPLAY_STORAGE"].get("REGION", None)
self.ACCESS_KEY = app.config["REPLAY_STORAGE"].get("ACCESS_KEY", None)
self.SECRET_KEY = app.config["REPLAY_STORAGE"].get("SECRET_KEY", None)
if self.ACCESS_KEY and self.REGION and self.SECRET_KEY:
self.s3 = boto3.client('s3',
region_name=self.REGION,
aws_access_key_id=self.ACCESS_KEY,
aws_secret_access_key=self.SECRET_KEY)
else:
self.s3 = boto3.client('s3')
def push_to_s3(self, session_id):
logger.debug("push to s3")
try:
self.s3.upload_file(
os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'),
self.bucket,
time.strftime('%Y-%m-%d', time.localtime(
self.starttime)) + '/' + session_id + '.replay.gz')
return True
except:
return False
def upload_replay(self, times, session_id):
if times > 0:
if self.push_to_s3(session_id):
logger.info("success push session: {}'s replay log to S3 ".format(session_id))
return True
else:
logger.error("failed report session {}'s replay log to S3, try {} times".format(session_id, times))
return self.upload_replay(times - 1, session_id)
else:
logger.error("failed report session {}'s replay log S3, try to push to local".format(session_id))
return self.upload_replay_to_local(3, session_id)
def upload_replay_to_local(self, times, session_id):
if times > 0:
if self.push_local(session_id):
logger.info("success push session: {}'s replay log ".format(session_id))
return True
else:
logger.error("failed report session {}'s replay log, try {} times".format(session_id, times))
return self.upload_replay_to_local(times - 1, session_id)
else:
logger.error("failed report session {}'s replay log".format(session_id))
return False
def get_command_recorder_class(config):
command_storage = config["COMMAND_STORAGE"]
storage_type = command_storage.get('TYPE')
......@@ -336,10 +292,5 @@ def get_command_recorder_class(config):
def get_replay_recorder_class(config):
replay_storage = config["REPLAY_STORAGE"]
logger.debug(replay_storage)
storage_type = replay_storage.get('TYPE')
if storage_type == "s3":
return S3ReplayRecorder
else:
return ServerReplayRecorder
ServerReplayRecorder.client = jms_storage.init(config["REPLAY_STORAGE"])
return ServerReplayRecorder
asn1crypto==0.23.0
bcrypt==3.1.4
boto3==1.5.18
botocore==1.8.32
certifi==2017.11.5
boto3==1.6.5
botocore==1.9.5
cachetools==2.0.1
certifi==2018.1.18
cffi==1.11.2
chardet==3.0.4
click==6.7
crcmod==1.7
cryptography==2.1.4
docutils==0.14
dotmap==1.2.20
elasticsearch==6.1.1
Flask==0.12.2
Flask-SocketIO==2.9.2
idna==2.6
itsdangerous==0.24
Jinja2==2.10
jmespath==0.9.3
jms-es-sdk==0.5.2
jms-storage==0.0.9
jumpserver-python-sdk==0.0.31
MarkupSafe==1.0
oss2==2.4.0
paramiko==2.4.0
psutil==5.4.1
pyasn1==0.4.2
pycparser==2.18
PyNaCl==1.2.0
pyte==0.7.0
python-dateutil==2.6.1
python-engineio==2.0.1
python-gssapi==0.6.4
python-socketio==1.8.3
pytz==2017.3
requests==2.18.4
s3transfer==0.1.13
simplejson==3.13.2
six==1.11.0
tornado==4.5.2
urllib3==1.22
wcwidth==0.1.7
werkzeug==0.12.2
jumpserver-python-sdk==0.0.32
jms-es-sdk
Werkzeug==0.12.2
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment