Commit 983a9d8c authored by ibuler's avatar ibuler

Merge branch 'dev' of bitbucket.org:jumpserver/coco into dev

parents a21aa4e7 8ff95546
......@@ -16,7 +16,7 @@ from .sshd import SSHServer
from .httpd import HttpServer
from .logger import create_logger
from .tasks import TaskHandler
from .recorder import get_command_recorder_class, get_replay_recorder_class
from .recorder import get_command_recorder_class, ServerReplayRecorder
from .utils import get_logger
......@@ -109,7 +109,7 @@ class Coco:
self.config.update(configs)
def get_recorder_class(self):
self.replay_recorder_class = get_replay_recorder_class(self.config)
self.replay_recorder_class = ServerReplayRecorder
self.command_recorder_class = get_command_recorder_class(self.config)
def new_command_recorder(self):
......@@ -233,6 +233,9 @@ class Coco:
def remove_session(self, session):
with self.lock:
logger.info("Remove session: {}".format(session))
self.sessions.remove(session)
self.service.finish_session(session.to_json())
try:
logger.info("Remove session: {}".format(session))
self.sessions.remove(session)
self.service.finish_session(session.to_json())
except ValueError:
logger.warning("Remove session: {} fail, maybe already removed".format(session))
\ No newline at end of file
......@@ -9,9 +9,8 @@ import os
import gzip
import json
import shutil
import boto3 # AWS S3 sdk
from jms_es_sdk import ESStore
import jms_storage
from .utils import get_logger
from .alignment import MemoryQueue
......@@ -125,14 +124,19 @@ class ServerReplayRecorder(ReplayRecorder):
with open(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay'), 'rb') as f_in, \
gzip.open(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
if self.push_to_server(session_id):
if self.upload_replay(session_id):
logger.info("Succeed to push {}'s {}".format(session_id, "record"))
else:
logger.error("Failed to push {}'s {}".format(session_id, "record"))
self.push_to_server(session_id)
def push_to_server(self, session_id):
if self.upload_replay(3, session_id):
self.upload_replay(session_id)
def upload_replay(self, session_id):
configs = self.app.service.load_config_from_server()
logger.debug("upload_replay print config: {}".format(configs))
self.client = jms_storage.init(configs["REPLAY_STORAGE"])
if not self.client:
self.client = jms_storage.jms(self.app.service)
if self.push_storage(3, session_id):
if self.finish_replay(3, session_id):
return True
else:
......@@ -140,21 +144,27 @@ class ServerReplayRecorder(ReplayRecorder):
else:
return False
def push_local(self, session_id):
return self.app.service.push_session_replay(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'),
session_id)
def push_to_storage(self, session_id):
return self.client.upload_file(
os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'),
time.strftime('%Y-%m-%d', time.localtime(self.starttime)) + '/' + session_id + '.replay.gz')
def upload_replay(self, times, session_id):
def push_storage(self, times, session_id):
if times > 0:
if self.push_local(session_id):
logger.info("success push session: {}'s replay log ".format(session_id))
if self.push_to_storage(session_id):
logger.info("success push session: {}'s replay log to storage ".format(session_id))
return True
else:
logger.error("failed report session {}'s replay log, try {} times".format(session_id, times))
return self.upload_replay(times - 1, session_id)
logger.error(
"failed report session {}'s replay log to storage, try {} times".format(session_id, times))
return self.push_storage(times - 1, session_id)
else:
logger.error("failed report session {}'s replay log".format(session_id))
return False
logger.error("failed report session {}'s replay log storage, try to push to local".format(session_id))
if self.client.type() == 'jms':
return False
else:
self.client = jms_storage.jms(self.app.service)
return self.push_storage(3, session_id)
def finish_replay(self, times, session_id):
if times > 0:
......@@ -232,7 +242,7 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
self.stop_evt = threading.Event()
self.push_to_es_async()
self.__class__.no += 1
self.store = ESStore(app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts))
self.store = jms_storage.ESStore(app.config["COMMAND_STORAGE"].get("HOSTS", self.default_hosts))
if not self.store.ping():
raise AssertionError("ESCommand storage init error")
......@@ -273,58 +283,6 @@ class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
print("{} has been gc".format(self))
class S3ReplayRecorder(ServerReplayRecorder):
def __init__(self, app):
super().__init__(app)
self.bucket = app.config["REPLAY_STORAGE"].get("BUCKET", "jumpserver")
self.REGION = app.config["REPLAY_STORAGE"].get("REGION", None)
self.ACCESS_KEY = app.config["REPLAY_STORAGE"].get("ACCESS_KEY", None)
self.SECRET_KEY = app.config["REPLAY_STORAGE"].get("SECRET_KEY", None)
if self.ACCESS_KEY and self.REGION and self.SECRET_KEY:
self.s3 = boto3.client('s3',
region_name=self.REGION,
aws_access_key_id=self.ACCESS_KEY,
aws_secret_access_key=self.SECRET_KEY)
else:
self.s3 = boto3.client('s3')
def push_to_s3(self, session_id):
logger.debug("push to s3")
try:
self.s3.upload_file(
os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'),
self.bucket,
time.strftime('%Y-%m-%d', time.localtime(
self.starttime)) + '/' + session_id + '.replay.gz')
return True
except:
return False
def upload_replay(self, times, session_id):
if times > 0:
if self.push_to_s3(session_id):
logger.info("success push session: {}'s replay log to S3 ".format(session_id))
return True
else:
logger.error("failed report session {}'s replay log to S3, try {} times".format(session_id, times))
return self.upload_replay(times - 1, session_id)
else:
logger.error("failed report session {}'s replay log S3, try to push to local".format(session_id))
return self.upload_replay_to_local(3, session_id)
def upload_replay_to_local(self, times, session_id):
if times > 0:
if self.push_local(session_id):
logger.info("success push session: {}'s replay log ".format(session_id))
return True
else:
logger.error("failed report session {}'s replay log, try {} times".format(session_id, times))
return self.upload_replay_to_local(times - 1, session_id)
else:
logger.error("failed report session {}'s replay log".format(session_id))
return False
def get_command_recorder_class(config):
command_storage = config["COMMAND_STORAGE"]
storage_type = command_storage.get('TYPE')
......@@ -334,12 +292,7 @@ def get_command_recorder_class(config):
else:
return ServerCommandRecorder
def get_replay_recorder_class(config):
replay_storage = config["REPLAY_STORAGE"]
logger.debug(replay_storage)
storage_type = replay_storage.get('TYPE')
if storage_type == "s3":
return S3ReplayRecorder
else:
return ServerReplayRecorder
#
# def get_replay_recorder_class(config):
# ServerReplayRecorder.client = jms_storage.init(config["REPLAY_STORAGE"])
# return ServerReplayRecorder
asn1crypto==0.23.0
bcrypt==3.1.4
boto3==1.5.18
botocore==1.8.32
certifi==2017.11.5
boto3==1.6.5
botocore==1.9.5
cachetools==2.0.1
certifi==2018.1.18
cffi==1.11.2
chardet==3.0.4
click==6.7
crcmod==1.7
cryptography==2.1.4
docutils==0.14
dotmap==1.2.20
elasticsearch==6.1.1
Flask==0.12.2
Flask-SocketIO==2.9.2
idna==2.6
itsdangerous==0.24
Jinja2==2.10
jmespath==0.9.3
jms-es-sdk==0.5.2
jms-storage==0.0.11
jumpserver-python-sdk==0.0.32
MarkupSafe==1.0
oss2==2.4.0
paramiko==2.4.0
psutil==5.4.1
pyasn1==0.4.2
pycparser==2.18
PyNaCl==1.2.0
pyte==0.7.0
python-dateutil==2.6.1
python-engineio==2.0.1
python-gssapi==0.6.4
python-socketio==1.8.3
pytz==2017.3
requests==2.18.4
s3transfer==0.1.13
simplejson==3.13.2
six==1.11.0
tornado==4.5.2
urllib3==1.22
wcwidth==0.1.7
werkzeug==0.12.2
jumpserver-python-sdk==0.0.32
jms-es-sdk
Werkzeug==0.12.2
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment