Commit f8320787 authored by Eric's avatar Eric

[Update] add done chan to prevent from blocking

parent df477c57
...@@ -99,13 +99,23 @@ func (p *Parser) ParseStream(userInChan, srvInChan <-chan []byte) (userOut, srvO ...@@ -99,13 +99,23 @@ func (p *Parser) ParseStream(userInChan, srvInChan <-chan []byte) (userOut, srvO
return return
} }
b = p.ParseUserInput(b) b = p.ParseUserInput(b)
p.userOutputChan <- b select {
case <-p.closed:
return
case p.userOutputChan <- b:
}
case b, ok := <-srvInChan: case b, ok := <-srvInChan:
if !ok { if !ok {
return return
} }
b = p.ParseServerOutput(b) b = p.ParseServerOutput(b)
p.srvOutputChan <- b select {
case <-p.closed:
return
case p.srvOutputChan <- b:
}
} }
} }
}() }()
......
...@@ -82,7 +82,7 @@ func (cp *CmdParser) initial() { ...@@ -82,7 +82,7 @@ func (cp *CmdParser) initial() {
cp.term.SetEcho(false) cp.term.SetEcho(false)
go func() { go func() {
logger.Infof("Session %s: %s start", cp.id, cp.name) logger.Infof("Session %s: %s start", cp.id, cp.name)
defer logger.Infof("Session %s: %s parser close", cp.id, cp.name) defer logger.Infof("Session %s: %s close", cp.id, cp.name)
loop: loop:
for { for {
line, err := cp.term.ReadLine() line, err := cp.term.ReadLine()
......
...@@ -122,14 +122,15 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo ...@@ -122,14 +122,15 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
userInChan chan []byte userInChan chan []byte
srvInChan chan []byte srvInChan chan []byte
done chan struct{}
) )
parser = newParser(s.ID) parser = newParser(s.ID)
replayRecorder = NewReplyRecord(s.ID) replayRecorder = NewReplyRecord(s.ID)
userInChan = make(chan []byte, 10) userInChan = make(chan []byte, 1)
srvInChan = make(chan []byte, 10) srvInChan = make(chan []byte, 1)
done = make(chan struct{})
// 设置parser的命令过滤规则 // 设置parser的命令过滤规则
parser.SetCMDFilterRules(s.cmdRules) parser.SetCMDFilterRules(s.cmdRules)
...@@ -137,6 +138,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo ...@@ -137,6 +138,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
userOutChan, srvOutChan := parser.ParseStream(userInChan, srvInChan) userOutChan, srvOutChan := parser.ParseStream(userInChan, srvInChan)
defer func() { defer func() {
close(done)
_ = userConn.Close() _ = userConn.Close()
_ = srvConn.Close() _ = srvConn.Close()
// 关闭parser // 关闭parser
...@@ -148,9 +150,8 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo ...@@ -148,9 +150,8 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
// 记录命令 // 记录命令
go s.recordCommand(parser.cmdRecordChan) go s.recordCommand(parser.cmdRecordChan)
go LoopRead(userConn, userInChan) go s.LoopReadFromSrv(done, srvConn, srvInChan)
go LoopRead(srvConn, srvInChan) go s.LoopReadFromUser(done, userConn, userInChan)
winCh := userConn.WinCh() winCh := userConn.WinCh()
maxIdleTime := s.MaxIdleTime * time.Minute maxIdleTime := s.MaxIdleTime * time.Minute
lastActiveTime := time.Now() lastActiveTime := time.Now()
...@@ -223,13 +224,27 @@ func (s *SwitchSession) MapData() map[string]interface{} { ...@@ -223,13 +224,27 @@ func (s *SwitchSession) MapData() map[string]interface{} {
} }
} }
func LoopRead(read io.Reader, inChan chan<- []byte) { func (s *SwitchSession) LoopReadFromUser(done chan struct{}, userConn UserConnection, inChan chan<- []byte) {
defer logger.Debug("loop read end") defer logger.Infof("Session %s: read from user done", s.ID)
s.LoopRead(done, userConn, inChan)
}
func (s *SwitchSession) LoopReadFromSrv(done chan struct{}, srvConn srvconn.ServerConnection, inChan chan<- []byte) {
defer logger.Infof("Session %s: read from srv done", s.ID)
s.LoopRead(done, srvConn, inChan)
}
func (s *SwitchSession) LoopRead(done chan struct{}, read io.Reader, inChan chan<- []byte) {
loop:
for { for {
buf := make([]byte, 1024) buf := make([]byte, 1024)
nr, err := read.Read(buf) nr, err := read.Read(buf)
if nr > 0 { if nr > 0 {
inChan <- buf[:nr] select {
case <-done:
break loop
case inChan <- buf[:nr]:
}
} }
if err != nil { if err != nil {
break break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment