Commit fddf3bed authored by Eric's avatar Eric

修改连接复用

parent a40f2585
...@@ -201,12 +201,12 @@ func LoopRead(read io.Reader, inChan chan<- []byte) { ...@@ -201,12 +201,12 @@ func LoopRead(read io.Reader, inChan chan<- []byte) {
for { for {
buf := make([]byte, 1024) buf := make([]byte, 1024)
nr, err := read.Read(buf) nr, err := read.Read(buf)
if err != nil {
break
}
if nr > 0 { if nr > 0 {
inChan <- buf[:nr] inChan <- buf[:nr]
} }
if err != nil {
break
}
} }
close(inChan) close(inChan)
} }
...@@ -102,6 +102,7 @@ func KeepSyncConfigWithServer(ctx context.Context) { ...@@ -102,6 +102,7 @@ func KeepSyncConfigWithServer(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
logger.Info("Sync config with server exit.") logger.Info("Sync config with server exit.")
return
case <-ticker.C: case <-ticker.C:
err := LoadConfigFromServer() err := LoadConfigFromServer()
if err != nil { if err != nil {
......
...@@ -56,6 +56,9 @@ func (s *SSHClient) increaseRef() { ...@@ -56,6 +56,9 @@ func (s *SSHClient) increaseRef() {
func (s *SSHClient) decreaseRef() { func (s *SSHClient) decreaseRef() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.ref == 0 {
return
}
s.ref-- s.ref--
} }
...@@ -66,9 +69,6 @@ func (s *SSHClient) NewSession() (*gossh.Session, error) { ...@@ -66,9 +69,6 @@ func (s *SSHClient) NewSession() (*gossh.Session, error) {
func (s *SSHClient) Close() error { func (s *SSHClient) Close() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.ref > 1 {
return nil
}
select { select {
case <-s.closed: case <-s.closed:
return nil return nil
...@@ -91,7 +91,6 @@ func KeepAlive(c *gossh.Client, closed <-chan struct{}, keepInterval time.Durati ...@@ -91,7 +91,6 @@ func KeepAlive(c *gossh.Client, closed <-chan struct{}, keepInterval time.Durati
_, _, err := c.SendRequest("keepalive@jumpserver.org", true, nil) _, _, err := c.SendRequest("keepalive@jumpserver.org", true, nil)
if err != nil { if err != nil {
logger.Errorf("SSH client %p keep alive err: ", c, err.Error()) logger.Errorf("SSH client %p keep alive err: ", c, err.Error())
return
} }
} }
...@@ -238,10 +237,10 @@ func newClient(asset *model.Asset, systemUser *model.SystemUser, timeout time.Du ...@@ -238,10 +237,10 @@ func newClient(asset *model.Asset, systemUser *model.SystemUser, timeout time.Du
closed := make(chan struct{}) closed := make(chan struct{})
go KeepAlive(conn, closed, 60) go KeepAlive(conn, closed, 60)
return &SSHClient{ return &SSHClient{
client: conn, client: conn,
username: systemUser.Username, username: systemUser.Username,
mu: new(sync.RWMutex), mu: new(sync.RWMutex),
closed: closed,}, nil closed: closed,}, nil
} }
func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUser, timeout time.Duration, func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUser, timeout time.Duration,
...@@ -255,8 +254,8 @@ func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUse ...@@ -255,8 +254,8 @@ func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUse
if systemUser.Username == "" { if systemUser.Username == "" {
systemUser.Username = client.username systemUser.Username = client.username
} }
logger.Infof("Reuse connection: %s->%s@%s ref: %d", logger.Infof("Reuse connection: %s->%s@%s. SSH client %p current ref: %d",
user.Username, client.username, asset.IP, client.refCount()) user.Username, client.username, asset.IP, client.client, client.refCount())
return client, nil return client, nil
} }
} }
...@@ -287,24 +286,21 @@ func setClientCache(key string, client *SSHClient) { ...@@ -287,24 +286,21 @@ func setClientCache(key string, client *SSHClient) {
} }
func RecycleClient(client *SSHClient) { func RecycleClient(client *SSHClient) {
// 0, 1: delete Cache, close client. // decrease client ref; if ref==0, delete Cache, close client.
// default: client ref decrease.
if client == nil { if client == nil {
return return
} }
switch client.refCount() { client.decreaseRef()
case 0, 1: logger.Debugf("SSH client %p ref -1. current ref: %d", client.client, client.refCount())
if client.refCount() == 0 {
clientLock.Lock() clientLock.Lock()
delete(sshClients, client.key) delete(sshClients, client.key)
clientLock.Unlock() clientLock.Unlock()
err := client.Close() err := client.Close()
if err != nil { if err != nil {
logger.Errorf("Failed to close client %p err: %s ",client.client, err.Error()) logger.Errorf("Failed to close SSH client %p err: %s ", client.client, err.Error())
} else { } else {
logger.Debugf("Success to close client %p",client.client) logger.Debugf("Success to close SSH client %p", client.client)
} }
default:
client.decreaseRef()
logger.Debugf("Reuse client %p Current ref: %d", client.client, client.refCount())
} }
} }
...@@ -595,6 +595,7 @@ func (u *UserSftp) GetSftpClient(asset *model.Asset, sysUser *model.SystemUser) ...@@ -595,6 +595,7 @@ func (u *UserSftp) GetSftpClient(asset *model.Asset, sysUser *model.SystemUser)
} }
sftpClient, err := sftp.NewClient(sshClient.client) sftpClient, err := sftp.NewClient(sshClient.client)
if err != nil { if err != nil {
RecycleClient(sshClient)
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment