Commit c1750831 authored by 广宏伟's avatar 广宏伟

Merged in test (pull request #1)

Merge test
parents bcfeaa2d a51bce5c
.git
logs/*
keys/*
env/
.idea
*.pyc
*.pyo
......@@ -5,3 +6,5 @@
*.log
logs/*
conf.py
host_rsa_key
sessions/*
FROM registry.fit2cloud.com/jumpserver/python:v3
MAINTAINER Jumpserver Team <ibuler@qq.com>
COPY . /opt/coco
WORKDIR /opt/coco
RUN cd requirements && yum -y install $(cat rpm_requirements.txt) && \
pip install -r requirements.txt
VOLUME /opt/coco/logs
VOLUME /opt/coco/keys
RUN cp conf_docker.py conf.py
EXPOSE 2222
CMD python run_server.py
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from .app import Coco
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import queue
class MultiQueueMixin:
def mget(self, size=1, block=True, timeout=5):
items = []
for i in range(size):
try:
items.append(self.get(block=block, timeout=timeout))
except queue.Empty:
break
return items
def mput(self, data_set):
for i in data_set:
self.put(i)
class MemoryQueue(MultiQueueMixin, queue.Queue):
pass
def get_queue(config):
queue_engine = config['QUEUE_ENGINE']
queue_size = config['QUEUE_MAX_SIZE']
if queue_engine == "server":
replay_queue = MemoryQueue(queue_size)
command_queue = MemoryQueue(queue_size)
else:
replay_queue = MemoryQueue(queue_size)
command_queue = MemoryQueue(queue_size)
return replay_queue, command_queue
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import datetime
import os
import time
import threading
import logging
import socket
from jms.service import AppService
from .config import Config
from .sshd import SSHServer
from .ws import WSServer
from .logging import create_logger
from .httpd import HttpServer
from .logger import create_logger
from .tasks import TaskHandler
from .recorder import get_command_recorder_class, get_replay_recorder_class
__version__ = '0.4.0'
......@@ -18,66 +28,147 @@ logger = logging.getLogger(__file__)
class Coco:
config_class = Config
default_config = {
'NAME': 'coco',
'NAME': socket.gethostname(),
'CORE_HOST': 'http://127.0.0.1:8080',
'DEBUG': True,
'BIND_HOST': '0.0.0.0',
'SSHD_PORT': 2222,
'WS_PORT': 5000,
'HTTPD_PORT': 5000,
'ACCESS_KEY': '',
'ACCESS_KEY_ENV': 'COCO_ACCESS_KEY',
'ACCESS_KEY_FILE': os.path.join(BASE_DIR, 'keys', '.access_key'),
'SECRET_KEY': None,
'LOG_LEVEL': 'INFO',
'LOG_LEVEL': 'DEBUG',
'LOG_DIR': os.path.join(BASE_DIR, 'logs'),
'SESSION_DIR': os.path.join(BASE_DIR, 'sessions'),
'ASSET_SORT_BY': 'hostname', # hostname, ip
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
'SSH_PASSWORD_AUTH': True,
'SSH_PUBLIC_KEY_AUTH': True,
'HEARTBEAT_INTERVAL': 5,
'MAX_CONNECTIONS': 500,
'ADMINS': '',
'REPLAY_RECORD_ENGINE': 'server', # local, server
'COMMAND_RECORD_ENGINE': 'server', # local, server, elasticsearch(not yet)
}
def __init__(self, name=None, root_path=None):
self.config = self.config_class(BASE_DIR, defaults=self.default_config)
self.root_path = root_path if root_path else BASE_DIR
self.config = self.config_class(self.root_path, defaults=self.default_config)
self.name = name if name else self.config["NAME"]
self.sessions = []
self.clients = []
self.ws = None
self.root_path = root_path
self.name = name
self.lock = threading.Lock()
self.stop_evt = threading.Event()
if name is None:
self.name = self.config['NAME']
if root_path is None:
self.root_path = BASE_DIR
self.make_logger()
self.sshd = None
self.running = True
self._service = None
self._sshd = None
self._httpd = None
self.replay_recorder_class = None
self.command_recorder_class = None
self._task_handler = None
@property
def service(self):
if self._service is None:
self._service = AppService(self)
return self._service
@property
def sshd(self):
if self._sshd is None:
self._sshd = SSHServer(self)
return self._sshd
@property
def httpd(self):
if self._httpd is None:
self._httpd = HttpServer(self)
return self._httpd
@property
def task_handler(self):
if self._task_handler is None:
self._task_handler = TaskHandler(self)
return self._task_handler
def make_logger(self):
create_logger(self)
def prepare(self):
self.sshd = SSHServer(self)
self.ws = WSServer(self)
# Todo: load some config from server like replay and common upload
def load_extra_conf_from_server(self):
pass
def initial_recorder(self):
self.replay_recorder_class = get_replay_recorder_class(self)
self.command_recorder_class = get_command_recorder_class(self)
def new_command_recorder(self):
return self.command_recorder_class(self)
def new_replay_recorder(self):
return self.replay_recorder_class(self)
def bootstrap(self):
self.make_logger()
self.service.initial()
self.load_extra_conf_from_server()
self.initial_recorder()
self.keep_heartbeat()
self.monitor_sessions()
def heartbeat(self):
pass
_sessions = [s.to_json() for s in self.sessions]
tasks = self.service.terminal_heartbeat(_sessions)
if tasks:
self.handle_task(tasks)
if tasks is False:
return False
else:
return True
def handle_task(self, tasks):
for task in tasks:
self.task_handler.handle(task)
def keep_heartbeat(self):
def func():
while not self.stop_evt.is_set():
self.heartbeat()
time.sleep(self.config["HEARTBEAT_INTERVAL"])
thread = threading.Thread(target=func)
thread.start()
def monitor_sessions(self):
interval = self.config["HEARTBEAT_INTERVAL"]
def func():
while not self.stop_evt.is_set():
for s in self.sessions:
if not s.stop_evt.is_set():
continue
if s.date_finished is None:
self.remove_session(s)
continue
delta = datetime.datetime.now() - s.date_finished
if delta > datetime.timedelta(seconds=interval*5):
self.remove_session(s)
time.sleep(interval)
thread = threading.Thread(target=func)
thread.start()
def run_forever(self):
self.prepare()
self.bootstrap()
print(time.ctime())
print('Coco version %s, more see https://www.jumpserver.org' % __version__)
print('Coco version {}, more see https://www.jumpserver.org'.format(__version__))
print('Quit the server with CONTROL-C.')
try:
if self.config["SSHD_PORT"] != 0:
self.run_sshd()
if self.config['WS_PORT'] != 0:
self.run_ws()
if self.config['HTTPD_PORT'] != 0:
self.run_httpd()
self.stop_evt.wait()
except KeyboardInterrupt:
......@@ -89,8 +180,8 @@ class Coco:
thread.daemon = True
thread.start()
def run_ws(self):
thread = threading.Thread(target=self.ws.run, args=())
def run_httpd(self):
thread = threading.Thread(target=self.httpd.run, args=())
thread.daemon = True
thread.start()
......@@ -98,25 +189,38 @@ class Coco:
for client in self.clients:
self.remove_client(client)
time.sleep(1)
self.stop_evt.set()
self.sshd.shutdown()
self.httpd.shutdown()
logger.info("Grace shutdown the server")
def add_client(self, client):
with self.lock:
self.clients.append(client)
logger.info("New client %s join, total %d now" % (client, len(self.clients)))
logger.info("New client {} join, total {} now".format(client, len(self.clients)))
def remove_client(self, client):
with self.lock:
try:
self.clients.remove(client)
logger.info("Client %s leave, total %d now" % (client, len(self.clients)))
client.send("Closed by server")
logger.info("Client {} leave, total {} now".format(client, len(self.clients)))
client.close()
del client
except:
pass
def monitor_session(self):
pass
def add_session(self, session):
with self.lock:
self.sessions.append(session)
self.heartbeat()
def remove_session(self, session):
with self.lock:
logger.info("Remove session: {}".format(session))
for i in range(10):
if self.heartbeat():
self.sessions.remove(session)
break
else:
time.sleep(1)
#!coding: utf-8
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
BACKSPACE_CHAR = {b'\x08': b'\x08\x1b[K', b'\x7f': b'\x08\x1b[K'}
ENTER_CHAR = [b'\r', b'\n', b'\r\n']
UNSUPPORTED_CHAR = {b'\x15': 'Ctrl-U', b'\x0c': 'Ctrl-L', b'\x05': 'Ctrl-E'}
CLEAR_CHAR = b'\x1b[H\x1b[2J'
BELL_CHAR = b'\x07'
NEW_LINE = b'\r\n'
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
"""
兼容Python版本
"""
import sys
is_py2 = (sys.version_info[0] == 2)
is_py3 = (sys.version_info[0] == 3)
try:
import simplejson as json
except (ImportError, SyntaxError):
import json
if is_py2:
def to_bytes(data):
"""若输入为unicode, 则转为utf-8编码的bytes;其他则原样返回。"""
if isinstance(data, unicode):
return data.encode('utf-8')
else:
return data
def to_string(data):
"""把输入转换为str对象"""
return to_bytes(data)
def to_unicode(data):
"""把输入转换为unicode,要求输入是unicode或者utf-8编码的bytes。"""
if isinstance(data, bytes):
return data.decode('utf-8')
else:
return data
def stringify(input):
if isinstance(input, dict):
return dict([(stringify(key), stringify(value)) for key,value in input.iteritems()])
elif isinstance(input, list):
return [stringify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
else:
return input
builtin_str = str
bytes = str
str = unicode
elif is_py3:
def to_bytes(data):
"""若输入为str(即unicode),则转为utf-8编码的bytes;其他则原样返回"""
if isinstance(data, str):
return data.encode(encoding='utf-8')
else:
return data
def to_string(data):
"""若输入为bytes,则认为是utf-8编码,并返回str"""
if isinstance(data, bytes):
return data.decode('utf-8')
else:
return data
def to_unicode(data):
"""把输入转换为unicode,要求输入是unicode或者utf-8编码的bytes。"""
return to_string(data)
def stringify(input):
return input
builtin_str = str
bytes = bytes
str = str
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
"""
coco.config
~~~~~~~~~~~~
......@@ -261,3 +264,6 @@ class Config(dict):
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, dict.__repr__(self))
# coding: utf-8
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
class PermissionFailed(Exception):
pass
class NoAppException(Exception):
pass
This diff is collapsed.
This diff is collapsed.
#!coding: utf-8
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import logging
import paramiko
import threading
import weakref
logger = logging.getLogger(__file__)
......@@ -17,9 +20,14 @@ class SSHInterface(paramiko.ServerInterface):
"""
def __init__(self, app, request):
self.app = app
self._app = weakref.ref(app)
self.request = request
self.event = threading.Event()
self.auth_valid = False
@property
def app(self):
return self._app()
def check_auth_interactive(self, username, submethods):
logger.info("Check auth interactive: %s %s" % (username, submethods))
......@@ -34,23 +42,48 @@ class SSHInterface(paramiko.ServerInterface):
return False
def get_allowed_auths(self, username):
# Todo: Check with server settings or self config
return ",".join(["password", "publickkey"])
supported = []
if self.app.config["SSH_PASSWORD_AUTH"]:
supported.append("password")
if self.app.config["SSH_PUBLIC_KEY_AUTH"]:
supported.append("publickey")
return ",".join(supported)
def check_auth_none(self, username):
return paramiko.AUTH_FAILED
def check_auth_password(self, username, password):
return self.validate_auth(username, password=password)
valid = self.validate_auth(username, password=password)
if not valid:
logger.warning("Password and public key auth <%s> failed, reject it" % username)
return paramiko.AUTH_FAILED
else:
logger.info("Password auth <%s> success" % username)
return paramiko.AUTH_SUCCESSFUL
def check_auth_publickey(self, username, key):
return self.validate_auth(username, key=key)
def validate_auth(self, username, password="", key=""):
# Todo: Implement it
self.request.user = "guang"
key = key.get_base64()
valid = self.validate_auth(username, public_key=key)
if not valid:
logger.debug("Public key auth <%s> failed, try to password" % username)
return paramiko.AUTH_FAILED
else:
logger.debug("Public key auth <%s> success" % username)
return paramiko.AUTH_SUCCESSFUL
def validate_auth(self, username, password="", public_key=""):
user, _ = self.app.service.authenticate(
username, password=password, public_key=public_key,
remote_addr=self.request.remote_ip,
)
if user:
self.request.user = user
return True
else:
return False
def check_channel_direct_tcpip_request(self, chanid, origin, destination):
logger.debug("Check channel direct tcpip request: %d %s %s" %
(chanid, origin, destination))
......
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
......@@ -26,17 +26,21 @@ def create_logger(app):
log_path = os.path.join(log_dir, 'coco.log')
logger = logging.getLogger()
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(module)s %(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S')
main_formatter = logging.Formatter(
fmt='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
fmt='%(asctime)s [%(module)s %(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
console_handler = StreamHandler()
file_handler = TimedRotatingFileHandler(
filename=log_path, when='D', backupCount=10)
filename=log_path, when='D', backupCount=10
)
for handler in [console_handler, file_handler]:
handler.setFormatter(main_formatter)
logger.addHandler(handler)
logger.setLevel(level)
logging.getLogger("requests").setLevel(logging.WARNING)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import threading
import datetime
import logging
import weakref
from . import char
from . import utils
BUF_SIZE = 4096
class Decoder:
def __init__(self, **kwargs):
for attr, val in kwargs.items():
setattr(self, attr, val)
@classmethod
def from_json(cls, json_str):
json_dict = json.loads(json_str)
return cls(**json_dict)
@classmethod
def from_multi_json(cls, json_list):
json_dict_list = json.loads(json_list)
return [cls(**json_dict) for json_dict in json_dict_list]
class User(Decoder):
id = ""
username = ""
name = ""
def __str__(self):
return self.name
__repr__ = __str__
class Asset(Decoder):
id = ""
hostname = ""
ip = ""
port = 22
def __str__(self):
return self.hostname
__repr__ = __str__
class SystemUser(Decoder):
id = ""
name = ""
username = ""
password = ""
private_key = None
def __str__(self):
return self.name
__repr__ = __str__
logger = logging.getLogger(__file__)
class Request:
......@@ -87,76 +43,104 @@ class Client:
return self.chan.fileno()
def send(self, b):
if isinstance(b, str):
b = b.encode("utf-8")
return self.chan.send(b)
def recv(self, size):
return self.chan.recv(size)
def close(self):
logger.info("Client {} close".format(self))
return self.chan.close()
def __getattr__(self, item):
return getattr(self.chan, item)
def __str__(self):
return "<%s from %s:%s>" % (self.user, self.addr[0], self.addr[1])
def __del__(self):
logger.info("GC client object: {}".format(self))
class Server:
"""
Server object like client, a wrapper object, a connection to the asset,
Because we don't want to using python dynamic feature, such asset
have the chan and system_user attr.
"""
# Todo: Server name is not very proper
# Todo: Server name is not very suitable
def __init__(self, chan, asset, system_user):
self.chan = chan
self.asset = asset
self.system_user = system_user
self.send_bytes = 0
self.recv_bytes = 0
self.stop_evt = threading.Event()
self.input_data = []
self.output_data = []
self.input = ''
self.output = ''
self._in_input_state = True
self._input_initial = False
self._in_vim_state = False
self._input = ""
self._output = ""
self._session_ref = None
def fileno(self):
return self.chan.fileno()
def set_session(self, session):
self._session_ref = weakref.ref(session)
@property
def session(self):
if self._session_ref:
return self._session_ref()
else:
return None
def send(self, b):
if isinstance(b, str):
b = b.encode("utf-8")
if not self._input_initial:
self._input_initial = True
if self._have_enter_char(b):
self._in_input_state = False
self._input = self._parse_input()
else:
if not self._in_input_state:
print("#" * 30 + " 新周期 " + "#" * 30)
self._parse_input()
self._parse_output()
self._output = self._parse_output()
logger.debug("\n{}\nInput: {}\nOutput: {}\n{}".format(
"#" * 30 + " Command " + "#" * 30,
self._input, self._output,
"#" * 30 + " End " + "#" * 30,
))
self.session.put_command(self._input, self._output)
del self.input_data[:]
del self.output_data[:]
self._in_input_state = True
print("Send: %s" % b)
return self.chan.send(b)
def recv(self, size):
data = self.chan.recv(size)
print("Recv: %s" % data)
self.session.put_replay(data)
if self._input_initial:
if self._in_input_state:
self.input_data.append(data)
else:
self.output_data.append(data)
return data
def close(self):
logger.info("Closed server {}".format(self))
self.chan.close()
return self.chan.transport.close()
self.stop_evt.set()
self.chan.transport.close()
@staticmethod
def _have_enter_char(s):
......@@ -167,18 +151,20 @@ class Server:
def _parse_output(self):
parser = utils.TtyIOParser()
print("\tOutput: %s" % parser.parse_output(self.output_data))
return parser.parse_output(self.output_data)
def _parse_input(self):
parser = utils.TtyIOParser()
print("\tInput: %s" % parser.parse_input(self.input_data))
return parser.parse_input(self.input_data)
def __getattr__(self, item):
return getattr(self.chan, item)
def __str__(self):
return "<%s@%s:%s>" % (self.system_user.username,
self.asset.hostname, self.asset.port)
return "<To: {}>".format(str(self.asset))
def __del__(self):
logger.info("GC server object: {}".format(self))
class WSProxy:
......@@ -201,7 +187,7 @@ class WSProxy:
```
"""
def __init__(self, ws, child):
def __init__(self, ws, child, room):
"""
:param ws: websocket instance or handler, have write_message method
:param child: sock child pair
......@@ -209,7 +195,7 @@ class WSProxy:
self.ws = ws
self.child = child
self.stop_event = threading.Event()
self.room = room
self.auto_forward()
def send(self, msg):
......@@ -230,7 +216,7 @@ class WSProxy:
data = self.child.recv(BUF_SIZE)
if len(data) == 0:
self.close()
self.ws.write_message(json.dumps({"data": data.decode("utf-8")}))
self.ws.emit("data", data.decode("utf-8"), room=self.room)
def auto_forward(self):
thread = threading.Thread(target=self.forward, args=())
......@@ -239,5 +225,5 @@ class WSProxy:
def close(self):
self.stop_event.set()
self.ws.close()
self.child.close()
self.ws.on_disconnect()
# coding: utf-8
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import socket
import threading
import logging
import time
import weakref
import paramiko
import time
from .session import Session
from .models import Server
from .utils import wrap_with_line_feed as wr, wrap_with_warning as warning
logger = logging.getLogger(__file__)
TIMEOUT = 8
BUF_SIZE = 4096
class ProxyServer:
def __init__(self, app, client):
self.app = app
self._app = weakref.ref(app)
self.client = client
self.request = client.request
self.server = None
self.connecting = True
@property
def app(self):
return self._app()
def proxy(self, asset, system_user):
self.send_connecting_message()
self.send_connecting_message(asset, system_user)
self.server = self.get_server_conn(asset, system_user)
if self.server is None:
return
session = Session(self.client, self.server)
self.app.sessions.append(session)
command_recorder = self.app.new_command_recorder()
replay_recorder = self.app.new_replay_recorder()
session = Session(
self.client, self.server,
command_recorder=command_recorder,
replay_recorder=replay_recorder,
)
self.app.add_session(session)
self.watch_win_size_change_async()
session.record_async()
session.bridge()
self.app.sessions.remove(session)
self.app.remove_session(session)
def validate_permission(self, asset, system_user):
"""
Validate use is have the permission to connect this asset using that
system user
验证用户是否有连接改资产的权限
:return: True or False
"""
return True
return self.app.service.validate_user_asset_permission(
self.client.user.id, asset.id, system_user.id
)
def get_system_user_auth(self, system_user):
"""
Get the system user auth ..., using this to connect asset
获取系统用户的认证信息,密码或秘钥
:return: system user have full info
"""
system_user.password, system_user.private_key = \
self.app.service.get_system_user_auth_info(system_user)
def get_server_conn(self, asset, system_user):
logger.info("Connect to %s" % asset.hostname)
logger.info("Connect to {}".format(asset.hostname))
if not self.validate_permission(asset, system_user):
self.client.send(b'No permission')
self.client.send(warning(_('No permission')))
return None
self.get_system_user_auth(system_user)
if True:
server = self.get_ssh_server_conn(asset, system_user)
else:
server = self.get_ssh_server_conn(asset, system_user)
return server
# Todo: Support telnet
def get_telnet_server_conn(self, asset, system_user):
pass
def get_ssh_server_conn(self, asset, system_user):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(asset.ip, port=asset.port,
username=system_user.username,
password=system_user.password,
pkey=system_user.private_key,
timeout=TIMEOUT)
except paramiko.AuthenticationException as e:
self.client.send("[Errno 66] Authentication failed: {}".format(e).encode("utf-8"))
ssh.connect(
asset.ip, port=asset.port, username=system_user.username,
password=system_user.password, pkey=system_user.private_key,
timeout=TIMEOUT, compress=True, auth_timeout=10,
look_for_keys=False
)
except paramiko.AuthenticationException:
admins = self.app.config['ADMINS'] or 'administrator'
self.client.send(warning(wr(
"Authenticate with server failed, contact {}".format(admins),
before=1, after=0
)))
key_fingerprint = system_user.private_key.get_hex() if system_user.private_key else None
logger.error("Connect {}@{}:{} auth failed, password: {}, key: {}".format(
system_user.username, asset.ip, asset.port,
system_user.password, key_fingerprint,
))
return None
except socket.error as e:
self.client.send(" {}".format(e).encode("utf-8"))
self.client.send(wr(" {}".format(e)))
return None
finally:
self.connecting = False
......@@ -94,15 +130,15 @@ class ProxyServer:
thread.daemon = True
thread.start()
def send_connecting_message(self):
def send_connecting_message(self, asset, system_user):
def func():
delay = 0.0
self.client.send('Connecting to {} {:.1f}'.format('abc.com', delay).encode('utf-8'))
self.client.send('Connecting to {}@{} {:.1f}'.format(system_user, asset, delay))
while self.connecting and delay < TIMEOUT:
self.client.send('\x08\x08\x08{:.1f}'.format(delay).encode('utf-8'))
time.sleep(0.1)
delay += 0.1
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import abc
import logging
import threading
import time
import os
import gzip
import json
import shutil
from .alignment import MemoryQueue
logger = logging.getLogger(__file__)
BUF_SIZE = 1024
class Singleton(type):
def __init__(cls, *args, **kwargs):
cls.__instance = None
super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance
else:
return cls.__instance
class ReplayRecorder(metaclass=abc.ABCMeta):
def __init__(self, app, session=None):
self.app = app
self.session = session
@abc.abstractmethod
def record(self, data):
"""
记录replay数据
:param data: 数据 {
"session": "",
"data": "",
"timestamp": ""
}
:return:
"""
@abc.abstractmethod
def session_start(self, session_id):
print("Session start: {}".format(session_id))
pass
@abc.abstractmethod
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
class CommandRecorder:
def __init__(self, app, session=None):
self.app = app
self.session = session
def record(self, data):
"""
:param data: 数据 {
"session":
"input":
"output":
"user":
"asset":
"system_user":
"timestamp":
}
:return:
"""
def session_start(self, session_id):
print("Session start: {}".format(session_id))
pass
def session_end(self, session_id):
print("Session end: {}".format(session_id))
pass
class ServerReplayRecorder(ReplayRecorder):
def __init__(self, app):
super().__init__(app)
self.file = None
def record(self, data):
"""
:param data:
[{
"session": session.id,
"data": data,
"timestamp": time.time()
},...]
:return:
"""
# Todo: <liuzheng712@gmail.com>
if len(data['data']) > 0:
# print(json.dumps(
# data['data'].decode('utf-8', 'replace')))
self.file.write(
'"' + str(data['timestamp'] - self.starttime) + '":' + json.dumps(
data['data'].decode('utf-8', 'replace')) + ',')
def session_start(self, session_id):
self.starttime = time.time()
self.file = open(os.path.join(
self.app.config['LOG_DIR'], session_id + '.replay'
), 'a')
self.file.write('{')
def session_end(self, session_id):
self.file.write('"0":""}')
self.file.close()
with open(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay'), 'rb') as f_in, \
gzip.open(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay.gz'), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
if self.push_to_server(session_id):
logger.info("Succeed to push {}'s {}".format(session_id, "record"))
else:
logger.error("Failed to push {}'s {}".format(session_id, "record"))
def push_to_server(self, session_id):
return self.app.service.push_session_replay(os.path.join(self.app.config['LOG_DIR'], session_id + '.replay'),
session_id)
def __del__(self):
print("{} has been gc".format(self))
del self.file
class ServerCommandRecorder(CommandRecorder, metaclass=Singleton):
batch_size = 10
timeout = 5
no = 0
def __init__(self, app):
super().__init__(app)
self.queue = MemoryQueue()
self.stop_evt = threading.Event()
self.push_to_server_async()
self.__class__.no += 1
def record(self, data):
if data and data['input']:
data['input'] = data['input'][:128]
data['output'] = data['output'][:1024]
data['timestamp'] = int(data['timestamp'])
self.queue.put(data)
def push_to_server_async(self):
def func():
while not self.stop_evt.is_set():
data_set = self.queue.mget(self.batch_size, timeout=self.timeout)
logger.debug("<Session command recorder {}> queue size: {}".format(
self.no, self.queue.qsize())
)
if not data_set:
continue
logger.debug("Send {} commands to server".format(len(data_set)))
ok = self.app.service.push_session_command(data_set)
if not ok:
self.queue.mput(data_set)
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
def session_start(self, session_id):
pass
def session_end(self, session_id):
pass
def __del__(self):
print("{} has been gc".format(self))
def get_command_recorder_class(app):
command_engine = app.config["COMMAND_RECORD_ENGINE"]
if command_engine == "server":
return ServerCommandRecorder
else:
return ServerCommandRecorder
def get_replay_recorder_class(app):
replay_engine = app.config["REPLAY_RECORD_ENGINE"]
if replay_engine == "server":
return ServerReplayRecorder
else:
return ServerReplayRecorder
This diff is collapsed.
#! coding: utf-8
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import os
import logging
......@@ -17,9 +19,10 @@ BACKLOG = 5
class SSHServer:
def __init__(self, app=None):
def __init__(self, app):
self.app = app
self.stop_event = threading.Event()
self.stop_evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.host_key_path = os.path.join(self.app.root_path, 'keys', 'host_rsa_key')
......@@ -37,20 +40,19 @@ class SSHServer:
def run(self):
host = self.app.config["BIND_HOST"]
port = self.app.config["SSHD_PORT"]
print('Starting ssh server at %(host)s:%(port)s' %
{"host": host, "port": port})
print('Starting ssh server at {}:{}'.format(host, port))
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((host, port))
self.sock.listen(BACKLOG)
while not self.stop_event.is_set():
while not self.stop_evt.is_set():
try:
sock, addr = self.sock.accept()
logger.info("Get ssh request from %s: %s" % (addr[0], addr[1]))
logger.info("Get ssh request from {}: {}".format(addr[0], addr[1]))
thread = threading.Thread(target=self.handle, args=(sock, addr))
thread.daemon = True
thread.start()
except Exception as e:
logger.error("SSH server error: %s" % e)
logger.error("Start SSH server error: {}".format(e))
def handle(self, sock, addr):
transport = paramiko.Transport(sock, gss_kex=False)
......@@ -65,21 +67,21 @@ class SSHServer:
try:
transport.start_server(server=server)
except paramiko.SSHException:
logger.warning("SSH negotiation failed.")
sys.exit(1)
logger.warning("SSH negotiation failed")
return
except EOFError:
logger.warning("EOF Error")
sys.exit(1)
logger.warning("Handle EOF Error")
return
chan = transport.accept(10)
if chan is None:
logger.warning("No ssh channel get")
sys.exit(1)
return
server.event.wait(5)
if not server.event.is_set():
logger.warning("Client not request a valid request")
sys.exit(2)
logger.warning("Client not request a valid request, exiting")
return
client = Client(chan, request)
self.app.add_client(client)
......@@ -88,7 +90,8 @@ class SSHServer:
def dispatch(self, client):
request_type = client.request.type
if request_type == 'pty':
InteractiveServer(self.app, client).activate()
logger.info("Request type `pty`, dispatch to interactive mode")
InteractiveServer(self.app, client).interact()
elif request_type == 'exec':
pass
elif request_type == 'subsystem':
......@@ -97,4 +100,4 @@ class SSHServer:
client.send("Not support request type: %s" % request_type)
def shutdown(self):
self.stop_event.set()
self.stop_evt.set()
# coding: utf-8
import weakref
import logging
logger = logging.getLogger(__file__)
class TaskHandler:
def __init__(self, app):
self._app = weakref.ref(app)
@property
def app(self):
return self._app()
def handle_kill_session(self, task):
logger.info("Handle kill session task: {}".format(task.args))
session_id = task.args
session = None
for s in self.app.sessions:
if s.id == session_id:
session = s
break
if session:
session.terminate()
self.app.service.finish_task(task.id)
def handle(self, task):
if task.name == "kill_session":
self.handle_kill_session(task)
else:
logger.error("No handler for this task: {}".format(task.name))
#!coding: utf-8
import base64
import calendar
import os
import re
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from __future__ import unicode_literals
import paramiko
from io import StringIO
import hashlib
import re
import os
import threading
import base64
import calendar
import time
import datetime
import gettext
from io import StringIO
import paramiko
import pyte
import pytz
from email.utils import formatdate
from queue import Queue, Empty
from .exception import NoAppException
BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
def ssh_key_string_to_obj(text):
......@@ -80,7 +92,8 @@ def content_md5(data):
"""
if isinstance(data, str):
data = hashlib.md5(data.encode('utf-8'))
return base64.b64encode(data.digest())
value = base64.b64encode(data.digest())
return value.decode('utf-8')
_STRPTIME_LOCK = threading.Lock()
......@@ -115,7 +128,7 @@ def iso8601_to_unixtime(time_string):
def make_signature(access_key_secret, date=None):
if isinstance(date, bytes):
date = date.decode("utf-8")
date = bytes.decode(date)
if isinstance(date, int):
date_gmt = http_date(date)
elif date is None:
......@@ -153,7 +166,7 @@ class TtyIOParser(object):
if line.strip():
output.append(line)
self.screen.reset()
return sep.join(output[0:-1])
return sep.join(output[0:-1]).strip()
def parse_input(self, data):
"""
......@@ -175,7 +188,32 @@ class TtyIOParser(object):
command = ''
self.screen.reset()
command = self.clean_ps1_etc(command)
return command
return command.strip()
def is_obj_attr_has(obj, val, attrs=("hostname", "ip", "comment")):
if not attrs:
vals = [val for val in obj.__dict__.values() if isinstance(val, (str, int))]
else:
vals = [getattr(obj, attr) for attr in attrs if
hasattr(obj, attr) and isinstance(hasattr(obj, attr), (str, int))]
for v in vals:
if str(v).find(val) != -1:
return True
return False
def is_obj_attr_eq(obj, val, attrs=("id", "hostname", "ip")):
if not attrs:
vals = [val for val in obj.__dict__.values() if isinstance(val, (str, int))]
else:
vals = [getattr(obj, attr) for attr in attrs if hasattr(obj, attr)]
for v in vals:
if str(v) == str(val):
return True
return False
def wrap_with_line_feed(s, before=0, after=1):
......@@ -187,7 +225,7 @@ def wrap_with_line_feed(s, before=0, after=1):
def wrap_with_color(text, color='white', background=None,
bolder=False, underline=False):
bolder_ = '1'
underline_ = '4'
_underline = '4'
color_map = {
'black': '30',
'red': '31',
......@@ -213,14 +251,19 @@ def wrap_with_color(text, color='white', background=None,
if bolder:
wrap_with.append(bolder_)
if underline:
wrap_with.append(underline_)
wrap_with.append(_underline)
if background:
wrap_with.append(background_map.get(background, ''))
wrap_with.append(color_map.get(color, ''))
is_bytes = True if isinstance(text, bytes) else False
if is_bytes:
text = text.decode("utf-8")
data = '\033[' + ';'.join(wrap_with) + 'm' + text + '\033[0m'
if isinstance(text, bytes):
if is_bytes:
return data.encode('utf-8')
else:
return data
......@@ -238,3 +281,89 @@ def wrap_with_primary(text, bolder=False):
def wrap_with_title(text):
return wrap_with_color(text, color='black', background='green')
def b64encode_as_string(data):
return base64.b64encode(data).decode("utf-8")
def split_string_int(s):
"""Split string or int
example: test-01-02-db => ['test-', '01', '-', '02', 'db']
"""
string_list = []
index = 0
pre_type = None
word = ''
for i in s:
if index == 0:
pre_type = int if i.isdigit() else str
word = i
else:
if pre_type is int and i.isdigit() or pre_type is str and not i.isdigit():
word += i
else:
string_list.append(word.lower() if not word.isdigit() else int(word))
word = i
pre_type = int if i.isdigit() else str
index += 1
string_list.append(word.lower() if not word.isdigit() else int(word))
return string_list
def sort_assets(assets, order_by='hostname'):
if order_by == 'ip':
assets = sorted(assets, key=lambda asset: [int(d) for d in asset.ip.split('.') if d.isdigit()])
else:
assets = sorted(assets, key=lambda asset: getattr(asset, order_by))
return assets
class PKey(object):
@classmethod
def from_string(cls, key_string):
try:
pkey = paramiko.RSAKey(file_obj=StringIO(key_string))
return pkey
except paramiko.SSHException:
try:
pkey = paramiko.DSSKey(file_obj=StringIO(key_string))
return pkey
except paramiko.SSHException:
return None
def timestamp_to_datetime_str(ts):
datetime_format = '%Y-%m-%dT%H:%M:%S.%fZ'
dt = datetime.datetime.fromtimestamp(ts, tz=pytz.timezone('UTC'))
return dt.strftime(datetime_format)
class MultiQueue(Queue):
def mget(self, size=1, block=True, timeout=5):
items = []
for i in range(size):
try:
items.append(self.get(block=block, timeout=timeout))
except Empty:
break
return items
def _gettext():
gettext.bindtextdomain("coco", os.path.join(BASE_DIR, "locale"))
gettext.textdomain("coco")
return gettext.gettext
def make_message():
os.makedirs(os.path.join(BASE_DIR, "locale", "zh_CN"))
pass
def compile_message():
pass
ugettext = _gettext()
# coding: utf-8
import socket
import json
import logging
import tornado.web
import tornado.websocket
import tornado.httpclient
import tornado.ioloop
import tornado.gen
from .models import User, Request, Client, WSProxy
from .interactive import InteractiveServer
logger = logging.getLogger(__file__)
class BaseWehSocketHandler:
def prepare(self):
self.app = self.settings["app"]
child, parent = socket.socketpair()
request = Request((self.request.remote_ip, 0))
request.user = self.current_user
self.request.__dict__.update(request.__dict__)
self.client = Client(parent, self.request)
self.proxy = WSProxy(self, child)
self.app.clients.append(self.client)
def get_current_user(self):
return User(id=1, username="guanghongwei", name="广宏伟")
def check_origin(self, origin):
return True
class InteractiveWehSocketHandler(BaseWehSocketHandler, tornado.websocket.WebSocketHandler):
@tornado.web.authenticated
def open(self):
InteractiveServer(self.app, self.client).activate_async()
def on_message(self, message):
try:
message = json.loads(message)
except json.JSONDecodeError:
logger.info("Loads websocket json message failed")
return
if message.get('event'):
self.evt_handle(message)
elif message.get('data'):
self.proxy.send(message)
def on_close(self):
self.proxy.close()
def evt_handle(self, data):
if data['event'] == 'change_size':
try:
self.request.meta['width'] = data['meta']['width']
self.request.meta['height'] = data['meta']['height']
self.request.change_size_event.set()
except KeyError:
pass
class ProxyWehSocketHandler(BaseWehSocketHandler):
pass
class MonitorWehSocketHandler(BaseWehSocketHandler):
pass
class WSServer:
routers = [
(r'/ws/interactive/', InteractiveWehSocketHandler),
(r'/ws/proxy/(?P<asset_id>[0-9]+)/(?P<system_user_id>[0-9]+)/', ProxyWehSocketHandler),
(r'/ws/session/(?P<session_id>[0-9]+)/monitor/', MonitorWehSocketHandler),
]
# prepare may be rewrite it
settings = {
'cookie_secret': '',
'app': None,
'login_url': '/login'
}
def __init__(self, app):
self.app = app
self._prepare()
def _prepare(self):
self.settings['cookie_secret'] = self.app.config['SECRET_KEY']
self.settings['app'] = self.app
def run(self):
host = self.app.config["BIND_HOST"]
port = self.app.config["WS_PORT"]
print('Starting websocket server at %(host)s:%(port)s' %
{"host": host, "port": port})
ws = tornado.web.Application(self.routers, **self.settings)
ws.listen(port=port, address=host)
tornado.ioloop.IOLoop.current().start()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import os
BASE_DIR = os.path.dirname(__file__)
class Config:
"""
Coco config file
"""
# 默认的名字
APP_NAME = os.environ.get("APP_NAME") or "localhost"
# Jumpserver项目的url, api请求注册会使用
CORE_HOST = os.environ.get("CORE_HOST") or 'http://core:8080'
# 启动时绑定的ip, 默认 0.0.0.0
BIND_HOST = '0.0.0.0'
# 监听的SSH端口号, 默认2222
SSHD_PORT = 2222
# 监听的HTTP/WS端口号,默认5000
HTTPD_PORT = 5000
# 项目使用的ACCESS KEY, 默认会注册,并保存到 ACCESS_KEY_STORE中,
# 如果有需求, 可以写到配置文件中, 格式 access_key_id:access_key_secret
ACCESS_KEY = os.environ.get("ACCESS_KEY") or None
# ACCESS KEY 保存的地址, 默认注册后会保存到该文件中
# ACCESS_KEY_STORE = os.path.join(BASE_DIR, 'keys', '.access_key')
# 加密密钥
SECRET_KEY = os.environ.get("SECRET_KEY") or 'SKdfm239LSKdfj())_23jK*^2'
# 设置日志级别 ['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', 'CRITICAL']
LOG_LEVEL = os.environ.get("LOG_LEVEL") or 'INFO'
# 日志存放的目录
LOG_DIR = os.environ.get("LOG_DIR") or os.path.join(BASE_DIR, 'logs')
# Session录像存放目录
SESSION_DIR = os.environ.get("SESSION_DIR") or os.path.join(BASE_DIR, 'sessions')
# 资产显示排序方式, ['ip', 'hostname']
ASSET_LIST_SORT_BY = os.environ.get("SESSION_DIR") or 'ip'
# 登录是否支持密码认证
SSH_PASSWORD_AUTH = bool(os.environ.get("SSH_PASSWORD_AUTH")) if os.environ.get("SSH_PASSWORD_AUTH") else True
# 登录是否支持秘钥认证
SSH_PUBLIC_KEY_AUTH = bool(os.environ.get("SSH_PUBLIC_KEY_AUTH")) if os.environ.get("SSH_PUBLIC_KEY_AUTH") else True
# 和Jumpserver 保持心跳时间间隔
HEARTBEAT_INTERVAL = int(os.environ.get("HEARTBEAT_INTERVAL")) if os.environ.get("HEARTBEAT_INTERVAL") else 5
# Admin的名字,出问题会提示给用户
ADMINS = os.environ.get("ADMINS") or ''
class ConfigDocker(Config):
pass
config = ConfigDocker()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import os
BASE_DIR = os.path.dirname(__file__)
# 项目名称, 会用来向Jumpserver注册, 识别而已, 不能重复
APP_NAME = "coco"
# Jumpserver项目的url, api请求注册会使用
# CORE_HOST = 'http://127.0.0.1:8080'
class Config:
"""
Coco config file
"""
# 项目名称, 会用来向Jumpserver注册, 识别而已, 不能重复
# APP_NAME = "localhost"
# Jumpserver项目的url, api请求注册会使用
# CORE_HOST = os.environ.get("CORE_HOST") or 'http://127.0.0.1:8080'
# 启动时绑定的ip, 默认 0.0.0.0
# BIND_HOST = '0.0.0.0'
# 启动时绑定的ip, 默认 0.0.0.0
# BIND_HOST = '0.0.0.0'
# 监听的SSH端口号, 默认2222
# SSHD_PORT = 2222
# 监听的SSH端口号, 默认2222
# SSHD_PORT = 2222
# 监听的HTTP/WS端口号,默认5000
# HTTPD_PORT = 5000
# 监听的WS端口号,默认5000
# WS_PORT = 5000
# 项目使用的ACCESS KEY, 默认会注册,并保存到 ACCESS_KEY_STORE中,
# 如果有需求, 可以写到配置文件中, 格式 access_key_id:access_key_secret
# ACCESS_KEY = None
# 是否开启DEBUG
# DEBUG = True
# ACCESS KEY 保存的地址, 默认注册后会保存到该文件中
# ACCESS_KEY_STORE = os.path.join(BASE_DIR, 'keys', '.access_key')
# 项目使用的ACCESS KEY, 默认会注册,并保存到 ACCESS_KEY_STORE中,
# 如果有需求, 可以写到配置文件中, 格式 access_key_id:access_key_secret
# ACCESS_KEY = None
# 加密密钥
# SECRET_KEY = None
# ACCESS KEY 保存的地址, 默认注册后会保存到该文件中
# ACCESS_KEY_STORE = os.path.join(BASE_DIR, 'keys', '.access_key')
# 设置日志级别 ['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', 'CRITICAL']
# LOG_LEVEL = 'INFO'
# 加密密钥
# SECRET_KEY = None
# 日志存放的目录
# LOG_DIR = os.path.join(BASE_DIR, 'logs')
# 设置日志级别 ['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', 'CRITICAL']
# LOG_LEVEL = 'INFO'
# Session录像存放目录
# SESSION_DIR = os.path.join(BASE_DIR, 'sessions')
# 日志存放的目录
# LOG_DIR = os.path.join(BASE_DIR, 'logs')
# 资产显示排序方式, ['ip', 'hostname']
# ASSET_LIST_SORT_BY = 'ip'
# Session录像存放目录
# SESSION_DIR = os.path.join(BASE_DIR, 'sessions')
# 登录是否支持密码认证
# SSH_PASSWORD_AUTH = True
# 资产显示排序方式, ['ip', 'hostname']
# ASSET_LIST_SORT_BY = 'ip'
# 登录是否支持秘钥认证
# SSH_PUBLIC_KEY_AUTH = True
# 登录是否支持密码认证
# SSH_PASSWORD_AUTH = True
# 和Jumpserver 保持心跳时间间隔
# HEARTBEAT_INTERVAL = 5
# 登录是否支持秘钥认证
# SSH_PUBLIC_KEY_AUTH = True
# Admin的名字,出问题会提示给用户
# ADMINS = ''
# 和Jumpserver 保持心跳时间间隔
# HEARTBEAT_INTERVAL = 5
# 异步上报统计命令
# COMMAND_PUSH_ASYNC = True
config = Config()
___
|_ |
| |_ _ _ __ ___ _ __ ___ ___ _ ____ _____ _ __
| | | | | '_ ` _ \| '_ \/ __|/ _ \ '__\ \ / / _ \ '__|
/\__/ / |_| | | | | | | |_) \__ \ __/ | \ V / __/ |
\____/ \__,_|_| |_| |_| .__/|___/\___|_| \_/ \___|_|
| |
|_|
paramiko>=2.1.2
python-gssapi>=0.6.4
requests>=2.11.1
pyte>=0.5.2
tornado>=4.5.2
\ No newline at end of file
asn1crypto==0.23.0
bcrypt==3.1.4
certifi==2017.11.5
cffi==1.11.2
chardet==3.0.4
click==6.7
cryptography==2.1.4
Flask==0.12.2
Flask-SocketIO==2.9.2
idna==2.6
itsdangerous==0.24
Jinja2==2.10
MarkupSafe==1.0
paramiko==2.4.0
psutil==5.4.1
pyasn1==0.4.2
pycparser==2.18
PyNaCl==1.2.0
pyte==0.7.0
python-engineio==2.0.1
python-gssapi==0.6.4
python-socketio==1.8.3
pytz==2017.3
requests==2.18.4
simplejson==3.13.2
six==1.11.0
tornado==4.5.2
urllib3==1.22
wcwidth==0.1.7
werkzeug==0.12.2
jumpserver-python-sdk==0.0.20
libffi-devel
\ No newline at end of file
#!/usr/bin/python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import os
......@@ -7,7 +8,7 @@ import sys
from coco import Coco
try:
import conf
from conf import config
except ImportError:
print("Please prepare config file `cp conf_example.py conf.py`")
sys.exit(1)
......@@ -21,7 +22,7 @@ except:
coco = Coco()
coco.config.from_object(conf)
coco.config.from_object(config)
# Todo:
# 0. argparser
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment