Unverified Commit ceb51aec authored by Eric_Lee's avatar Eric_Lee Committed by GitHub

Merge pull request #152 from jumpserver/dev

Dev
parents 9bbcfa95 a1b2ff4b
...@@ -34,7 +34,7 @@ func SftpHandler(sess ssh.Session) { ...@@ -34,7 +34,7 @@ func SftpHandler(sess ssh.Session) {
logger.Infof("SFTP request %s: Handler start", reqID) logger.Infof("SFTP request %s: Handler start", reqID)
req := sftp.NewRequestServer(sess, handlers) req := sftp.NewRequestServer(sess, handlers)
if err := req.Serve(); err == io.EOF { if err := req.Serve(); err == io.EOF {
logger.Debug("SFTP request %s: Exited session.", reqID) logger.Debugf("SFTP request %s: Exited session.", reqID)
} else if err != nil { } else if err != nil {
logger.Errorf("SFTP request %s: Server completed with error %s", reqID, err) logger.Errorf("SFTP request %s: Server completed with error %s", reqID, err)
} }
...@@ -82,7 +82,7 @@ func (fs *sftpHandler) Filecmd(r *sftp.Request) (err error) { ...@@ -82,7 +82,7 @@ func (fs *sftpHandler) Filecmd(r *sftp.Request) (err error) {
case "Setstat": case "Setstat":
return return
case "Rename": case "Rename":
logger.Debug("%s=>%s", r.Filepath, r.Target) logger.Debugf("%s=>%s", r.Filepath, r.Target)
return fs.Rename(r.Filepath, r.Target) return fs.Rename(r.Filepath, r.Target)
case "Rmdir": case "Rmdir":
err = fs.RemoveDirectory(r.Filepath) err = fs.RemoveDirectory(r.Filepath)
...@@ -91,7 +91,7 @@ func (fs *sftpHandler) Filecmd(r *sftp.Request) (err error) { ...@@ -91,7 +91,7 @@ func (fs *sftpHandler) Filecmd(r *sftp.Request) (err error) {
case "Mkdir": case "Mkdir":
err = fs.MkdirAll(r.Filepath) err = fs.MkdirAll(r.Filepath)
case "Symlink": case "Symlink":
logger.Debug("%s=>%s", r.Filepath, r.Target) logger.Debugf("%s=>%s", r.Filepath, r.Target)
err = fs.Symlink(r.Filepath, r.Target) err = fs.Symlink(r.Filepath, r.Target)
default: default:
return return
......
...@@ -61,6 +61,11 @@ func (w *WrapperSession) Read(p []byte) (int, error) { ...@@ -61,6 +61,11 @@ func (w *WrapperSession) Read(p []byte) (int, error) {
} }
func (w *WrapperSession) Close() error { func (w *WrapperSession) Close() error {
select {
case <-w.closed:
return nil
default:
}
err := w.inWriter.Close() err := w.inWriter.Close()
w.initReadPip() w.initReadPip()
return err return err
......
...@@ -56,8 +56,10 @@ func OnNamespaceConnected(c *neffos.NSConn, msg neffos.Message) error { ...@@ -56,8 +56,10 @@ func OnNamespaceConnected(c *neffos.NSConn, msg neffos.Message) error {
remoteIP = strings.Split(remoteAddr, ",")[0] remoteIP = strings.Split(remoteAddr, ",")[0]
logger.Infof("Accepted %s connect websocket from %s", user.Username, remoteIP) logger.Infof("Accepted %s connect websocket from %s", user.Username, remoteIP)
go func() { go func() {
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()
for { for {
<-time.After(30 * time.Second) <-tick.C
if c.Conn.IsClosed() { if c.Conn.IsClosed() {
logger.Infof("User %s from %s websocket connect closed", user.Username, remoteIP) logger.Infof("User %s from %s websocket connect closed", user.Username, remoteIP)
return return
......
...@@ -33,14 +33,29 @@ type CmdParser struct { ...@@ -33,14 +33,29 @@ type CmdParser struct {
} }
func (cp *CmdParser) WriteData(p []byte) (int, error) { func (cp *CmdParser) WriteData(p []byte) (int, error) {
select {
case <-cp.closed:
return 0, io.EOF
default:
}
return cp.writer.Write(p) return cp.writer.Write(p)
} }
func (cp *CmdParser) Write(p []byte) (int, error) { func (cp *CmdParser) Write(p []byte) (int, error) {
select {
case <-cp.closed:
return 0, io.EOF
default:
}
return len(p), nil return len(p), nil
} }
func (cp *CmdParser) Read(p []byte) (int, error) { func (cp *CmdParser) Read(p []byte) (int, error) {
select {
case <-cp.closed:
return 0, io.EOF
default:
}
return cp.reader.Read(p) return cp.reader.Read(p)
} }
...@@ -51,6 +66,7 @@ func (cp *CmdParser) Close() error { ...@@ -51,6 +66,7 @@ func (cp *CmdParser) Close() error {
default: default:
close(cp.closed) close(cp.closed)
} }
_ = cp.reader.Close()
return cp.writer.Close() return cp.writer.Close()
} }
...@@ -94,7 +110,11 @@ func (cp *CmdParser) parsePS1(s string) string { ...@@ -94,7 +110,11 @@ func (cp *CmdParser) parsePS1(s string) string {
// Parse 解析命令或输出 // Parse 解析命令或输出
func (cp *CmdParser) Parse() string { func (cp *CmdParser) Parse() string {
cp.writer.Write([]byte("\r")) select {
case <-cp.closed:
default:
cp.writer.Write([]byte("\r"))
}
cp.lock.Lock() cp.lock.Lock()
defer cp.lock.Unlock() defer cp.lock.Unlock()
output := strings.TrimSpace(strings.Join(cp.currentLines, "\r\n")) output := strings.TrimSpace(strings.Join(cp.currentLines, "\r\n"))
...@@ -102,4 +122,4 @@ func (cp *CmdParser) Parse() string { ...@@ -102,4 +122,4 @@ func (cp *CmdParser) Parse() string {
cp.currentLines = make([]string, 0) cp.currentLines = make([]string, 0)
cp.currentLength = 0 cp.currentLength = 0
return output return output
} }
\ No newline at end of file
...@@ -60,6 +60,8 @@ func (c *CommandRecorder) record() { ...@@ -60,6 +60,8 @@ func (c *CommandRecorder) record() {
maxRetry := 0 maxRetry := 0
logger.Infof("Session %s: Command recorder start", c.sessionID) logger.Infof("Session %s: Command recorder start", c.sessionID)
defer logger.Infof("Session %s: Command recorder close", c.sessionID) defer logger.Infof("Session %s: Command recorder close", c.sessionID)
tick := time.NewTicker(time.Second * 10)
defer tick.Stop()
for { for {
select { select {
case <-c.closed: case <-c.closed:
...@@ -74,7 +76,7 @@ func (c *CommandRecorder) record() { ...@@ -74,7 +76,7 @@ func (c *CommandRecorder) record() {
if len(cmdList) < 5 { if len(cmdList) < 5 {
continue continue
} }
case <-time.After(time.Second * 5): case <-tick.C:
if len(cmdList) == 0 { if len(cmdList) == 0 {
continue continue
} }
......
...@@ -152,10 +152,19 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo ...@@ -152,10 +152,19 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
go LoopRead(srvConn, srvInChan) go LoopRead(srvConn, srvInChan)
winCh := userConn.WinCh() winCh := userConn.WinCh()
maxIdleTime := s.MaxIdleTime * time.Minute
lastActiveTime := time.Now()
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()
for { for {
select { select {
// 检测是否超过最大空闲时间 // 检测是否超过最大空闲时间
case <-time.After(s.MaxIdleTime * time.Minute): case <-tick.C:
now := time.Now()
outTime := lastActiveTime.Add(maxIdleTime)
if !now.After(outTime) {
continue
}
msg := fmt.Sprintf(i18n.T("Connect idle more than %d minutes, disconnect"), s.MaxIdleTime) msg := fmt.Sprintf(i18n.T("Connect idle more than %d minutes, disconnect"), s.MaxIdleTime)
logger.Debugf("Session idle more than %d minutes, disconnect: %s", s.MaxIdleTime, s.ID) logger.Debugf("Session idle more than %d minutes, disconnect: %s", s.MaxIdleTime, s.ID)
msg = utils.WrapperWarn(msg) msg = utils.WrapperWarn(msg)
...@@ -190,6 +199,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo ...@@ -190,6 +199,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
} }
_, err = srvConn.Write(p) _, err = srvConn.Write(p)
} }
lastActiveTime = time.Now()
} }
} }
......
...@@ -548,17 +548,12 @@ func (u *UserSftp) LoopPushFTPLog() { ...@@ -548,17 +548,12 @@ func (u *UserSftp) LoopPushFTPLog() {
dataChan := make(chan *model.FTPLog) dataChan := make(chan *model.FTPLog)
go u.SendFTPLog(dataChan) go u.SendFTPLog(dataChan)
defer close(dataChan) defer close(dataChan)
var timeoutSecond time.Duration
for {
switch len(ftpLogList) {
case 0:
timeoutSecond = time.Second * 60
default:
timeoutSecond = time.Second * 10
}
tick := time.NewTicker(time.Second * 10)
defer tick.Stop()
for {
select { select {
case <-time.After(timeoutSecond): case <-tick.C:
case logData, ok := <-u.LogChan: case logData, ok := <-u.LogChan:
if !ok { if !ok {
return return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment