Commit fe7bcb81 authored by ibuler's avatar ibuler

[Feture] 完成命令上传api

parent ae977b66
...@@ -3,7 +3,6 @@ import os ...@@ -3,7 +3,6 @@ import os
import time import time
import threading import threading
import logging import logging
import multiprocessing
from jms.service import AppService from jms.service import AppService
...@@ -11,9 +10,8 @@ from .config import Config ...@@ -11,9 +10,8 @@ from .config import Config
from .sshd import SSHServer from .sshd import SSHServer
from .httpd import HttpServer from .httpd import HttpServer
from .logging import create_logger from .logging import create_logger
from .queue import MemoryQueue from .queue import get_queue
from .record import ServerCommandRecorder, ServerReplayRecorder, \ from .record import get_recorder, START_SENTINEL, END_SENTINEL
START_SENTINEL, DONE_SENTINEL
__version__ = '0.4.0' __version__ = '0.4.0'
...@@ -38,18 +36,19 @@ class Coco: ...@@ -38,18 +36,19 @@ class Coco:
'LOG_LEVEL': 'INFO', 'LOG_LEVEL': 'INFO',
'LOG_DIR': os.path.join(BASE_DIR, 'logs'), 'LOG_DIR': os.path.join(BASE_DIR, 'logs'),
'SESSION_DIR': os.path.join(BASE_DIR, 'sessions'), 'SESSION_DIR': os.path.join(BASE_DIR, 'sessions'),
'REPLAY_STORE_ENGINE': 'server', # local, server
'COMMAND_STORE_ENGINE': 'server', # local, server, elasticsearch(not yet)
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip 'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
'SSH_PASSWORD_AUTH': True, 'SSH_PASSWORD_AUTH': True,
'SSH_PUBLIC_KEY_AUTH': True, 'SSH_PUBLIC_KEY_AUTH': True,
'HEARTBEAT_INTERVAL': 5, 'HEARTBEAT_INTERVAL': 5,
'MAX_CONNECTIONS': 500, 'MAX_CONNECTIONS': 500,
'ADMINS': '', 'ADMINS': '',
'REPLAY_RECORD_ENGINE': 'server', # local, server
'COMMAND_RECORD_ENGINE': 'server', # local, server, elasticsearch(not yet)
'QUEUE_ENGINE': 'memory', 'QUEUE_ENGINE': 'memory',
'QUEUE_MAX_SIZE': 0, 'QUEUE_MAX_SIZE': 0,
'MAX_PUSH_THREADS': 5, 'MAX_PUSH_THREADS': 5,
# 'MAX_RECORD_OUTPUT_LENGTH': 4096, 'MAX_RECORD_INPUT_LENGTH': 128,
'MAX_RECORD_OUTPUT_LENGTH': 1024,
} }
def __init__(self, name=None, root_path=None): def __init__(self, name=None, root_path=None):
...@@ -63,10 +62,10 @@ class Coco: ...@@ -63,10 +62,10 @@ class Coco:
self._service = None self._service = None
self._sshd = None self._sshd = None
self._httpd = None self._httpd = None
self._command_queue = None
self._replay_queue = None self._replay_queue = None
self._command_recorder = None self._command_queue = None
self._replay_recorder = None self._replay_recorder = None
self._command_recorder = None
@property @property
def service(self): def service(self):
...@@ -94,26 +93,39 @@ class Coco: ...@@ -94,26 +93,39 @@ class Coco:
pass pass
def initial_queue(self): def initial_queue(self):
logger.debug("Initial app queue") self._replay_queue, self._command_queue = get_queue(self.config)
queue_size = int(self.config['QUEUE_MAX_SIZE'])
# Todo: For other queue
if self.config['QUEUE_ENGINE'] == 'memory':
self._command_queue = MemoryQueue(queue_size)
self._replay_queue = MemoryQueue(queue_size)
else:
self._command_queue = MemoryQueue(queue_size)
self._replay_queue = MemoryQueue(queue_size)
def initial_recorder(self): def initial_recorder(self):
if self.config['REPLAY_STORE_ENGINE'] == 'server': self._replay_recorder, self._command_recorder = get_recorder(self)
self._replay_recorder = ServerReplayRecorder(self)
else: def bootstrap(self):
self._replay_recorder = ServerReplayRecorder(self) self.make_logger()
self.service.initial()
self.load_extra_conf_from_server()
self.initial_queue()
self.initial_recorder()
self.keep_heartbeat()
self.keep_push_record()
self.monitor_sessions()
if self.config['COMMAND_STORE_ENGINE'] == 'server': def heartbeat(self):
self._command_recorder = ServerCommandRecorder(self) _sessions = [s.to_json() for s in self.sessions]
tasks = self.service.terminal_heartbeat(_sessions)
if tasks:
self.handle_task(tasks)
if tasks is False:
return False
else: else:
self._command_recorder = ServerCommandRecorder(self) return True
def keep_heartbeat(self):
def func():
while not self.stop_evt.is_set():
self.heartbeat()
time.sleep(self.config["HEARTBEAT_INTERVAL"])
thread = threading.Thread(target=func)
thread.start()
def keep_push_record(self): def keep_push_record(self):
threads = [] threads = []
...@@ -121,12 +133,14 @@ class Coco: ...@@ -121,12 +133,14 @@ class Coco:
def push_command(q, callback, size=10): def push_command(q, callback, size=10):
while not self.stop_evt.is_set(): while not self.stop_evt.is_set():
data_set = q.mget(size) data_set = q.mget(size)
callback(data_set) if data_set and not callback(data_set):
q.mput(data_set)
def push_replay(q, callback, size=10): def push_replay(q, callback, size=10):
while not self.stop_evt.is_set(): while not self.stop_evt.is_set():
data_set = q.mget(size) data_set = q.mget(size)
callback(data_set) if data_set and not callback(data_set):
q.mput(data_set)
for i in range(self.config['MAX_PUSH_THREADS']): for i in range(self.config['MAX_PUSH_THREADS']):
t = threading.Thread(target=push_command, args=( t = threading.Thread(target=push_command, args=(
...@@ -142,33 +156,6 @@ class Coco: ...@@ -142,33 +156,6 @@ class Coco:
t.start() t.start()
logger.info("Start push record process: {}".format(t)) logger.info("Start push record process: {}".format(t))
def bootstrap(self):
self.make_logger()
self.initial_queue()
self.initial_recorder()
self.service.initial()
self.load_extra_conf_from_server()
self.keep_push_record()
self.keep_heartbeat()
self.monitor_sessions()
def heartbeat(self):
_sessions = [s.to_json() for s in self.sessions]
tasks = self.service.terminal_heartbeat(_sessions)
if tasks:
self.handle_task(tasks)
logger.info("Command queue size: {}".format(self._command_queue.qsize()))
logger.info("Replay queue size: {}".format(self._replay_queue.qsize()))
def keep_heartbeat(self):
def func():
while not self.stop_evt.is_set():
self.heartbeat()
time.sleep(self.config["HEARTBEAT_INTERVAL"])
thread = threading.Thread(target=func)
thread.start()
def monitor_sessions(self): def monitor_sessions(self):
interval = self.config["HEARTBEAT_INTERVAL"] interval = self.config["HEARTBEAT_INTERVAL"]
...@@ -246,57 +233,73 @@ class Coco: ...@@ -246,57 +233,73 @@ class Coco:
def add_session(self, session): def add_session(self, session):
with self.lock: with self.lock:
self.sessions.append(session) self.sessions.append(session)
self.put_command_start_queue(session)
self.put_replay_done_queue(session)
self.heartbeat() self.heartbeat()
self.put_command_start_queue(session)
self.put_replay_start_queue(session)
def remove_session(self, session): def remove_session(self, session):
with self.lock: with self.lock:
logger.info("Remove session: {}".format(session)) logger.info("Remove session: {}".format(session))
self.sessions.remove(session) for i in range(10):
self.put_command_done_queue(session) if self.heartbeat():
self.put_replay_done_queue(session) self.sessions.remove(session)
self.heartbeat() self.put_command_done_queue(session)
self.put_replay_done_queue(session)
else:
time.sleep(1)
def put_replay_queue(self, session, data): def put_replay_queue(self, session, data):
logger.debug("Put replay data: {} {}".format(session, data)) logger.debug("Put replay data: {} {}".format(session, data))
self._replay_queue.put(( self._replay_queue.put({
session.id, data, time.time() "session": session.id,
)) "data": data,
"timestamp": time.time()
})
def put_replay_start_queue(self, session): def put_replay_start_queue(self, session):
self._replay_queue.put(( self._replay_queue.put({
session.id, START_SENTINEL, time.time() "session": session.id,
)) "data": START_SENTINEL,
"timestamp": time.time()
})
def put_replay_done_queue(self, session): def put_replay_done_queue(self, session):
self._replay_queue.put(( self._replay_queue.put({
session.id, DONE_SENTINEL, time.time() "session": session.id,
)) "data": END_SENTINEL,
"timestamp": time.time()
})
def put_command_queue(self, session, _input, _output): def put_command_queue(self, session, _input, _output):
logger.debug("Put command data: {} {} {}".format(session, _input, _output)) logger.debug("Put command data: {} {} {}".format(session, _input, _output))
self._command_queue.put(( self._command_queue.put({
session.id, _input[:128], _output[:1024], session.client.user.username, "session": session.id,
session.server.asset.hostname, session.server.system_user.username, "input": _input[:128],
time.time() "output": _output[:1024],
)) "user": session.client.user.username,
"asset": session.server.asset.hostname,
"system_user": session.server.system_user.username,
"timestamp": int(time.time())
})
def put_command_start_queue(self, session): def put_command_start_queue(self, session):
self._command_queue.put(( self._command_queue.put({
session.id, START_SENTINEL, START_SENTINEL, "session": session.id,
session.client.user.username, "input": START_SENTINEL,
session.server.asset.hostname, "output": START_SENTINEL,
session.server.system_user.username, "user": session.client.user.username,
time.time() "asset": session.server.asset.hostname,
)) "system_user": session.server.system_user.username,
"timestamp": int(time.time())
})
def put_command_done_queue(self, session): def put_command_done_queue(self, session):
self._command_queue.put(( self._command_queue.put({
session.id, DONE_SENTINEL, DONE_SENTINEL, "session": session.id,
session.client.user.username, "input": END_SENTINEL,
session.server.asset.hostname, "output": END_SENTINEL,
session.server.system_user.username, "user": session.client.user.username,
time.time() "asset": session.server.asset.hostname,
)) "system_user": session.server.system_user.username,
"timestamp": int(time.time())
})
\ No newline at end of file
...@@ -39,6 +39,7 @@ class ProxyServer: ...@@ -39,6 +39,7 @@ class ProxyServer:
self.app.add_session(self.session) self.app.add_session(self.session)
self.watch_win_size_change_async() self.watch_win_size_change_async()
self.session.bridge() self.session.bridge()
self.app.remove_session(self.session)
def validate_permission(self, asset, system_user): def validate_permission(self, asset, system_user):
""" """
......
...@@ -6,7 +6,7 @@ import weakref ...@@ -6,7 +6,7 @@ import weakref
from . import char from . import char
from . import utils from . import utils
from .record import START_SENTINEL, DONE_SENTINEL from .record import START_SENTINEL, END_SENTINEL
BUF_SIZE = 4096 BUF_SIZE = 4096
logger = logging.getLogger(__file__) logger = logging.getLogger(__file__)
......
...@@ -14,6 +14,25 @@ class MultiQueueMixin: ...@@ -14,6 +14,25 @@ class MultiQueueMixin:
break break
return items return items
def mput(self, data_set):
for i in data_set:
self.put(i)
class MemoryQueue(MultiQueueMixin, queue.Queue): class MemoryQueue(MultiQueueMixin, queue.Queue):
pass pass
def get_queue(config):
queue_engine = config['QUEUE_ENGINE']
queue_size = config['QUEUE_MAX_SIZE']
if queue_engine == "server":
replay_queue = MemoryQueue(queue_size)
command_queue = MemoryQueue(queue_size)
else:
replay_queue = MemoryQueue(queue_size)
command_queue = MemoryQueue(queue_size)
return replay_queue, command_queue
...@@ -9,7 +9,7 @@ logger = logging.getLogger(__file__) ...@@ -9,7 +9,7 @@ logger = logging.getLogger(__file__)
BUF_SIZE = 1024 BUF_SIZE = 1024
START_SENTINEL = object() START_SENTINEL = object()
DONE_SENTINEL = object() END_SENTINEL = object()
class ReplayRecorder(metaclass=abc.ABCMeta): class ReplayRecorder(metaclass=abc.ABCMeta):
...@@ -21,21 +21,20 @@ class ReplayRecorder(metaclass=abc.ABCMeta): ...@@ -21,21 +21,20 @@ class ReplayRecorder(metaclass=abc.ABCMeta):
def record_replay(self, data_set): def record_replay(self, data_set):
""" """
记录replay数据 记录replay数据
:param data_set: 数据集 [("session", "data", "timestamp"),] :param data_set: 数据集 [{"session": "", "data": "", "timestamp": ""},]
:return: :return:
""" """
for data in data_set: for data in data_set:
if data[1] is START_SENTINEL: if data["data"] is START_SENTINEL:
data_set.remove(data) data_set.remove(data)
self.session_start(data[0]) self.session_start(data["session"])
if data[1] is DONE_SENTINEL: if data["data"] is END_SENTINEL:
data_set.remove(data) data_set.remove(data)
self.session_done(data[0]) self.session_end(data["session"])
@abc.abstractmethod @abc.abstractmethod
def session_done(self, session_id): def session_end(self, session_id):
pass pass
@abc.abstractmethod @abc.abstractmethod
...@@ -56,45 +55,67 @@ class CommandRecorder(metaclass=abc.ABCMeta): ...@@ -56,45 +55,67 @@ class CommandRecorder(metaclass=abc.ABCMeta):
:return: :return:
""" """
for data in data_set: for data in data_set:
if data[1] is START_SENTINEL: if data["input"] is START_SENTINEL:
data_set.remove(data) data_set.remove(data)
self.session_start(data[0]) self.session_start(data["session"])
if data[1] is DONE_SENTINEL: if data["input"] is END_SENTINEL:
data_set.remove(data) data_set.remove(data)
self.session_done(data[0]) self.session_end(data["session"])
@abc.abstractmethod @abc.abstractmethod
def session_start(self, session_id): def session_start(self, session_id):
pass pass
@abc.abstractmethod @abc.abstractmethod
def session_done(self, session_id): def session_end(self, session_id):
pass pass
class ServerReplayRecorder(ReplayRecorder): class ServerReplayRecorder(ReplayRecorder):
def record_replay(self, data_set): def record_replay(self, data_set):
"""
:param data_set:
:return:
"""
# Todo: <liuzheng712@gmail.com>
super().record_replay(data_set) super().record_replay(data_set)
print(data_set)
def session_start(self, session_id): def session_start(self, session_id):
print("Session {} start".format(session_id)) print("When session {} start exec".format(session_id))
def session_done(self, session_id): def session_end(self, session_id):
print("Session {} done".format(session_id)) print("When session {} end start".format(session_id))
class ServerCommandRecorder(CommandRecorder): class ServerCommandRecorder(CommandRecorder):
def record_command(self, data_set): def record_command(self, data_set):
if not data_set:
return True
super().record_command(data_set) super().record_command(data_set)
print(data_set) return self.app.service.push_session_command(data_set)
def session_start(self, session_id): def session_start(self, session_id):
print("Session {} start".format(session_id)) print("When session {} start exec".format(session_id))
def session_end(self, session_id):
print("When session {} end start".format(session_id))
def get_recorder(app):
replay_engine = app.config["REPLAY_RECORD_ENGINE"]
command_engine = app.config["COMMAND_RECORD_ENGINE"]
if replay_engine == "server":
replay_recorder = ServerReplayRecorder(app)
else:
replay_recorder = ServerReplayRecorder(app)
def session_done(self, session_id): if command_engine == "server":
print("Session {} done".format(session_id)) command_recorder = ServerCommandRecorder(app)
else:
command_recorder = ServerCommandRecorder(app)
return replay_recorder, command_recorder
#!coding: utf-8 #!coding: utf-8
import os
import threading import threading
import uuid import uuid
import socket
import logging import logging
import datetime import datetime
import time
import selectors import selectors
import weakref import weakref
...@@ -30,6 +27,10 @@ class Session: ...@@ -30,6 +27,10 @@ class Session:
self.stop_evt = threading.Event() self.stop_evt = threading.Event()
self.sel = selectors.DefaultSelector() self.sel = selectors.DefaultSelector()
self.server.set_session(self) self.server.set_session(self)
self._replay_queue = None
self._command_queue = None
self._replay_recorder = None
self._command_recorder = None
@property @property
def app(self): def app(self):
...@@ -133,8 +134,6 @@ class Session: ...@@ -133,8 +134,6 @@ class Session:
self.stop_evt.set() self.stop_evt.set()
self.date_finished = datetime.datetime.now() self.date_finished = datetime.datetime.now()
self.server.close() self.server.close()
for c in self._watchers + self._sharers:
c.close()
def to_json(self): def to_json(self):
return { return {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment