Commit fcb50749 authored by ibuler's avatar ibuler

[Update] 使用paramiko channel, 代替ProxyCommand,解决并发引起的gateway问题

parent 16d758ec
...@@ -9,8 +9,6 @@ import threading ...@@ -9,8 +9,6 @@ import threading
import socket import socket
import json import json
import signal import signal
import objgraph
import gc
from jms.service import AppService from jms.service import AppService
...@@ -134,7 +132,6 @@ class Coco: ...@@ -134,7 +132,6 @@ class Coco:
def heartbeat(self): def heartbeat(self):
_sessions = [s.to_json() for s in self.sessions] _sessions = [s.to_json() for s in self.sessions]
tasks = self.service.terminal_heartbeat(_sessions) tasks = self.service.terminal_heartbeat(_sessions)
gc.collect()
if tasks: if tasks:
self.handle_task(tasks) self.handle_task(tasks)
if tasks is False: if tasks is False:
......
...@@ -34,7 +34,7 @@ class SSHConnection: ...@@ -34,7 +34,7 @@ class SSHConnection:
self.get_system_user_auth(system_user) self.get_system_user_auth(system_user)
if asset.domain: if asset.domain:
sock = self.get_proxy_sock(asset) sock = self.get_proxy_sock_v2(asset)
try: try:
ssh.connect( ssh.connect(
...@@ -64,29 +64,54 @@ class SSHConnection: ...@@ -64,29 +64,54 @@ class SSHConnection:
return None, str(e) return None, str(e)
except (socket.error, TimeoutError) as e: except (socket.error, TimeoutError) as e:
return None, str(e) return None, str(e)
return ssh, None return ssh, sock, None
def get_transport(self, asset, system_user): def get_transport(self, asset, system_user):
ssh, msg = self.get_ssh_client(asset, system_user) ssh, sock, msg = self.get_ssh_client(asset, system_user)
if ssh: if ssh:
return ssh.get_transport(), None return ssh.get_transport(), sock, None
else: else:
return None, msg return None, None, msg
def get_channel(self, asset, system_user, term="xterm", width=80, height=24): def get_channel(self, asset, system_user, term="xterm", width=80, height=24):
ssh, msg = self.get_ssh_client(asset, system_user) ssh, sock, msg = self.get_ssh_client(asset, system_user)
if ssh: if ssh:
chan = ssh.invoke_shell(term, width=width, height=height) chan = ssh.invoke_shell(term, width=width, height=height)
return chan, None return chan, sock, None
else: else:
return None, msg return None, sock, msg
def get_sftp(self, asset, system_user): def get_sftp(self, asset, system_user):
ssh, msg = self.get_ssh_client(asset, system_user) ssh, sock, msg = self.get_ssh_client(asset, system_user)
if ssh: if ssh:
return ssh.open_sftp(), None return ssh.open_sftp(), sock, None
else: else:
return None, msg return None, sock, msg
def get_proxy_sock_v2(self, asset):
sock = None
domain = app_service.get_domain_detail_with_gateway(
asset.domain
)
if not domain.has_ssh_gateway():
return None
for i in domain.gateways:
gateway = domain.random_ssh_gateway()
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(gateway.ip, username=gateway.username,
password=gateway.password,
pkey=gateway.private_key_obj)
except(paramiko.AuthenticationException,
paramiko.BadAuthenticationType,
SSHException):
continue
sock = ssh.get_transport().open_channel(
'direct-tcpip', (asset.ip, asset.port), ('127.0.0.1', 0)
)
break
return sock
def get_proxy_sock(self, asset): def get_proxy_sock(self, asset):
sock = None sock = None
......
...@@ -94,8 +94,9 @@ class Server: ...@@ -94,8 +94,9 @@ class Server:
""" """
# Todo: Server name is not very suitable # Todo: Server name is not very suitable
def __init__(self, chan, asset, system_user): def __init__(self, chan, sock, asset, system_user):
self.chan = chan self.chan = chan
self.sock = sock
self.asset = asset self.asset = asset
self.system_user = system_user self.system_user = system_user
self.send_bytes = 0 self.send_bytes = 0
...@@ -168,6 +169,8 @@ class Server: ...@@ -168,6 +169,8 @@ class Server:
self.stop_evt.set() self.stop_evt.set()
self.chan.close() self.chan.close()
self.chan.transport.close() self.chan.transport.close()
if self.sock:
self.sock.transport.close()
@staticmethod @staticmethod
def _have_enter_char(s): def _have_enter_char(s):
......
...@@ -90,14 +90,14 @@ class ProxyServer: ...@@ -90,14 +90,14 @@ class ProxyServer:
width = request.meta.get('width', 80) width = request.meta.get('width', 80)
height = request.meta.get('height', 24) height = request.meta.get('height', 24)
ssh = SSHConnection() ssh = SSHConnection()
chan, msg = ssh.get_channel( chan, sock, msg = ssh.get_channel(
asset, system_user, term=term, width=width, height=height asset, system_user, term=term, width=width, height=height
) )
if not chan: if not chan:
self.client.send(warning(wr(msg, before=1, after=0))) self.client.send(warning(wr(msg, before=1, after=0)))
server = None server = None
else: else:
server = Server(chan, asset, system_user) server = Server(chan, sock, asset, system_user)
self.connecting = False self.connecting = False
self.client.send(b'\r\n') self.client.send(b'\r\n')
return server return server
......
...@@ -2,6 +2,7 @@ import os ...@@ -2,6 +2,7 @@ import os
import tempfile import tempfile
import paramiko import paramiko
import time import time
from .ctx import app_service
from datetime import datetime from datetime import datetime
from .connection import SSHConnection from .connection import SSHConnection
...@@ -16,6 +17,17 @@ class SFTPServer(paramiko.SFTPServerInterface): ...@@ -16,6 +17,17 @@ class SFTPServer(paramiko.SFTPServerInterface):
self._sftp = {} self._sftp = {}
self.hosts = self.get_perm_hosts() self.hosts = self.get_perm_hosts()
def session_ended(self):
super().session_ended()
for _, v in self._sftp.items():
sftp = v['sftp']
sock = v.get('sock')
sftp.close()
if sock:
sock.close()
sock.transport.close()
self._sftp = {}
def get_host_sftp(self, host, su): def get_host_sftp(self, host, su):
asset = self.hosts.get(host) asset = self.hosts.get(host)
system_user = None system_user = None
...@@ -28,18 +40,18 @@ class SFTPServer(paramiko.SFTPServerInterface): ...@@ -28,18 +40,18 @@ class SFTPServer(paramiko.SFTPServerInterface):
raise OSError("No asset or system user explicit") raise OSError("No asset or system user explicit")
if host not in self._sftp: if host not in self._sftp:
ssh = SSHConnection(self.server.app) ssh = SSHConnection()
sftp, msg = ssh.get_sftp(asset, system_user) sftp, sock, msg = ssh.get_sftp(asset, system_user)
if sftp: if sftp:
self._sftp[host] = sftp self._sftp[host] = {'sftp': sftp, 'sock': sock}
return sftp return sftp
else: else:
raise OSError("Can not connect asset sftp server: {}".format(msg)) raise OSError("Can not connect asset sftp server: {}".format(msg))
else: else:
return self._sftp[host] return self._sftp[host]['sftp']
def get_perm_hosts(self): def get_perm_hosts(self):
assets = self.server.app.service.get_user_assets( assets = app_service.get_user_assets(
self.server.request.user self.server.request.user
) )
return {asset.hostname: asset for asset in assets} return {asset.hostname: asset for asset in assets}
...@@ -89,7 +101,7 @@ class SFTPServer(paramiko.SFTPServerInterface): ...@@ -89,7 +101,7 @@ class SFTPServer(paramiko.SFTPServerInterface):
"is_success": is_success, "is_success": is_success,
} }
for i in range(1, 4): for i in range(1, 4):
ok = self.server.app.service.create_ftp_log(data) ok = app_service.create_ftp_log(data)
if ok: if ok:
break break
else: else:
......
...@@ -405,6 +405,8 @@ def item_max_length(_iter, maxi=None, mini=None, key=None): ...@@ -405,6 +405,8 @@ def item_max_length(_iter, maxi=None, mini=None, key=None):
_iter = [key(i) for i in _iter] _iter = [key(i) for i in _iter]
length = [size_of_str_with_zh(s) for s in _iter] length = [size_of_str_with_zh(s) for s in _iter]
if not length:
return 1
if maxi: if maxi:
length.append(maxi) length.append(maxi)
length = max(length) length = max(length)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment