Commit ec62b2fa authored by ibuler's avatar ibuler

[Update] 修改 recorder到proxy中

parent b38e8c9c
...@@ -396,7 +396,7 @@ func (i *InteractiveHandler) Proxy(ctx context.Context) { ...@@ -396,7 +396,7 @@ func (i *InteractiveHandler) Proxy(ctx context.Context) {
// Home := userhome.NewUserSessionHome(sshConn) // Home := userhome.NewUserSessionHome(sshConn)
// logger.Info("session Home ID: ", Home.SessionID()) // logger.Info("session Home ID: ", Home.SessionID())
// //
// err = proxy.Manager.Switch(i.sess.Context(), Home, memChan) // err = proxy.Manager.Session(i.sess.Context(), Home, memChan)
// if err != nil { // if err != nil {
// logger.Error(err) // logger.Error(err)
// } // }
......
...@@ -2,8 +2,9 @@ package proxy ...@@ -2,8 +2,9 @@ package proxy
import ( import (
"bytes" "bytes"
"fmt" "cocogo/pkg/recorder"
"sync" "sync"
"time"
"cocogo/pkg/logger" "cocogo/pkg/logger"
"cocogo/pkg/model" "cocogo/pkg/model"
...@@ -26,12 +27,11 @@ var ( ...@@ -26,12 +27,11 @@ var (
// Parse 解析用户输入输出, 拦截过滤用户输入输出 // Parse 解析用户输入输出, 拦截过滤用户输入输出
type Parser struct { type Parser struct {
session *Session
inputBuf *bytes.Buffer inputBuf *bytes.Buffer
cmdBuf *bytes.Buffer cmdBuf *bytes.Buffer
outputBuf *bytes.Buffer outputBuf *bytes.Buffer
cmdFilterRules []model.SystemUserFilterRule
inputInitial bool inputInitial bool
inputPreState bool inputPreState bool
inputState bool inputState bool
...@@ -44,6 +44,10 @@ type Parser struct { ...@@ -44,6 +44,10 @@ type Parser struct {
cmdInputParser *CmdParser cmdInputParser *CmdParser
cmdOutputParser *CmdParser cmdOutputParser *CmdParser
counter int counter int
cmdFilterRules []model.SystemUserFilterRule
commandRecorder *recorder.CommandRecorder
replayRecorder *recorder.ReplyRecorder
} }
func (p *Parser) Initial() { func (p *Parser) Initial() {
...@@ -61,7 +65,7 @@ func (p *Parser) Initial() { ...@@ -61,7 +65,7 @@ func (p *Parser) Initial() {
// Todo: parseMultipleInput 依然存在问题 // Todo: parseMultipleInput 依然存在问题
// parseInputState 切换用户输入状态 // parseInputState 切换用户输入状态, 并结算命令和结果
func (p *Parser) parseInputState(b []byte) { func (p *Parser) parseInputState(b []byte) {
if p.inVimState || p.zmodemState != "" { if p.inVimState || p.zmodemState != "" {
return return
...@@ -69,22 +73,22 @@ func (p *Parser) parseInputState(b []byte) { ...@@ -69,22 +73,22 @@ func (p *Parser) parseInputState(b []byte) {
p.inputPreState = p.inputState p.inputPreState = p.inputState
if bytes.Contains(b, charEnter) { if bytes.Contains(b, charEnter) {
p.inputState = false p.inputState = false
// 用户输入了Enter,开始结算命令
p.parseCmdInput() p.parseCmdInput()
} else { } else {
p.inputState = true p.inputState = true
// 用户又开始输入,并上次不处于输入状态,开始结算上次命令的结果
if !p.inputPreState { if !p.inputPreState {
p.parseCmdOutput() p.parseCmdOutput()
// 开始记录命令
p.recordCommand()
} }
} }
} }
func (p *Parser) parseCmdInput() { func (p *Parser) parseCmdInput() {
parser := CmdParser{}
parser.Initial()
data := p.cmdBuf.Bytes() data := p.cmdBuf.Bytes()
line := parser.Parse(data) p.command = p.cmdInputParser.Parse(data)
data2 := fmt.Sprintf("[%d] 命令: %s\n", p.counter, line)
fmt.Printf(data2)
p.cmdBuf.Reset() p.cmdBuf.Reset()
p.inputBuf.Reset() p.inputBuf.Reset()
p.counter += 1 p.counter += 1
...@@ -92,14 +96,11 @@ func (p *Parser) parseCmdInput() { ...@@ -92,14 +96,11 @@ func (p *Parser) parseCmdInput() {
func (p *Parser) parseCmdOutput() { func (p *Parser) parseCmdOutput() {
data := p.outputBuf.Bytes() data := p.outputBuf.Bytes()
line := p.cmdOutputParser.Parse(data) p.output = p.cmdOutputParser.Parse(data)
data2 := fmt.Sprintf("[%d] 结果: %s\n", p.counter, line)
_ = fmt.Sprintf("[%d] 结果: %s\n", p.counter, line)
fmt.Printf(data2)
p.outputBuf.Reset() p.outputBuf.Reset()
} }
func (p *Parser) parseInputNewLine(b []byte) []byte { func (p *Parser) replaceInputNewLine(b []byte) []byte {
b = bytes.Replace(b, []byte{'\r', '\r', '\n'}, []byte{'\r'}, -1) b = bytes.Replace(b, []byte{'\r', '\r', '\n'}, []byte{'\r'}, -1)
b = bytes.Replace(b, []byte{'\r', '\n'}, []byte{'\r'}, -1) b = bytes.Replace(b, []byte{'\r', '\n'}, []byte{'\r'}, -1)
b = bytes.Replace(b, []byte{'\n'}, []byte{'\r'}, -1) b = bytes.Replace(b, []byte{'\n'}, []byte{'\r'}, -1)
...@@ -110,23 +111,12 @@ func (p *Parser) ParseUserInput(b []byte) []byte { ...@@ -110,23 +111,12 @@ func (p *Parser) ParseUserInput(b []byte) []byte {
p.once.Do(func() { p.once.Do(func() {
p.inputInitial = true p.inputInitial = true
}) })
nb := p.parseInputNewLine(b) nb := p.replaceInputNewLine(b)
p.inputBuf.Write(nb) p.inputBuf.Write(nb)
p.parseInputState(nb) p.parseInputState(nb)
return b return b
} }
func (p *Parser) parseVimState(b []byte) {
if p.zmodemState == "" && !p.inVimState && bytes.Contains(b, vimEnterMark) {
p.inVimState = true
logger.Debug("In vim state: true")
}
if p.zmodemState == "" && p.inVimState && bytes.Contains(b, vimExitMark) {
p.inVimState = false
logger.Debug("In vim state: false")
}
}
func (p *Parser) parseZmodemState(b []byte) { func (p *Parser) parseZmodemState(b []byte) {
if len(b) < 25 { if len(b) < 25 {
return return
...@@ -150,8 +140,22 @@ func (p *Parser) parseZmodemState(b []byte) { ...@@ -150,8 +140,22 @@ func (p *Parser) parseZmodemState(b []byte) {
} }
} }
func (p *Parser) parseCommand(b []byte) { func (p *Parser) parseVimState(b []byte) {
if !p.inputInitial { if p.zmodemState == "" && !p.inVimState && bytes.Contains(b, vimEnterMark) {
p.inVimState = true
logger.Debug("In vim state: true")
}
if p.zmodemState == "" && p.inVimState && bytes.Contains(b, vimExitMark) {
p.inVimState = false
logger.Debug("In vim state: false")
}
}
// splitCmdStream 将服务器输出流分离到命令buffer和命令输出buffer
func (p *Parser) splitCmdStream(b []byte) {
p.parseVimState(b)
p.parseZmodemState(b)
if p.zmodemState != "" || p.inVimState || p.inputInitial {
return return
} }
if p.inputState { if p.inputState {
...@@ -162,9 +166,7 @@ func (p *Parser) parseCommand(b []byte) { ...@@ -162,9 +166,7 @@ func (p *Parser) parseCommand(b []byte) {
} }
func (p *Parser) ParseServerOutput(b []byte) []byte { func (p *Parser) ParseServerOutput(b []byte) []byte {
p.parseVimState(b) p.splitCmdStream(b)
p.parseZmodemState(b)
p.parseCommand(b)
return b return b
} }
...@@ -172,10 +174,29 @@ func (p *Parser) SetCMDFilterRules(rules []model.SystemUserFilterRule) { ...@@ -172,10 +174,29 @@ func (p *Parser) SetCMDFilterRules(rules []model.SystemUserFilterRule) {
p.cmdFilterRules = rules p.cmdFilterRules = rules
} }
func (p *Parser) SetReplayRecorder() { func (p *Parser) SetReplayRecorder(recorder *recorder.ReplyRecorder) {
p.replayRecorder = recorder
} }
func (p *Parser) SetCommandRecorder() { func (p *Parser) recordCommand() {
cmd := &recorder.Command{
SessionId: p.session.Id,
OrgId: p.session.Org,
Input: p.command,
Output: p.output,
User: p.session.User,
Server: p.session.Server,
SystemUser: p.session.SystemUser,
Timestamp: time.Now(),
}
p.commandRecorder.Record(cmd)
}
func (p *Parser) SetCommandRecorder(recorder *recorder.CommandRecorder) {
p.commandRecorder = recorder
}
func (p *Parser) recordReplay(b []byte) {
p.replayRecorder.Record(b)
} }
package proxy package proxy
import ( import (
"cocogo/pkg/recorder"
"fmt" "fmt"
"io" "io"
"strings" "strings"
...@@ -99,7 +100,9 @@ func (p *ProxyServer) Proxy() { ...@@ -99,7 +100,9 @@ func (p *ProxyServer) Proxy() {
logger.Error("Get system user filter rule error: ", err) logger.Error("Get system user filter rule error: ", err)
} }
sw.parser.SetCMDFilterRules(cmdRules) sw.parser.SetCMDFilterRules(cmdRules)
sw.parser.SetReplayRecorder() replayRecorder := recorder.NewReplyRecord(sw.Id)
sw.parser.SetReplayRecorder(replayRecorder)
cmdR
sw.parser.SetCommandRecorder() sw.parser.SetCommandRecorder()
_ = sw.Bridge() _ = sw.Bridge()
_ = srvConn.Close() _ = srvConn.Close()
......
package recorder package proxy
import ( import (
"compress/gzip" "compress/gzip"
...@@ -12,9 +12,36 @@ import ( ...@@ -12,9 +12,36 @@ import (
"time" "time"
"cocogo/pkg/config" "cocogo/pkg/config"
"cocogo/pkg/recorder/storage" "cocogo/pkg/logger"
) )
type CommandRecorder struct {
Session *Session
}
func NewCommandRecorder(sess *Session) (recorder *CommandRecorder) {
return &CommandRecorder{Session: sess}
}
type Command struct {
SessionId string `json:"session"`
OrgId string `json:"org_id"`
Input string `json:"input"`
Output string `json:"output"`
User string `json:"user"`
Server string `json:"asset"`
SystemUser string `json:"system_user"`
Timestamp time.Time `json:"timestamp"`
}
func (c *CommandRecorder) Record(cmd *Command) {
data, err := json.MarshalIndent(cmd, "", " ")
if err != nil {
logger.Error("Marshal command error: ", err)
}
fmt.Printf("Record cmd: %s\n", data)
}
var conf = config.Conf var conf = config.Conf
func NewReplyRecord(sessionID string) *ReplyRecorder { func NewReplyRecord(sessionID string) *ReplyRecorder {
...@@ -70,7 +97,7 @@ func (r *ReplyRecorder) End(ctx context.Context) { ...@@ -70,7 +97,7 @@ func (r *ReplyRecorder) End(ctx context.Context) {
func (r *ReplyRecorder) uploadReplay() { func (r *ReplyRecorder) uploadReplay() {
_ = GzipCompressFile(r.absFilePath, r.absGzFilePath) _ = GzipCompressFile(r.absFilePath, r.absGzFilePath)
if store := storage.NewStorageServer(); store != nil { if store := NewStorageServer(); store != nil {
store.Upload(r.absGzFilePath, r.target) store.Upload(r.absGzFilePath, r.target)
} }
_ = os.Remove(r.absFilePath) _ = os.Remove(r.absFilePath)
......
package storage package proxy
//var client = service.Client type ReplayStorage interface {
Upload(gZipFile, target string)
}
func NewStorageServer() ReplayStorage {
return nil
}
func NewJmsStorage() ReplayStorage { func NewJmsStorage() ReplayStorage {
//appService := auth.GetGlobalService() //appService := auth.GetGlobalService()
......
...@@ -8,14 +8,14 @@ import ( ...@@ -8,14 +8,14 @@ import (
"time" "time"
) )
func NewSwitch(userConn UserConnection, serverConn ServerConnection) (sw *Switch) { func NewSwitch(userConn UserConnection, serverConn ServerConnection) (sw *Session) {
parser := new(Parser) parser := new(Parser)
parser.Initial() parser.Initial()
sw = &Switch{userConn: userConn, serverConn: serverConn, parser: parser} sw = &Session{userConn: userConn, serverConn: serverConn, parser: parser}
return sw return sw
} }
type Switch struct { type Session struct {
Id string Id string
User string `json:"user"` User string `json:"user"`
Server string `json:"asset"` Server string `json:"asset"`
...@@ -37,7 +37,7 @@ type Switch struct { ...@@ -37,7 +37,7 @@ type Switch struct {
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
} }
func (s *Switch) Initial() { func (s *Session) Initial() {
s.Id = uuid.NewV4().String() s.Id = uuid.NewV4().String()
s.User = s.userConn.User() s.User = s.userConn.User()
s.Server = s.serverConn.Name() s.Server = s.serverConn.Name()
...@@ -47,15 +47,15 @@ func (s *Switch) Initial() { ...@@ -47,15 +47,15 @@ func (s *Switch) Initial() {
s.DateStart = time.Now() s.DateStart = time.Now()
} }
func (s *Switch) preBridge() { func (s *Session) preBridge() {
} }
func (s *Switch) postBridge() { func (s *Session) postBridge() {
} }
func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) { func (s *Session) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) {
defer func() { defer func() {
logger.Debug("Watch window change routine end") logger.Debug("Watch window change routine end")
}() }()
...@@ -77,7 +77,7 @@ func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) ...@@ -77,7 +77,7 @@ func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window)
} }
} }
func (s *Switch) readUserToServer(ctx context.Context) { func (s *Session) readUserToServer(ctx context.Context) {
defer func() { defer func() {
logger.Debug("Read user to server end") logger.Debug("Read user to server end")
}() }()
...@@ -99,7 +99,7 @@ func (s *Switch) readUserToServer(ctx context.Context) { ...@@ -99,7 +99,7 @@ func (s *Switch) readUserToServer(ctx context.Context) {
} }
} }
func (s *Switch) readServerToUser(ctx context.Context) { func (s *Session) readServerToUser(ctx context.Context) {
defer func() { defer func() {
logger.Debug("Read server to user end") logger.Debug("Read server to user end")
}() }()
...@@ -121,7 +121,7 @@ func (s *Switch) readServerToUser(ctx context.Context) { ...@@ -121,7 +121,7 @@ func (s *Switch) readServerToUser(ctx context.Context) {
} }
} }
func (s *Switch) Bridge() (err error) { func (s *Session) Bridge() (err error) {
winCh := s.userConn.WinCh() winCh := s.userConn.WinCh()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s.cancelFunc = cancel s.cancelFunc = cancel
...@@ -131,6 +131,6 @@ func (s *Switch) Bridge() (err error) { ...@@ -131,6 +131,6 @@ func (s *Switch) Bridge() (err error) {
go s.watchWindowChange(ctx, winCh) go s.watchWindowChange(ctx, winCh)
go s.readServerToUser(ctx) go s.readServerToUser(ctx)
s.readUserToServer(ctx) s.readUserToServer(ctx)
logger.Debug("Switch bridge end") logger.Debug("Session bridge end")
return return
} }
package recorder
import (
"time"
)
type CommandRecorder struct {
SessionID string
StartTime time.Time
}
type Command struct {
SessionId string `json:"session"`
OrgId string `json:"org_id"`
Input string `json:"input"`
Output string `json:"output"`
User string `json:"user"`
Server string `json:"asset"`
SystemUser string `json:"system_user"`
Timestamp time.Time `json:"timestamp"`
}
func (c *CommandRecorder) Record(cmd *Command) {
}
package storage
type ReplayStorage interface {
Upload(gZipFile, target string)
}
func NewStorageServer() ReplayStorage {
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment