Commit 86cf253e authored by ibuler's avatar ibuler

[Fixture] 使用gevent, 使用proxy server

parent 73a012f8
......@@ -7,3 +7,5 @@ BASE_DIR = os.path.abspath(os.path.dirname(__file__))
from .app import app, socket_io
from . import authentication, views
import logger
from flask import request
......@@ -5,6 +5,7 @@ import time
from flask import Flask
import socketio
from flask_socketio import SocketIO
from .conf import config
from .service import service
......@@ -16,8 +17,9 @@ __version__ = '0.4.0'
class Luna(Flask):
app_service = service
service = service
clients = {}
proxy_list = {}
active = True
def run(self, host=None, port=None, debug=None, **options):
......@@ -38,6 +40,7 @@ class Luna(Flask):
async_mode = 'threading'
app = Luna(__name__, template_folder='dist')
socket_io = SocketIO(app)
app.config.update(**config)
socket_io = socketio.Server(logger=False, async_mode=async_mode)
app.wsgi_app = socketio.Middleware(socket_io, app.wsgi_app)
#socket_io = socketio.Server(logger=False, async_mode=async_mode)
#app.wsgi_app = socketio.Middleware(socket_io, app.wsgi_app)
......@@ -9,10 +9,6 @@ from jms import UserService
from . import app
def is_authenticate(user_service):
pass
def login_required(func=None, login_url=None):
if func is None:
return partial(login_required, login_url=login_url)
......@@ -31,7 +27,9 @@ def login_required(func=None, login_url=None):
g.user_service = UserService(endpoint=app.config['JUMPSERVER_ENDPOINT'])
g.user_service.auth_from_session(session_id, csrf_token)
if g.user_service.is_authenticated():
user = g.user_service.is_authenticated()
if user:
g.user = user
return func(*args, **kwargs)
else:
return redirect(login_url)
......
# ~*~ coding: utf-8 ~*~
import socket
from .app import socket_io
class ProxyChannel(object):
def __init__(self, sid):
self.sid = sid
self.srv, self.cli = socket.socketpair()
self.win_width = 80
self.win_height = 24
def fileno(self):
return self.srv.fileno()
def send(self, s):
"""Proxy server中使用select, 通过socketio发送给用户"""
return socket_io.emit('data', s, root=self.sid)
def recv(self, size):
"""Proxy server使用select, 接受socketio客户端数据发送来的数据"""
return self.srv.recv(size)
def write(self, s):
"""socket io写数据,proxy可以通过recv收到"""
print('Client write message: %s' % s)
self.cli.send(s)
def set_win_size(self, size):
self.win_width, self.win_height = size
# ~*~ coding: utf-8 ~*~
import socket
import threading
import paramiko
import re
import time
import logging
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import time
import socket
import select
from flask import request, g
from jms.utils import TtyIOParser
import threading
import paramiko
from jms.utils import wrap_with_line_feed as wr, wrap_with_warning as warning
from jms.utils import TtyIOParser
# from .globals import request, g
from .tasks import command_queue, record_queue
from .app import app
logger = logging.getLogger(__file__)
class ProxyChannel(object):
serial = 1
def __init__(self):
object.__setattr__(self, 'f', StringIO())
lock = threading.Lock()
lock.acquire()
try:
ProxyChannel.serial += 1
object.__setattr__(self, 'serial', ProxyChannel.serial)
finally:
lock.release()
def send(self, s):
return self.f.write(s)
def recv(self, size):
return self.f.read(size)
def fileno(self):
return self.serial
def __getattr__(self, item):
return getattr(self.f, item)
def __setattr__(self, key, value):
return setattr(self.f, key, value)
logger = app.logger
class ProxyServer(object):
......@@ -69,24 +34,29 @@ class ProxyServer(object):
IGNORE_OUTPUT_COMMAND = [re.compile(r'^cat\s+'),
re.compile(r'^tailf?\s+')]
def __init__(self, app, asset, system_user):
def __init__(self, app, user, asset, system_user, client_channel):
self.app = app
self.user = user
self.asset = asset
self.system_user = system_user
self.service = app.service
self.backend_channel = None
self.ssh = None
# If is first input, will clear the output data: ssh banner and PS1
self.is_first_input = True
self.in_input_state = False
# This ssh session command serial no
self.in_vim_state = False
self.command_no = 1
self.client_channel = client_channel
self.change_win_size_event = threading.Event()
self.input = ''
self.output = ''
self.output_data = []
self.input_data = []
self.history = {}
self.in_vim_state = False
# This ssh session command serial no
self.command_no = 1
# If is first input, will clear the output data: ssh banner and PS1
self.is_first_input = True
self.in_input_state = False
self.ssh = None
self.backend_channel = None
self.proxy_log_id = 0
def is_finish_input(self, s):
for char in s:
......@@ -95,15 +65,16 @@ class ProxyServer(object):
return False
def get_output(self):
parser = TtyIOParser(width=request.win_width,
height=request.win_height)
width = self.client_channel.win_width
height = self.client_channel.win_heigth
parser = TtyIOParser(width=width, height=height)
self.output = parser.parse_output(b''.join(self.output_data))
if self.input:
data = {
'proxy_log_id': g.proxy_log_id,
'user': request.user.username,
'asset': request.asset.ip,
'system_user': request.system_user.username,
'proxy_log_id': self.proxy_log_id,
'user': self.user.username,
'asset': self.asset.ip,
'system_user': self.system_user.username,
'command_no': self.command_no,
'command': self.input,
'output': self.output[:100],
......@@ -113,37 +84,39 @@ class ProxyServer(object):
self.command_no += 1
def get_input(self):
parser = TtyIOParser(width=request.win_width,
height=request.win_height)
width = self.client_channel.win_width
height = self.client_channel.win_heigth
parser = TtyIOParser(width=width, height=height)
self.input = parser.parse_input(b''.join(self.input_data))
# Todo: App check user permission
def validate_user_asset_permission(self, user_id, asset_id, system_user_id):
def validate_user_asset_permission(self):
# Todo: Only test
return True
return self.service.validate_user_asset_permission(
user_id, asset_id, system_user_id)
self.user.id, self.asset.id, self.system_user.id)
def get_asset_auth(self, system_user):
return self.service.get_system_user_auth_info(system_user)
def connect(self, term=b'xterm', width=80, height=24, timeout=10):
user = self.user
asset = self.asset
system_user = self.system_user
if not self.validate_user_asset_permission(
request.user.id, asset.id, system_user.id):
client_channel = self.client_channel
if not self.validate_user_asset_permission():
logger.warning('User %s have no permission connect %s with %s' %
(request.user.username,
asset.ip, system_user.username))
(user.username, asset.ip, system_user.username))
return None
self.ssh = ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
password, private_key = self.get_asset_auth(self.system_user)
password, private_key = self.get_asset_auth(system_user)
data = {"user": request.user.username, "asset": self.asset.ip,
"system_user": self.system_user.username, "login_type": "ST",
data = {"user": user.username, "asset": asset.ip,
"system_user": system_user.username, "login_type": "WT",
"date_start": time.time(), "is_failed": 0}
g.proxy_log_id = proxy_log_id = self.service.send_proxy_log(data)
self.proxy_log_id = proxy_log_id = self.service.send_proxy_log(data)
try:
g.client_channel.send(
client_channel.send(
wr('Connecting %s@%s:%s ... ' %
(system_user.username, asset.ip, asset.port)))
ssh.connect(hostname=asset.ip, port=asset.port,
......@@ -172,7 +145,7 @@ class ProxyServer(object):
logger.info(msg)
if failed:
g.client_channel.send(wr(warning(msg+'\r\n')))
client_channel.send(wr(warning(msg+'\r\n')))
data = {
"proxy_log_id": proxy_log_id,
"date_finished": time.time(),
......@@ -194,77 +167,82 @@ class ProxyServer(object):
def proxy(self):
self.backend_channel = backend_channel = self.connect()
self.app.proxy_list[g.proxy_log_id] = [g.client_channel, backend_channel]
client_channel = self.client_channel
self.app.proxy_list[self.proxy_log_id] = \
[self.client_channel, backend_channel]
#while True:
# r, w, x = select.select([client_channel], [], [])
# if client_channel in r:
# data = client_channel.recv(100)
# if len(data) == 0:
# break
# print('Receive from client: %s' % data)
#return
if backend_channel is None:
return
while True:
try:
r, w, x = select.select([g.client_channel, backend_channel], [], [])
except:
pass
r, w, x = select.select([client_channel, backend_channel], [], [])
if request.change_win_size_event.is_set():
request.change_win_size_event.clear()
backend_channel.resize_pty(width=request.win_width,
height=request.win_height)
#if self.change_win_size_event.is_set():
# self.change_win_size_event.clear()
# width = self.client_channel.win_width
# height = self.client_channel.win_heigth
# backend_channel.resize_pty(width=width, height=height)
if g.client_channel in r:
if client_channel in r:
# Get output of the command
self.is_first_input = False
if self.in_input_state is False:
self.get_output()
del self.output_data[:]
print('Recive from client')
# self.is_first_input = False
# if self.in_input_state is False:
# self.get_output()
# del self.output_data[:]
self.in_input_state = True
client_data = g.client_channel.recv(1024)
# self.in_input_state = True
client_data = client_channel.recv(1024)
if self.is_finish_input(client_data):
self.in_input_state = False
self.get_input()
del self.input_data[:]
# if self.is_finish_input(client_data):
# self.in_input_state = False
# self.get_input()
# del self.input_data[:]
if len(client_data) == 0:
logger.info('Logout from ssh server %(host)s: %(username)s' % {
'host': request.environ['REMOTE_ADDR'],
'username': request.user.username,
})
break
backend_channel.send(client_data)
if backend_channel in r:
backend_data = backend_channel.recv(1024)
if self.in_input_state:
self.input_data.append(backend_data)
else:
self.output_data.append(backend_data)
# if self.in_input_state:
# self.input_data.append(backend_data)
# else:
# self.output_data.append(backend_data)
if len(backend_data) == 0:
g.client_channel.send(
wr('Disconnect from %s' % request.asset.ip))
logger.info('Logout from asset %(host)s: %(username)s' % {
'host': request.asset.ip,
'username': request.user.username,
})
# client_channel.send(
# wr('Disconnect from %s' % self.asset.ip))
# logger.info('Logout from asset %(host)s: %(username)s' % {
# 'host': self.asset.ip,
# 'username': self.user.username,
# })
break
g.client_channel.send(backend_data)
client_channel.send(backend_data)
if self.is_match_ignore_command(self.input):
output = 'ignore output ...'
else:
output = backend_data
record_data = {
'proxy_log_id': g.proxy_log_id,
'output': output,
'timestamp': time.time(),
}
record_queue.put(record_data)
# record_data = {
# 'proxy_log_id': self.proxy_log_id,
# 'output': output,
# 'timestamp': time.time(),
# }
# record_queue.put(record_data)
data = {
"proxy_log_id": g.proxy_log_id,
"proxy_log_id": self.proxy_log_id,
"date_finished": time.time(),
}
self.service.finish_proxy_log(data)
# ~*~ coding: utf-8 ~*~
import re
import select
import threading
import socket
import collections
import json
import logging
import paramiko
import time
from flask import request, g
from jms.utils import TtyIOParser
from jms.utils import to_dotmap
from .. import app, socket_io
from ..nav import nav
from ..tasks import command_queue, record_queue
from ..proxy import ProxyServer
from ..channel import ProxyChannel
clients = app.clients
logger = logging.getLogger(__file__)
......@@ -26,6 +24,58 @@ __all__ = [
]
@socket_io.on('nav')
def handle_api():
socket_io.emit('nav', json.dumps(nav))
@socket_io.on('connect', namespace='/')
def handle_term_connect():
clients[request.sid] = collections.defaultdict(dict)
@socket_io.on('machine')
def handle_machine(message):
sid = request.sid
clients[sid]['host'] = host = '120.25.240.109'
clients[sid]['port'] = port = 8022
user = to_dotmap({'username': 'root', 'name': 'redhat'})
asset = to_dotmap({'hostname': host, 'ip': host, 'port': 8022})
# win_width = request.cookies.get('col')
# win_height = request.cookies.get('row')
system_user = to_dotmap({'name': 'jms', 'username': 'jms', 'id': 102})
clients[sid]['proxy_chan'] = proxy_chan = ProxyChannel(sid)
# proxy_chan.set_win_size((win_width, win_height))
proxy_server = ProxyServer(app, user, asset, system_user, proxy_chan)
socket_io.start_background_task(proxy_server.proxy)
# t = threading.Thread(target=proxy_server.proxy, args=())
# t.daemon = True
# t.start()
@socket_io.on('data')
def handle_data( message):
sid = request.sid
logger.debug('Receive data: %s' % message)
if clients[sid]['proxy_chan']:
print('Sending to client channel')
clients[sid]['proxy_chan'].write(message)
@socket_io.on('disconnect')
def handle_term_disconnect():
sid = request.sid
del clients[sid]
print('term disconnect')
@socket_io.on('resize')
def handle_term_resize(json):
sid = request.sid
logger.debug('Resize term: %s' % json)
"""
@socket_io.on('nav')
def handle_api(sid):
socket_io.emit('nav', json.dumps(nav), room=sid)
......@@ -40,17 +90,25 @@ def handle_term_connect(sid, environ):
def handle_machine(sid, message):
clients[sid]['host'] = host = '120.25.240.109'
clients[sid]['port'] = port = 8022
t = threading.Thread(target=forward, args=(sid,))
user = to_dotmap({'username': 'root', 'name': 'redhat'})
asset = to_dotmap({'hostname': host, 'ip': host, 'port': 8022})
# win_width = request.cookies.get('col')
# win_height = request.cookies.get('row')
system_user = to_dotmap({'name': 'jms', 'username': 'jms', 'id': 102})
clients[sid]['proxy_chan'] = proxy_chan = ProxyChannel(sid)
# proxy_chan.set_win_size((win_width, win_height))
proxy_server = ProxyServer(app, user, asset, system_user, proxy_chan)
t = threading.Thread(target=proxy_server.proxy, args=())
t.daemon = True
t.start()
socket_io.emit('data', 'Connect to %s:%s \r\n' % (host, port), room=sid)
@socket_io.on('data')
def handle_data(sid, message):
logger.debug('Receive data: %s' % message)
if clients[sid]['chan']:
clients[sid]['chan'].send(message)
if clients[sid]['proxy_chan']:
print('Sending to client channel')
clients[sid]['proxy_chan'].write(message)
@socket_io.on('disconnect')
......@@ -62,11 +120,12 @@ def handle_term_disconnect(sid):
@socket_io.on('resize')
def handle_term_resize(sid, json):
logger.debug('Resize term: %s' % json)
"""
### Only for test ###
def forward(sid):
print(request)
try:
host = clients[sid]['host']
port = clients[sid]['port']
......@@ -86,3 +145,14 @@ def forward(sid):
break
socket_io.emit('data', data, room=sid)
del clients[sid]
def handle(p):
while True:
r, w, x = select.select([p], [], [])
if p in r:
data = p.recv(1024)
if len(data) == 0:
break
print("Recieve client: %s" % data)
#!/usr/bin/env python
# ~*~ coding: utf-8 ~*~
from luna import app
import sys
import os
from luna import app, socket_io
host = app.config['BIND_HOST']
port = app.config['LISTEN_PORT']
......@@ -17,7 +18,9 @@ if __name__ == '__main__':
pass
try:
app.run(threaded=True, host=host, port=port)
socket_io.run(app)
# app.run(threaded=True, host=host, port=port)
except KeyboardInterrupt:
app.stop()
sys.exit()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment