Commit 859cc410 authored by ibuler's avatar ibuler

[Update] 修改logger为session级别

parent 457fc034
......@@ -14,9 +14,8 @@ from .config import Config
from .sshd import SSHServer
from .httpd import HttpServer
from .logger import create_logger
from .alignment import get_queue
from .record import get_recorder, START_SENTINEL, END_SENTINEL
from .tasks import TaskHandler
from .recorder import get_command_recorder_class, get_replay_recorder_class
__version__ = '0.4.0'
......@@ -38,7 +37,7 @@ class Coco:
'ACCESS_KEY_ENV': 'COCO_ACCESS_KEY',
'ACCESS_KEY_FILE': os.path.join(BASE_DIR, 'keys', '.access_key'),
'SECRET_KEY': None,
'LOG_LEVEL': 'INFO',
'LOG_LEVEL': 'DEBUG',
'LOG_DIR': os.path.join(BASE_DIR, 'logs'),
'SESSION_DIR': os.path.join(BASE_DIR, 'sessions'),
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
......@@ -49,11 +48,6 @@ class Coco:
'ADMINS': '',
'REPLAY_RECORD_ENGINE': 'server', # local, server
'COMMAND_RECORD_ENGINE': 'server', # local, server, elasticsearch(not yet)
'QUEUE_ENGINE': 'memory',
'QUEUE_MAX_SIZE': 0,
'MAX_PUSH_THREADS': 5,
'MAX_RECORD_INPUT_LENGTH': 128,
'MAX_RECORD_OUTPUT_LENGTH': 1024,
}
def __init__(self, name=None, root_path=None):
......@@ -67,10 +61,8 @@ class Coco:
self._service = None
self._sshd = None
self._httpd = None
self._replay_queue = None
self._command_queue = None
self._replay_recorder = None
self._command_recorder = None
self.replay_recorder_class = None
self.command_recorder_class = None
self._task_handler = None
@property
......@@ -104,20 +96,22 @@ class Coco:
def load_extra_conf_from_server(self):
pass
def initial_queue(self):
self._replay_queue, self._command_queue = get_queue(self.config)
def initial_recorder(self):
self._replay_recorder, self._command_recorder = get_recorder(self)
self.replay_recorder_class = get_replay_recorder_class(self)
self.command_recorder_class = get_command_recorder_class(self)
def new_command_recorder(self):
return self.command_recorder_class(self)
def new_replay_recorder(self):
return self.replay_recorder_class(self)
def bootstrap(self):
self.make_logger()
self.service.initial()
self.load_extra_conf_from_server()
self.initial_queue()
self.initial_recorder()
self.keep_heartbeat()
self.keep_push_record()
self.monitor_sessions()
def heartbeat(self):
......@@ -143,29 +137,6 @@ class Coco:
thread = threading.Thread(target=func)
thread.start()
def keep_push_record(self):
threads = []
def worker(q, callback, size=10):
while not self.stop_evt.is_set():
data_set = q.mget(size)
if data_set and not callback(data_set):
q.mput(data_set)
for i in range(self.config['MAX_PUSH_THREADS']):
t = threading.Thread(target=worker, args=(
self._command_queue, self._command_recorder.record_command,
))
threads.append(t)
t = threading.Thread(target=worker, args=(
self._replay_queue, self._replay_recorder.record_replay,
))
threads.append(t)
for t in threads:
t.start()
logger.info("Start push record process: {}".format(t))
def monitor_sessions(self):
interval = self.config["HEARTBEAT_INTERVAL"]
......@@ -241,8 +212,6 @@ class Coco:
with self.lock:
self.sessions.append(session)
self.heartbeat()
self.put_command_start_queue(session)
self.put_replay_start_queue(session)
def remove_session(self, session):
with self.lock:
......@@ -250,66 +219,7 @@ class Coco:
for i in range(10):
if self.heartbeat():
self.sessions.remove(session)
self.put_command_done_queue(session)
self.put_replay_done_queue(session)
break
else:
time.sleep(1)
def put_replay_queue(self, session, data):
logger.info("Put replay data: {} {}".format(session, data))
self._replay_queue.put({
"session": session.id,
"data": data,
"timestamp": time.time()
})
def put_replay_start_queue(self, session):
self._replay_queue.put({
"session": session.id,
"data": START_SENTINEL,
"timestamp": time.time()
})
def put_replay_done_queue(self, session):
self._replay_queue.put({
"session": session.id,
"data": END_SENTINEL,
"timestamp": time.time()
})
def put_command_queue(self, session, _input, _output):
logger.debug("Put command data: {} {} {}".format(session, _input, _output))
if not _input:
return
self._command_queue.put({
"session": session.id,
"input": _input[:128],
"output": _output[:1024],
"user": session.client.user.username,
"asset": session.server.asset.hostname,
"system_user": session.server.system_user.username,
"timestamp": int(time.time())
})
def put_command_start_queue(self, session):
self._command_queue.put({
"session": session.id,
"input": START_SENTINEL,
"output": START_SENTINEL,
"user": session.client.user.username,
"asset": session.server.asset.hostname,
"system_user": session.server.system_user.username,
"timestamp": int(time.time())
})
def put_command_done_queue(self, session):
self._command_queue.put({
"session": session.id,
"input": END_SENTINEL,
"output": END_SENTINEL,
"user": session.client.user.username,
"asset": session.server.asset.hostname,
"system_user": session.server.system_user.username,
"timestamp": int(time.time())
})
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import io
import os
import paramiko
import logging
import socket
from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room
......@@ -13,7 +11,7 @@ import uuid
# Todo: Remove for future
from jms.models import User
from .models import Request, Client, WSProxy
from .forward import ProxyServer
from .proxy import ProxyServer
__version__ = '0.4.0'
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
......
......@@ -15,7 +15,7 @@ from .utils import wrap_with_line_feed as wr, wrap_with_title as title, \
wrap_with_primary as primary, wrap_with_warning as warning, \
is_obj_attr_has, is_obj_attr_eq, sort_assets, TtyIOParser, \
ugettext as _
from .forward import ProxyServer
from .proxy import ProxyServer
logger = logging.getLogger(__file__)
......@@ -115,15 +115,15 @@ class InteractiveServer:
return self._sentinel
elif opt.startswith("/"):
self.search_and_display(opt.lstrip("/"))
elif opt in ['p', 'P', '3']:
elif opt in ['p', 'P']:
self.display_assets()
elif opt in ['g', 'G', '4']:
elif opt in ['g', 'G']:
self.display_asset_groups()
elif opt.startswith("g") and opt.lstrip("g").isdigit():
self.display_group_assets(int(opt.lstrip("g")))
elif opt in ['q', 'Q', '0']:
elif opt in ['q', 'Q']:
return self._sentinel
elif opt in ['h', 'H', '9']:
elif opt in ['h', 'H']:
self.display_banner()
else:
self.search_and_proxy(opt)
......
......@@ -26,14 +26,14 @@ def create_logger(app):
log_path = os.path.join(log_dir, 'coco.log')
logger = logging.getLogger()
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(module)s %(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
main_formatter = logging.Formatter(
fmt='%(asctime)s [%(levelname)s] %(message)s',
fmt='%(asctime)s [%(module)s %(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
console_handler = StreamHandler()
file_handler = TimedRotatingFileHandler(
filename=log_path, when='D', backupCount=10
......@@ -43,3 +43,4 @@ def create_logger(app):
handler.setFormatter(main_formatter)
logger.addHandler(handler)
logger.setLevel(level)
logging.getLogger("requests").setLevel(logging.WARNING)
......@@ -8,7 +8,6 @@ import weakref
from . import char
from . import utils
from .record import START_SENTINEL, END_SENTINEL
BUF_SIZE = 4096
logger = logging.getLogger(__file__)
......@@ -87,24 +86,23 @@ class Server:
self._input_initial = False
self._in_vim_state = False
self.filters = []
self._input = ""
self._output = ""
self._session_ref = None
@property
def session(self):
return self._session_ref() if self._session_ref is not None else None
def add_filter(self, _filter):
self.filters.append(_filter)
def fileno(self):
return self.chan.fileno()
def set_session(self, session):
self._session_ref = weakref.ref(session)
@property
def session(self):
if self._session_ref:
return self._session_ref()
else:
return None
def send(self, b):
if isinstance(b, str):
b = b.encode("utf-8")
......@@ -125,14 +123,11 @@ class Server:
self.session.put_command(self._input, self._output)
del self.input_data[:]
del self.output_data[:]
# self._input = ""
# self._output = ""
self._in_input_state = True
return self.chan.send(b)
def recv(self, size):
data = self.chan.recv(size)
self.session.put_replay(data)
if self._input_initial:
if self._in_input_state:
self.input_data.append(data)
......
......@@ -27,7 +27,6 @@ class ProxyServer:
self.request = client.request
self.server = None
self.connecting = True
self.session = None
@property
def app(self):
......@@ -38,11 +37,17 @@ class ProxyServer:
self.server = self.get_server_conn(asset, system_user)
if self.server is None:
return
self.session = Session(self.app, self.client, self.server)
self.app.add_session(self.session)
command_recorder = self.app.new_command_recorder()
replay_recorder = self.app.new_replay_recorder()
session = Session(
self.client, self.server,
command_recorder=command_recorder,
replay_recorder=replay_recorder,
)
self.app.add_session(session)
self.watch_win_size_change_async()
self.session.bridge()
self.app.remove_session(self.session)
session.bridge()
self.app.remove_session(session)
def validate_permission(self, asset, system_user):
"""
......
......@@ -4,80 +4,92 @@
import abc
import logging
import threading
import os
from .alignment import MemoryQueue
logger = logging.getLogger(__file__)
BUF_SIZE = 1024
START_SENTINEL = object()
END_SENTINEL = object()
class Singleton(type):
def __init__(cls, *args, **kwargs):
cls.__instance = None
super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance
else:
return cls.__instance
class ReplayRecorder(metaclass=abc.ABCMeta):
def __init__(self, app):
def __init__(self, app, session=None):
self.app = app
self.session = session
@abc.abstractmethod
def record_replay(self, data_set):
def record(self, data):
"""
记录replay数据
:param data_set: 数据集 [{"session": "", "data": "", "timestamp": ""},]
:param data: 数据 {
"session": "",
"data": "",
"timestamp": ""
}
:return:
"""
for data in data_set:
if data["data"] is START_SENTINEL:
data_set.remove(data)
self.session_start(data["session"])
if data["data"] is END_SENTINEL:
data_set.remove(data)
self.session_end(data["session"])
@abc.abstractmethod
def session_start(self, session_id):
print("Session start")
print("Session start: {}".format(session_id))
pass
@abc.abstractmethod
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
class CommandRecorder(metaclass=abc.ABCMeta):
def __init__(self, app):
class CommandRecorder:
def __init__(self, app, session=None):
self.app = app
self.session = session
@abc.abstractmethod
def record_command(self, data_set):
def record(self, data):
"""
:param data_set: 数据集
[("session", "input", "output", "user",
"asset", "system_user", "timestamp"),]
:param data: 数据 {
"session":
"input":
"output":
"user":
"asset":
"system_user":
"timestamp":
}
:return:
"""
for data in data_set:
if data["input"] is START_SENTINEL:
data_set.remove(data)
self.session_start(data["session"])
if data["input"] is END_SENTINEL:
data_set.remove(data)
self.session_end(data["session"])
@abc.abstractmethod
def session_start(self, session_id):
print("Session start: {}".format(session_id))
pass
@abc.abstractmethod
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
class ServerReplayRecorder(ReplayRecorder):
filelist = dict()
def __init__(self, app):
super().__init__(app)
self.file = None
def record_replay(self, data_set):
def record(self, data):
"""
:param data_set:
:param data:
[{
"session": session.id,
"data": data,
......@@ -86,52 +98,84 @@ class ServerReplayRecorder(ReplayRecorder):
:return:
"""
# Todo: <liuzheng712@gmail.com>
super().record_replay(data_set)
for data in data_set:
try:
ServerReplayRecorder.filelist[data["session"]].write(str(data) + '\n')
except KeyError:
logger.error("session ({})file does not exist!".format(data["session"]))
except ValueError:
logger.error("session ({}) file cloesd!".format(data["session"]))
return True
self.file.write(data)
def session_start(self, session_id):
ServerReplayRecorder.filelist[session_id] = open('logs/' + session_id + '.log', 'a')
print("When session {} start exec".format(session_id))
self.file = open(os.path.join(
self.app.config['LOG_DIR'], session_id + '.replay'
), 'a')
def session_end(self, session_id):
ServerReplayRecorder.filelist[session_id].close()
# Todo: upload the file
print("When session {} end start".format(session_id))
self.file.close()
def push_to_server(self):
pass
class ServerCommandRecorder(CommandRecorder):
def record_command(self, data_set):
if not data_set:
return True
super().record_command(data_set)
return self.app.service.push_session_command(data_set)
def __del__(self):
print("{} has been gc".format(self))
del self.file
class ServerCommandRecorder(CommandRecorder, metaclass=Singleton):
batch_size = 10
timeout = 5
no = 0
def __init__(self, app):
super().__init__(app)
self.queue = MemoryQueue()
self.stop_evt = threading.Event()
self.push_to_server_async()
self.__class__.no += 1
def record(self, data):
if data and data['input']:
data['input'] = data['input'][:128]
data['output'] = data['output'][:1024]
data['timestamp'] = int(data['timestamp'])
self.queue.put(data)
def push_to_server_async(self):
def func():
while not self.stop_evt.is_set():
data_set = self.queue.mget(self.batch_size, timeout=self.timeout)
logger.debug("<Session command recorder {}> queue size: {}".format(
self.no, self.queue.qsize())
)
if not data_set:
continue
logger.debug("Send {} commands to server".format(len(data_set)))
ok = self.app.service.push_session_command(data_set)
if not ok:
self.queue.mput(data_set)
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
def session_start(self, session_id):
print("When session {} start exec".format(session_id))
def session_end(self, session_id):
self.stop_evt.set()
print("When session {} end start".format(session_id))
def __del__(self):
print("{} has been gc".format(self))
def get_recorder(app):
replay_engine = app.config["REPLAY_RECORD_ENGINE"]
command_engine = app.config["COMMAND_RECORD_ENGINE"]
if replay_engine == "server":
replay_recorder = ServerReplayRecorder(app)
else:
replay_recorder = ServerReplayRecorder(app)
def get_command_recorder_class(app):
command_engine = app.config["COMMAND_RECORD_ENGINE"]
if command_engine == "server":
command_recorder = ServerCommandRecorder(app)
return ServerCommandRecorder
else:
command_recorder = ServerCommandRecorder(app)
return ServerCommandRecorder
return replay_recorder, command_recorder
def get_replay_recorder_class(app):
replay_engine = app.config["REPLAY_RECORD_ENGINE"]
if replay_engine == "server":
return ServerReplayRecorder
else:
return ServerReplayRecorder
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import threading
import uuid
import logging
import datetime
import selectors
import weakref
import time
BUF_SIZE = 1024
......@@ -16,9 +15,8 @@ logger = logging.getLogger(__file__)
class Session:
def __init__(self, app, client, server):
def __init__(self, client, server, command_recorder=None, replay_recorder=None):
self.id = str(uuid.uuid4())
self._app = weakref.ref(app)
self.client = client # Master of the session, it's a client sock
self.server = server # Server channel
self._watchers = [] # Only watch session
......@@ -28,15 +26,9 @@ class Session:
self.date_finished = None
self.stop_evt = threading.Event()
self.sel = selectors.DefaultSelector()
self._command_recorder = command_recorder
self._replay_recorder = replay_recorder
self.server.set_session(self)
self._replay_queue = None
self._command_queue = None
self._replay_recorder = None
self._command_recorder = None
@property
def app(self):
return self._app()
def add_watcher(self, watcher, silent=False):
"""
......@@ -79,6 +71,40 @@ class Session:
self.sel.unregister(sharer)
self._sharers.remove(sharer)
def set_command_recorder(self, recorder):
self._command_recorder = recorder
def set_replay_recorder(self, recorder):
self._replay_recorder = recorder
def put_command(self, _input, _output):
if not _input:
return
self._command_recorder.record({
"session": self.id,
"input": _input,
"output": _output,
"user": self.client.user.username,
"asset": self.server.asset.hostname,
"system_user": self.server.system_user.username,
"timestamp": time.time(),
})
def put_replay(self, data):
self._replay_recorder.record({
"session": self.id,
"data": data,
"timestamp": time.time(),
})
def pre_bridge(self):
self._replay_recorder.session_start(self.id)
self._command_recorder.session_start(self.id)
def post_bridge(self):
self._replay_recorder.session_end(self.id)
self._command_recorder.session_end(self.id)
def terminate(self):
msg = b"Terminate by administrator\r\n"
self.client.send(msg)
......@@ -90,6 +116,7 @@ class Session:
:return:
"""
logger.info("Start bridge session: {}".format(self.id))
self.pre_bridge()
self.sel.register(self.client, selectors.EVENT_READ)
self.sel.register(self.server, selectors.EVENT_READ)
while not self.stop_evt.is_set():
......@@ -130,15 +157,10 @@ class Session:
logger.debug("Resize server chan size {}*{}".format(width, height))
self.server.resize_pty(width=width, height=height)
def put_command(self, _input, _output):
self.app.put_command_queue(self, _input, _output)
def put_replay(self, data):
self.app.put_replay_queue(self, data)
def close(self):
logger.info("Close the session: {} ".format(self.id))
self.stop_evt.set()
self.post_bridge()
self.date_finished = datetime.datetime.now()
self.server.close()
......
......@@ -21,9 +21,6 @@ APP_NAME = "coco"
# 监听的HTTP/WS端口号,默认5000
# HTTPD_PORT = 5000
# 是否开启DEBUG
# DEBUG = True
# 项目使用的ACCESS KEY, 默认会注册,并保存到 ACCESS_KEY_STORE中,
# 如果有需求, 可以写到配置文件中, 格式 access_key_id:access_key_secret
# ACCESS_KEY = None
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment