Commit cf0e86e9 authored by ibuler's avatar ibuler

[Update] 优化tranport

parent 0034e370
...@@ -54,7 +54,7 @@ class SSHConnection: ...@@ -54,7 +54,7 @@ class SSHConnection:
asset.ip, port=asset.port, username=system_user.username, asset.ip, port=asset.port, username=system_user.username,
password=system_user.password, pkey=system_user.private_key, password=system_user.password, pkey=system_user.private_key,
timeout=config['SSH_TIMEOUT'], timeout=config['SSH_TIMEOUT'],
compress=True, auth_timeout=config['SSH_TIMEOUT'], compress=False, auth_timeout=config['SSH_TIMEOUT'],
look_for_keys=False, sock=sock look_for_keys=False, sock=sock
) )
except paramiko.AuthenticationException: except paramiko.AuthenticationException:
...@@ -62,7 +62,7 @@ class SSHConnection: ...@@ -62,7 +62,7 @@ class SSHConnection:
ssh.connect( ssh.connect(
asset.ip, port=asset.port, username=system_user.username, asset.ip, port=asset.port, username=system_user.username,
password=system_user.password, timeout=config['SSH_TIMEOUT'], password=system_user.password, timeout=config['SSH_TIMEOUT'],
compress=True, auth_timeout=config['SSH_TIMEOUT'], compress=False, auth_timeout=config['SSH_TIMEOUT'],
look_for_keys=False, sock=sock, allow_agent=False, look_for_keys=False, sock=sock, allow_agent=False,
) )
transport = ssh.get_transport() transport = ssh.get_transport()
......
...@@ -59,6 +59,7 @@ class Connection(object): ...@@ -59,6 +59,7 @@ class Connection(object):
client.close() client.close()
self.__class__.clients_num -= 1 self.__class__.clients_num -= 1
del self.clients[tid] del self.clients[tid]
del client
logger.info("Client {} leave, total {} now".format( logger.info("Client {} leave, total {} now".format(
client, self.__class__.clients_num client, self.__class__.clients_num
)) ))
...@@ -80,8 +81,11 @@ class Connection(object): ...@@ -80,8 +81,11 @@ class Connection(object):
@classmethod @classmethod
def remove_connection(cls, cid): def remove_connection(cls, cid):
connection = cls.get_connection(cid) connection = cls.get_connection(cid)
if not connection:
return
connection.close() connection.close()
del cls.connections[cid] del cls.connections[cid]
del connection
@classmethod @classmethod
def get_connection(cls, cid): def get_connection(cls, cid):
...@@ -134,7 +138,9 @@ class Client(object): ...@@ -134,7 +138,9 @@ class Client(object):
def close(self): def close(self):
logger.info("Client {} close".format(self)) logger.info("Client {} close".format(self))
return self.chan.close() self.chan.close()
self.chan = None
return
def __getattr__(self, item): def __getattr__(self, item):
return getattr(self.chan, item) return getattr(self.chan, item)
...@@ -343,6 +349,7 @@ class BaseServer(object): ...@@ -343,6 +349,7 @@ class BaseServer(object):
logger.info("Closed server {}".format(self)) logger.info("Closed server {}".format(self))
self.r_input_output_data_filter(b'') self.r_input_output_data_filter(b'')
self.chan.close() self.chan.close()
self.chan = None
def __getattr__(self, item): def __getattr__(self, item):
return getattr(self.chan, item) return getattr(self.chan, item)
...@@ -379,6 +386,9 @@ class Server(BaseServer): ...@@ -379,6 +386,9 @@ class Server(BaseServer):
def close(self): def close(self):
super(Server, self).close() super(Server, self).close()
self.chan.transport.close() self.chan.transport.close()
self.chan.transport = None
self.chan = None
logger.debug("Backend server closed")
if self.sock: if self.sock:
self.sock.transport.close() self.sock.transport.close()
......
...@@ -77,31 +77,33 @@ class SSHServer: ...@@ -77,31 +77,33 @@ class SSHServer:
server = SSHInterface(connection) server = SSHInterface(connection)
try: try:
transport.start_server(server=server) transport.start_server(server=server)
while transport.is_active():
chan = transport.accept()
server.event.wait(5)
if chan is None:
continue
if not server.event.is_set():
logger.warning("Client not request invalid, exiting")
sock.close()
continue
else:
server.event.clear()
client = connection.clients.get(chan.get_id())
client.chan = chan
t = threading.Thread(target=self.dispatch, args=(client,))
t.daemon = True
t.start()
transport.close()
del transport
except paramiko.SSHException: except paramiko.SSHException:
logger.warning("SSH negotiation failed") logger.warning("SSH negotiation failed")
return
except EOFError as e: except EOFError as e:
logger.warning("Handle EOF Error: {}".format(e)) logger.warning("Handle EOF Error: {}".format(e))
return finally:
while transport.is_active(): Connection.remove_connection(connection.id)
chan = transport.accept() del connection
server.event.wait(5)
if chan is None:
continue
if not server.event.is_set():
logger.warning("Client not request invalid, exiting")
sock.close()
return
else:
server.event.clear()
client = connection.clients.get(chan.get_id())
client.chan = chan
t = threading.Thread(target=self.dispatch, args=(client,))
t.daemon = True
t.start()
Connection.remove_connection(connection.id)
@staticmethod @staticmethod
def dispatch(client): def dispatch(client):
...@@ -114,8 +116,10 @@ class SSHServer: ...@@ -114,8 +116,10 @@ class SSHServer:
InteractiveServer(client).interact() InteractiveServer(client).interact()
except IndexError as e: except IndexError as e:
logger.error("Unexpected error occur: {}".format(e)) logger.error("Unexpected error occur: {}".format(e))
connection = Connection.get_connection(client.connection_id) finally:
connection.remove_client(client.id) connection = Connection.get_connection(client.connection_id)
connection.remove_client(client.id)
del client
elif chan_type == 'subsystem': elif chan_type == 'subsystem':
pass pass
else: else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment