Commit d3f159c0 authored by 老广's avatar 老广 Committed by Eric_Lee

Dev (#104)

* fix SecureFX SFTP connection issues (#99)

* echo CORE_HOST value when jms_core not ready (#101)

* increase timeout seconds (#102)

* Add log info messages
parent 625655d3
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
while [ "$(curl -I -m 10 -o /dev/null -s -w %{http_code} $CORE_HOST)" != "302" ] while [ "$(curl -I -m 10 -o /dev/null -s -w %{http_code} $CORE_HOST)" != "302" ]
do do
echo "wait for jms_core ready" echo "wait for jms_core $CORE_HOST ready"
sleep 2 sleep 2
done done
......
...@@ -31,7 +31,7 @@ require ( ...@@ -31,7 +31,7 @@ require (
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 // indirect golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3
......
package common
import (
"bytes"
"io/ioutil"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/transform"
)
func GbkToUtf8(s []byte) ([]byte, error) {
reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewDecoder())
d, e := ioutil.ReadAll(reader)
if e != nil {
return nil, e
}
return d, nil
}
func Utf8ToGbk(s []byte) ([]byte, error) {
reader := transform.NewReader(bytes.NewReader(s), simplifiedchinese.GBK.NewEncoder())
d, e := ioutil.ReadAll(reader)
if e != nil {
return nil, e
}
return d, nil
}
...@@ -27,7 +27,7 @@ func SessionHandler(sess ssh.Session) { ...@@ -27,7 +27,7 @@ func SessionHandler(sess ssh.Session) {
ctx, cancel := cctx.NewContext(sess) ctx, cancel := cctx.NewContext(sess)
defer cancel() defer cancel()
handler := newInteractiveHandler(sess, ctx.User()) handler := newInteractiveHandler(sess, ctx.User())
logger.Debugf("User Request pty: %s %s", sess.User(), pty.Term) logger.Infof("Request %s: User %s request pty %s", handler.sess.ID(), sess.User(), pty.Term)
handler.Dispatch(ctx) handler.Dispatch(ctx)
} else { } else {
utils.IgnoreErrWriteString(sess, "No PTY requested.\n") utils.IgnoreErrWriteString(sess, "No PTY requested.\n")
...@@ -125,16 +125,11 @@ func (h *interactiveHandler) resumeWatchWinSize() { ...@@ -125,16 +125,11 @@ func (h *interactiveHandler) resumeWatchWinSize() {
func (h *interactiveHandler) Dispatch(ctx cctx.Context) { func (h *interactiveHandler) Dispatch(ctx cctx.Context) {
go h.watchWinSizeChange() go h.watchWinSizeChange()
defer logger.Infof("Request %s: User %s stop interactive", h.sess.ID(), h.user.Name)
for { for {
line, err := h.term.ReadLine() line, err := h.term.ReadLine()
if err != nil { if err != nil {
if err != io.EOF { logger.Debugf("User %s close connect", h.user.Name)
logger.Debug("User disconnected")
} else {
logger.Error("Read from user err: ", err)
}
break break
} }
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
...@@ -152,7 +147,7 @@ func (h *interactiveHandler) Dispatch(ctx cctx.Context) { ...@@ -152,7 +147,7 @@ func (h *interactiveHandler) Dispatch(ctx cctx.Context) {
case "r": case "r":
h.refreshAssetsAndNodesData() h.refreshAssetsAndNodesData()
case "q": case "q":
logger.Info("exit session") logger.Debugf("user %s enter to exit", h.user.Name)
return return
default: default:
h.searchAssetOrProxy(line) h.searchAssetOrProxy(line)
...@@ -160,7 +155,7 @@ func (h *interactiveHandler) Dispatch(ctx cctx.Context) { ...@@ -160,7 +155,7 @@ func (h *interactiveHandler) Dispatch(ctx cctx.Context) {
default: default:
switch { switch {
case line == "exit", line == "quit": case line == "exit", line == "quit":
logger.Info("exit session") logger.Debugf("user %s enter to exit", h.user.Name)
return return
case strings.Index(line, "/") == 0: case strings.Index(line, "/") == 0:
searchWord := strings.TrimSpace(line[1:]) searchWord := strings.TrimSpace(line[1:])
......
...@@ -6,11 +6,12 @@ import ( ...@@ -6,11 +6,12 @@ import (
"sync" "sync"
"github.com/gliderlabs/ssh" "github.com/gliderlabs/ssh"
"github.com/jumpserver/koko/pkg/logger" "github.com/jumpserver/koko/pkg/logger"
uuid "github.com/satori/go.uuid"
) )
type WrapperSession struct { type WrapperSession struct {
Uuid string
Sess ssh.Session Sess ssh.Session
inWriter io.WriteCloser inWriter io.WriteCloser
outReader io.ReadCloser outReader io.ReadCloser
...@@ -18,12 +19,12 @@ type WrapperSession struct { ...@@ -18,12 +19,12 @@ type WrapperSession struct {
} }
func (w *WrapperSession) initial() { func (w *WrapperSession) initial() {
w.Uuid = uuid.NewV4().String()
w.initReadPip() w.initReadPip()
go w.readLoop() go w.readLoop()
} }
func (w *WrapperSession) readLoop() { func (w *WrapperSession) readLoop() {
defer logger.Debug("session loop break")
buf := make([]byte, 1024*8) buf := make([]byte, 1024*8)
for { for {
nr, err := w.Sess.Read(buf) nr, err := w.Sess.Read(buf)
...@@ -38,7 +39,7 @@ func (w *WrapperSession) readLoop() { ...@@ -38,7 +39,7 @@ func (w *WrapperSession) readLoop() {
} }
} }
_ = w.inWriter.Close() _ = w.inWriter.Close()
logger.Infof("Request %s read loop break", w.Uuid)
} }
func (w *WrapperSession) Read(p []byte) (int, error) { func (w *WrapperSession) Read(p []byte) (int, error) {
...@@ -94,6 +95,10 @@ func (w *WrapperSession) Pty() ssh.Pty { ...@@ -94,6 +95,10 @@ func (w *WrapperSession) Pty() ssh.Pty {
return pty return pty
} }
func (w *WrapperSession) ID() string {
return w.Uuid
}
func NewWrapperSession(sess ssh.Session) *WrapperSession { func NewWrapperSession(sess ssh.Session) *WrapperSession {
w := &WrapperSession{ w := &WrapperSession{
Sess: sess, Sess: sess,
......
...@@ -76,3 +76,7 @@ func (c *Client) SetWinSize(size ssh.Window) { ...@@ -76,3 +76,7 @@ func (c *Client) SetWinSize(size ssh.Window) {
default: default:
} }
} }
func (c *Client) ID() string {
return c.Uuid
}
\ No newline at end of file
...@@ -92,7 +92,7 @@ func sftpHostConnectorView(wr http.ResponseWriter, req *http.Request) { ...@@ -92,7 +92,7 @@ func sftpHostConnectorView(wr http.ResponseWriter, req *http.Request) {
} }
addUserVolume(sid, userV) addUserVolume(sid, userV)
} }
logger.Debugf("Elfinder connector sid: %s", sid) logger.Debugf("Elfinder connector sid: %s connected", sid)
conf := config.GetConf() conf := config.GetConf()
maxSize := common.ConvertSizeToBytes(conf.ZipMaxSize) maxSize := common.ConvertSizeToBytes(conf.ZipMaxSize)
options := map[string]string{ options := map[string]string{
......
...@@ -7,13 +7,14 @@ import ( ...@@ -7,13 +7,14 @@ import (
) )
func OnELFinderConnect(c *neffos.NSConn, msg neffos.Message) error { func OnELFinderConnect(c *neffos.NSConn, msg neffos.Message) error {
logger.Infof("Request %s: web folder ws connect", c.Conn.ID())
data := EmitSidMsg{Sid: c.Conn.ID()} data := EmitSidMsg{Sid: c.Conn.ID()}
c.Emit("data", neffos.Marshal(data)) c.Emit("data", neffos.Marshal(data))
return nil return nil
} }
func OnELFinderDisconnect(c *neffos.NSConn, msg neffos.Message) (error) { func OnELFinderDisconnect(c *neffos.NSConn, msg neffos.Message) (error) {
logger.Debug("disconnect: ", c.Conn.ID()) logger.Infof("Request %s: web folder ws disconnect", c.Conn.ID())
removeUserVolume(c.Conn.ID()) removeUserVolume(c.Conn.ID())
return nil return nil
} }
...@@ -136,8 +136,8 @@ func OnHostHandler(c *neffos.NSConn, msg neffos.Message) (err error) { ...@@ -136,8 +136,8 @@ func OnHostHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
Asset: &asset, SystemUser: &systemUser, Asset: &asset, SystemUser: &systemUser,
} }
go func() { go func() {
defer logger.Debug("Web proxy process end") defer logger.Infof("Request %s: Web ssh end proxy process", client.Uuid)
logger.Debug("Web ssh start proxy to host") logger.Infof("Request %s: Web ssh start proxy to host", client.Uuid)
proxySrv.Proxy() proxySrv.Proxy()
logoutMsg, _ := json.Marshal(RoomMsg{Room: roomID}) logoutMsg, _ := json.Marshal(RoomMsg{Room: roomID})
// 服务器主动退出 // 服务器主动退出
......
...@@ -26,14 +26,21 @@ var ( ...@@ -26,14 +26,21 @@ var (
charEnter = []byte("\r") charEnter = []byte("\r")
) )
func newParser() *Parser { const (
parser := &Parser{} CommandInputParserName = "Command Input parser"
CommandOutputParserName = "Command output parser"
)
func newParser(sid string) *Parser {
parser := &Parser{id: sid}
parser.initial() parser.initial()
return parser return parser
} }
// Parse 解析用户输入输出, 拦截过滤用户输入输出 // Parse 解析用户输入输出, 拦截过滤用户输入输出
type Parser struct { type Parser struct {
id string
userOutputChan chan []byte userOutputChan chan []byte
srvOutputChan chan []byte srvOutputChan chan []byte
cmdRecordChan chan [2]string cmdRecordChan chan [2]string
...@@ -59,8 +66,8 @@ func (p *Parser) initial() { ...@@ -59,8 +66,8 @@ func (p *Parser) initial() {
p.once = new(sync.Once) p.once = new(sync.Once)
p.lock = new(sync.RWMutex) p.lock = new(sync.RWMutex)
p.cmdInputParser = NewCmdParser() p.cmdInputParser = NewCmdParser(p.id, CommandInputParserName)
p.cmdOutputParser = NewCmdParser() p.cmdOutputParser = NewCmdParser(p.id, CommandOutputParserName)
p.closed = make(chan struct{}) p.closed = make(chan struct{})
p.cmdRecordChan = make(chan [2]string, 1024) p.cmdRecordChan = make(chan [2]string, 1024)
...@@ -81,7 +88,7 @@ func (p *Parser) ParseStream(userInChan, srvInChan <-chan []byte) (userOut, srvO ...@@ -81,7 +88,7 @@ func (p *Parser) ParseStream(userInChan, srvInChan <-chan []byte) (userOut, srvO
close(p.srvOutputChan) close(p.srvOutputChan)
_ = p.cmdOutputParser.Close() _ = p.cmdOutputParser.Close()
_ = p.cmdInputParser.Close() _ = p.cmdInputParser.Close()
logger.Debug("Parser parse stream routine done") logger.Infof("Session %s parser routine done", p.id)
}() }()
for { for {
select { select {
......
...@@ -12,13 +12,16 @@ import ( ...@@ -12,13 +12,16 @@ import (
var ps1Pattern = regexp.MustCompile(`^\[?.*@.*\]?[\\$#]\s|mysql>\s`) var ps1Pattern = regexp.MustCompile(`^\[?.*@.*\]?[\\$#]\s|mysql>\s`)
func NewCmdParser() *CmdParser { func NewCmdParser(sid, name string) *CmdParser {
parser := &CmdParser{} parser := &CmdParser{id: sid, name:name}
parser.initial() parser.initial()
return parser return parser
} }
type CmdParser struct { type CmdParser struct {
id string
name string
term *utils.Terminal term *utils.Terminal
reader io.ReadCloser reader io.ReadCloser
writer io.WriteCloser writer io.WriteCloser
...@@ -62,8 +65,8 @@ func (cp *CmdParser) initial() { ...@@ -62,8 +65,8 @@ func (cp *CmdParser) initial() {
cp.term = utils.NewTerminal(cp, "") cp.term = utils.NewTerminal(cp, "")
cp.term.SetEcho(false) cp.term.SetEcho(false)
go func() { go func() {
logger.Debug("command Parser start") logger.Infof("Session %s: %s start", cp.id, cp.name)
defer logger.Debug("command Parser close") defer logger.Infof("Session %s: %s parser close", cp.id, cp.name)
inloop: inloop:
for { for {
line, err := cp.term.ReadLine() line, err := cp.term.ReadLine()
......
...@@ -212,17 +212,20 @@ func (p *ProxyServer) Proxy() { ...@@ -212,17 +212,20 @@ func (p *ProxyServer) Proxy() {
if !p.preCheckRequisite() { if !p.preCheckRequisite() {
return return
} }
srvConn, err := p.getServerConn()
// 连接后端服务器失败
if err != nil {
p.sendConnectErrorMsg(err)
return
}
// 创建Session // 创建Session
sw, err := CreateSession(p) sw, err := CreateSession(p)
if err != nil { if err != nil {
logger.Error("Create session failed.")
return return
} }
defer RemoveSession(sw) defer RemoveSession(sw)
srvConn, err := p.getServerConn()
// 连接后端服务器失败
if err != nil {
p.sendConnectErrorMsg(err)
return
}
logger.Infof("Session %s bridge start", sw.ID)
_ = sw.Bridge(p.UserConn, srvConn) _ = sw.Bridge(p.UserConn, srvConn)
logger.Infof("Session %s bridge end", sw.ID)
} }
...@@ -49,7 +49,7 @@ func (s *SwitchSession) Initial() { ...@@ -49,7 +49,7 @@ func (s *SwitchSession) Initial() {
s.MaxIdleTime = config.GetConf().MaxIdleTime s.MaxIdleTime = config.GetConf().MaxIdleTime
s.cmdRecorder = NewCommandRecorder(s.ID) s.cmdRecorder = NewCommandRecorder(s.ID)
s.replayRecorder = NewReplyRecord(s.ID) s.replayRecorder = NewReplyRecord(s.ID)
s.parser = newParser() s.parser = newParser(s.ID)
s.ctx, s.cancel = context.WithCancel(context.Background()) s.ctx, s.cancel = context.WithCancel(context.Background())
} }
...@@ -63,6 +63,7 @@ func (s *SwitchSession) Terminate() { ...@@ -63,6 +63,7 @@ func (s *SwitchSession) Terminate() {
} }
func (s *SwitchSession) recordCommand() { func (s *SwitchSession) recordCommand() {
logger.Infof("Session %s record command start", s.ID)
for command := range s.parser.cmdRecordChan { for command := range s.parser.cmdRecordChan {
if command[0] == "" { if command[0] == "" {
continue continue
...@@ -70,6 +71,7 @@ func (s *SwitchSession) recordCommand() { ...@@ -70,6 +71,7 @@ func (s *SwitchSession) recordCommand() {
cmd := s.generateCommandResult(command) cmd := s.generateCommandResult(command)
s.cmdRecorder.Record(cmd) s.cmdRecorder.Record(cmd)
} }
logger.Infof("Session %s record command stop", s.ID)
} }
// generateCommandResult 生成命令结果 // generateCommandResult 生成命令结果
...@@ -120,7 +122,6 @@ func (s *SwitchSession) SetFilterRules(cmdRules []model.SystemUserFilterRule) { ...@@ -120,7 +122,6 @@ func (s *SwitchSession) SetFilterRules(cmdRules []model.SystemUserFilterRule) {
func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerConnection) (err error) { func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerConnection) (err error) {
winCh := userConn.WinCh() winCh := userConn.WinCh()
defer func() { defer func() {
logger.Info("Session bridge done: ", s.ID)
_ = userConn.Close() _ = userConn.Close()
_ = srvConn.Close() _ = srvConn.Close()
s.postBridge() s.postBridge()
......
package proxy
import (
"io"
"github.com/jumpserver/koko/pkg/logger"
)
type Transport interface {
io.WriteCloser
Name() string
Chan() <-chan []byte
}
type DirectTransport struct {
name string
readWriter io.ReadWriteCloser
ch chan []byte
closed bool
}
func (dt *DirectTransport) Name() string {
return dt.name
}
func (dt *DirectTransport) Write(p []byte) (n int, err error) {
return dt.readWriter.Write(p)
}
func (dt *DirectTransport) Close() error {
if dt.closed {
return nil
}
logger.Debug("Close transport")
dt.closed = true
return dt.readWriter.Close()
}
func (dt *DirectTransport) Chan() <-chan []byte {
return dt.ch
}
func (dt *DirectTransport) Keep() {
for {
buf := make([]byte, 1024)
n, err := dt.readWriter.Read(buf)
if err != nil {
_ = dt.Close()
break
}
dt.ch <- buf[:n]
}
close(dt.ch)
}
func NewDirectTransport(name string, readWriter io.ReadWriteCloser) Transport {
ch := make(chan []byte, 1024)
tr := DirectTransport{readWriter: readWriter, ch: ch}
go tr.Keep()
return &tr
}
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
type UserConnection interface { type UserConnection interface {
io.ReadWriteCloser io.ReadWriteCloser
ID() string
WinCh() <-chan ssh.Window WinCh() <-chan ssh.Window
LoginFrom() string LoginFrom() string
RemoteAddr() string RemoteAddr() string
......
...@@ -554,7 +554,7 @@ func (u *UserSftp) LoopPushFTPLog() { ...@@ -554,7 +554,7 @@ func (u *UserSftp) LoopPushFTPLog() {
case 0: case 0:
timeoutSecond = time.Second * 60 timeoutSecond = time.Second * 60
default: default:
timeoutSecond = time.Second * 1 timeoutSecond = time.Second * 10
} }
select { select {
......
...@@ -51,5 +51,5 @@ func StopServer() { ...@@ -51,5 +51,5 @@ func StopServer() {
if err != nil { if err != nil {
logger.Errorf("SSH server close failed: %s", err.Error()) logger.Errorf("SSH server close failed: %s", err.Error())
} }
logger.Debug("Close ssh server") logger.Info("Close ssh server")
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment