Unverified Commit 9c5ed4af authored by Eric_Lee's avatar Eric_Lee Committed by GitHub

Merge pull request #157 from jumpserver/155dev

[Update] add done chan to prevent from blocking
parents df477c57 f8320787
......@@ -99,13 +99,23 @@ func (p *Parser) ParseStream(userInChan, srvInChan <-chan []byte) (userOut, srvO
return
}
b = p.ParseUserInput(b)
p.userOutputChan <- b
select {
case <-p.closed:
return
case p.userOutputChan <- b:
}
case b, ok := <-srvInChan:
if !ok {
return
}
b = p.ParseServerOutput(b)
p.srvOutputChan <- b
select {
case <-p.closed:
return
case p.srvOutputChan <- b:
}
}
}
}()
......
......@@ -82,7 +82,7 @@ func (cp *CmdParser) initial() {
cp.term.SetEcho(false)
go func() {
logger.Infof("Session %s: %s start", cp.id, cp.name)
defer logger.Infof("Session %s: %s parser close", cp.id, cp.name)
defer logger.Infof("Session %s: %s close", cp.id, cp.name)
loop:
for {
line, err := cp.term.ReadLine()
......
......@@ -122,14 +122,15 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
userInChan chan []byte
srvInChan chan []byte
done chan struct{}
)
parser = newParser(s.ID)
replayRecorder = NewReplyRecord(s.ID)
userInChan = make(chan []byte, 10)
srvInChan = make(chan []byte, 10)
userInChan = make(chan []byte, 1)
srvInChan = make(chan []byte, 1)
done = make(chan struct{})
// 设置parser的命令过滤规则
parser.SetCMDFilterRules(s.cmdRules)
......@@ -137,6 +138,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
userOutChan, srvOutChan := parser.ParseStream(userInChan, srvInChan)
defer func() {
close(done)
_ = userConn.Close()
_ = srvConn.Close()
// 关闭parser
......@@ -148,9 +150,8 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
// 记录命令
go s.recordCommand(parser.cmdRecordChan)
go LoopRead(userConn, userInChan)
go LoopRead(srvConn, srvInChan)
go s.LoopReadFromSrv(done, srvConn, srvInChan)
go s.LoopReadFromUser(done, userConn, userInChan)
winCh := userConn.WinCh()
maxIdleTime := s.MaxIdleTime * time.Minute
lastActiveTime := time.Now()
......@@ -223,13 +224,27 @@ func (s *SwitchSession) MapData() map[string]interface{} {
}
}
func LoopRead(read io.Reader, inChan chan<- []byte) {
defer logger.Debug("loop read end")
func (s *SwitchSession) LoopReadFromUser(done chan struct{}, userConn UserConnection, inChan chan<- []byte) {
defer logger.Infof("Session %s: read from user done", s.ID)
s.LoopRead(done, userConn, inChan)
}
func (s *SwitchSession) LoopReadFromSrv(done chan struct{}, srvConn srvconn.ServerConnection, inChan chan<- []byte) {
defer logger.Infof("Session %s: read from srv done", s.ID)
s.LoopRead(done, srvConn, inChan)
}
func (s *SwitchSession) LoopRead(done chan struct{}, read io.Reader, inChan chan<- []byte) {
loop:
for {
buf := make([]byte, 1024)
nr, err := read.Read(buf)
if nr > 0 {
inChan <- buf[:nr]
select {
case <-done:
break loop
case inChan <- buf[:nr]:
}
}
if err != nil {
break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment