Commit 4864ccfb authored by ibuler's avatar ibuler

[Update] 修改config

parent 6607637f
...@@ -59,6 +59,10 @@ func (c *Client) SetAuth(auth ClientAuth) { ...@@ -59,6 +59,10 @@ func (c *Client) SetAuth(auth ClientAuth) {
c.Auth = auth c.Auth = auth
} }
func (c *Client) SetHeader(k, v string) {
c.Headers[k] = v
}
func (c *Client) marshalData(data interface{}) (reader io.Reader, error error) { func (c *Client) marshalData(data interface{}) (reader io.Reader, error error) {
dataRaw, err := json.Marshal(data) dataRaw, err := json.Marshal(data)
if err != nil { if err != nil {
......
...@@ -27,7 +27,7 @@ func SessionHandler(sess ssh.Session) { ...@@ -27,7 +27,7 @@ func SessionHandler(sess ssh.Session) {
ctx, cancel := cctx.NewContext(sess) ctx, cancel := cctx.NewContext(sess)
defer cancel() defer cancel()
handler := newInteractiveHandler(sess, ctx.User()) handler := newInteractiveHandler(sess, ctx.User())
logger.Debugf("User Request pty: %s %s", sess.User(), pty.Term) logger.Debugf("User Request pty %s: %s", pty.Term, sess.User())
handler.Dispatch(ctx) handler.Dispatch(ctx)
} else { } else {
utils.IgnoreErrWriteString(sess, "No PTY requested.\n") utils.IgnoreErrWriteString(sess, "No PTY requested.\n")
......
...@@ -2,6 +2,7 @@ package proxy ...@@ -2,6 +2,7 @@ package proxy
import ( import (
"bytes" "bytes"
"cocogo/pkg/i18n"
"fmt" "fmt"
"sync" "sync"
...@@ -25,6 +26,12 @@ var ( ...@@ -25,6 +26,12 @@ var (
charEnter = []byte("\r") charEnter = []byte("\r")
) )
func newParser() *Parser {
parser := &Parser{}
parser.initial()
return parser
}
// Parse 解析用户输入输出, 拦截过滤用户输入输出 // Parse 解析用户输入输出, 拦截过滤用户输入输出
type Parser struct { type Parser struct {
inputBuf *bytes.Buffer inputBuf *bytes.Buffer
...@@ -35,8 +42,7 @@ type Parser struct { ...@@ -35,8 +42,7 @@ type Parser struct {
userOutputChan chan []byte userOutputChan chan []byte
srvInputChan chan []byte srvInputChan chan []byte
srvOutputChan chan []byte srvOutputChan chan []byte
cmdChan chan [2]string
cmdChan chan [2]string
inputInitial bool inputInitial bool
inputPreState bool inputPreState bool
...@@ -54,7 +60,7 @@ type Parser struct { ...@@ -54,7 +60,7 @@ type Parser struct {
closed bool closed bool
} }
func (p *Parser) Initial() { func (p *Parser) initial() {
p.inputBuf = new(bytes.Buffer) p.inputBuf = new(bytes.Buffer)
p.cmdBuf = new(bytes.Buffer) p.cmdBuf = new(bytes.Buffer)
p.outputBuf = new(bytes.Buffer) p.outputBuf = new(bytes.Buffer)
...@@ -65,6 +71,12 @@ func (p *Parser) Initial() { ...@@ -65,6 +71,12 @@ func (p *Parser) Initial() {
p.cmdOutputParser = &CmdParser{} p.cmdOutputParser = &CmdParser{}
p.cmdInputParser.Initial() p.cmdInputParser.Initial()
p.cmdOutputParser.Initial() p.cmdOutputParser.Initial()
p.userInputChan = make(chan []byte, 1024)
p.userOutputChan = make(chan []byte, 1024)
p.srvInputChan = make(chan []byte, 1024)
p.srvOutputChan = make(chan []byte, 1024)
p.cmdChan = make(chan [2]string, 1024)
} }
func (p *Parser) Parse() { func (p *Parser) Parse() {
...@@ -103,7 +115,8 @@ func (p *Parser) parseInputState(b []byte) []byte { ...@@ -103,7 +115,8 @@ func (p *Parser) parseInputState(b []byte) []byte {
// 用户输入了Enter,开始结算命令 // 用户输入了Enter,开始结算命令
p.parseCmdInput() p.parseCmdInput()
if p.IsCommandForbidden() { if p.IsCommandForbidden() {
p.srvOutputChan <- []byte("\r\nCommand ls is forbidden") fbdMsg := utils.WrapperWarn(fmt.Sprintf(i18n.T("Command `%s` is forbidden"), p.command))
p.srvOutputChan <- []byte("\r\n" + fbdMsg)
return []byte{utils.CharCleanLine, '\r'} return []byte{utils.CharCleanLine, '\r'}
} }
} else { } else {
......
package proxy package proxy
import ( import (
"cocogo/pkg/common" "encoding/json"
"cocogo/pkg/logger" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
"cocogo/pkg/common"
"cocogo/pkg/config" "cocogo/pkg/config"
"cocogo/pkg/logger"
"cocogo/pkg/model" "cocogo/pkg/model"
) )
...@@ -25,13 +27,16 @@ func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) { ...@@ -25,13 +27,16 @@ func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) {
storage := NewCommandStorage() storage := NewCommandStorage()
recorder = &CommandRecorder{Session: sess, queue: make(chan *model.Command, 10), storage: storage} recorder = &CommandRecorder{Session: sess, queue: make(chan *model.Command, 10), storage: storage}
go recorder.record() go recorder.record()
recorder.Start()
return recorder return recorder
} }
func NewReplyRecord(sess *SwitchSession) *ReplyRecorder { func NewReplyRecord(sess *SwitchSession) (recorder *ReplyRecorder) {
storage := NewReplayStorage() storage := NewReplayStorage()
srvStorage := &ServerReplayStorage{} srvStorage := &ServerReplayStorage{}
return &ReplyRecorder{SessionID: sess.Id, storage: storage, backOffStorage: srvStorage} recorder = &ReplyRecorder{SessionID: sess.Id, storage: storage, backOffStorage: srvStorage}
recorder.Start()
return recorder
} }
func (c *CommandRecorder) Record(command [2]string) { func (c *CommandRecorder) Record(command [2]string) {
...@@ -62,7 +67,11 @@ func (c *CommandRecorder) record() { ...@@ -62,7 +67,11 @@ func (c *CommandRecorder) record() {
cmdList := make([]*model.Command, 0) cmdList := make([]*model.Command, 0)
for { for {
select { select {
case p := <-c.queue: case p, ok := <-c.queue:
if !ok {
logger.Debug("Session command recorder close: ", c.Session.Id)
return
}
cmdList = append(cmdList, p) cmdList = append(cmdList, p)
if len(cmdList) < 5 { if len(cmdList) < 5 {
continue continue
...@@ -91,13 +100,18 @@ type ReplyRecorder struct { ...@@ -91,13 +100,18 @@ type ReplyRecorder struct {
absGzFilePath string absGzFilePath string
target string target string
file *os.File file *os.File
StartTime time.Time timeStartNano int64
storage ReplayStorage storage ReplayStorage
backOffStorage ReplayStorage backOffStorage ReplayStorage
} }
func (r *ReplyRecorder) Record(b []byte) { func (r *ReplyRecorder) Record(b []byte) {
if len(b) > 0 {
delta := float64(time.Now().UnixNano()-r.timeStartNano) / 1000 / 1000 / 1000
data, _ := json.Marshal(string(b))
_, _ = r.file.WriteString(fmt.Sprintf(`"%.3f":%s`, delta, data))
}
} }
func (r *ReplyRecorder) Start() { func (r *ReplyRecorder) Start() {
...@@ -109,6 +123,7 @@ func (r *ReplyRecorder) Start() { ...@@ -109,6 +123,7 @@ func (r *ReplyRecorder) Start() {
r.absFilePath = filepath.Join(replayDir, r.SessionID) r.absFilePath = filepath.Join(replayDir, r.SessionID)
r.absGzFilePath = filepath.Join(replayDir, today, gzFileName) r.absGzFilePath = filepath.Join(replayDir, today, gzFileName)
r.target = strings.Join([]string{today, gzFileName}, "/") r.target = strings.Join([]string{today, gzFileName}, "/")
r.timeStartNano = time.Now().UnixNano()
err := common.EnsureDirExist(replayDir) err := common.EnsureDirExist(replayDir)
if err != nil { if err != nil {
...@@ -116,6 +131,7 @@ func (r *ReplyRecorder) Start() { ...@@ -116,6 +131,7 @@ func (r *ReplyRecorder) Start() {
return return
} }
logger.Debug("Replay file path: ", r.absFilePath)
r.file, err = os.Create(r.absFilePath) r.file, err = os.Create(r.absFilePath)
if err != nil { if err != nil {
logger.Errorf("Create file %s error: %s\n", r.absFilePath, err) logger.Errorf("Create file %s error: %s\n", r.absFilePath, err)
...@@ -125,22 +141,25 @@ func (r *ReplyRecorder) Start() { ...@@ -125,22 +141,25 @@ func (r *ReplyRecorder) Start() {
func (r *ReplyRecorder) End() { func (r *ReplyRecorder) End() {
_ = r.file.Close() _ = r.file.Close()
go r.uploadReplay()
}
func (r *ReplyRecorder) uploadReplay() {
maxRetry := 3
if !common.FileExists(r.absFilePath) { if !common.FileExists(r.absFilePath) {
logger.Debug("Replay file not found, passed: ", r.absFilePath)
return return
} }
if stat, err := os.Stat(r.absGzFilePath); err == nil && stat.Size() == 0 { if stat, err := os.Stat(r.absFilePath); err == nil && stat.Size() == 0 {
logger.Debug("Replay file is empty, removed: ", r.absFilePath)
_ = os.Remove(r.absFilePath) _ = os.Remove(r.absFilePath)
return return
} }
go r.uploadReplay()
if !common.FileExists(r.absGzFilePath) { if !common.FileExists(r.absGzFilePath) {
logger.Debug("Compress replay file: ", r.absFilePath)
_ = common.GzipCompressFile(r.absFilePath, r.absGzFilePath) _ = common.GzipCompressFile(r.absFilePath, r.absGzFilePath)
_ = os.Remove(r.absFilePath) _ = os.Remove(r.absFilePath)
} }
}
func (r *ReplyRecorder) uploadReplay() {
maxRetry := 3
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
logger.Debug("Upload replay file: ", r.absGzFilePath) logger.Debug("Upload replay file: ", r.absGzFilePath)
......
...@@ -34,12 +34,11 @@ type SwitchSession struct { ...@@ -34,12 +34,11 @@ type SwitchSession struct {
replayRecorder *ReplyRecorder replayRecorder *ReplyRecorder
parser *Parser parser *Parser
cmdRecordChan chan [2]string userConn UserConnection
userConn UserConnection srvConn ServerConnection
srvConn ServerConnection userChan Transport
userChan Transport srvChan Transport
srvChan Transport cancelFunc context.CancelFunc
cancelFunc context.CancelFunc
} }
func (s *SwitchSession) Initial() { func (s *SwitchSession) Initial() {
...@@ -51,19 +50,10 @@ func (s *SwitchSession) Initial() { ...@@ -51,19 +50,10 @@ func (s *SwitchSession) Initial() {
s.RemoteAddr = s.userConn.RemoteAddr() s.RemoteAddr = s.userConn.RemoteAddr()
s.DateStart = time.Now() s.DateStart = time.Now()
s.cmdRecordChan = make(chan [2]string, 1024)
s.cmdRecorder = NewCommandRecorder(s) s.cmdRecorder = NewCommandRecorder(s)
s.replayRecorder = NewReplyRecord(s) s.replayRecorder = NewReplyRecord(s)
parser := &Parser{ s.parser = newParser()
userInputChan: make(chan []byte, 1024),
userOutputChan: make(chan []byte, 1024),
srvInputChan: make(chan []byte, 1024),
srvOutputChan: make(chan []byte, 1024),
cmdChan: s.cmdRecordChan,
}
parser.Initial()
s.parser = parser
} }
func (s *SwitchSession) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) { func (s *SwitchSession) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) {
......
package service package service
import ( import (
"encoding/json"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
...@@ -18,6 +19,8 @@ func Initial() { ...@@ -18,6 +19,8 @@ func Initial() {
keyPath := config.Conf.AccessKeyFile keyPath := config.Conf.AccessKeyFile
client.BaseHost = config.Conf.CoreHost client.BaseHost = config.Conf.CoreHost
authClient.BaseHost = config.Conf.CoreHost authClient.BaseHost = config.Conf.CoreHost
client.SetHeader("X-JMS-ORG", "ROOT")
authClient.SetHeader("X-JMS-ORG", "ROOT")
if !path.IsAbs(config.Conf.AccessKeyFile) { if !path.IsAbs(config.Conf.AccessKeyFile) {
keyPath = filepath.Join(config.Conf.RootPath, keyPath) keyPath = filepath.Join(config.Conf.RootPath, keyPath)
...@@ -26,6 +29,7 @@ func Initial() { ...@@ -26,6 +29,7 @@ func Initial() {
_ = ak.Load() _ = ak.Load()
authClient.Auth = ak authClient.Auth = ak
validateAccessAuth() validateAccessAuth()
MustLoadServerConfigOnce()
go KeepSyncConfigWithServer() go KeepSyncConfigWithServer()
} }
...@@ -52,22 +56,37 @@ func validateAccessAuth() { ...@@ -52,22 +56,37 @@ func validateAccessAuth() {
} }
func MustLoadServerConfigOnce() { func MustLoadServerConfigOnce() {
var data map[string]interface{}
err := authClient.Get(TerminalConfigURL, &data)
if err != nil {
logger.Error("Load config from server error: ", err)
return
}
data["TERMINAL_HOST_KEY"] = "Hidden"
msg, err := json.Marshal(data)
if err != nil {
logger.Error("Marsha server config error: %s", err)
return
}
logger.Debug("Load config from server: " + string(msg))
err = LoadConfigFromServer()
if err != nil {
logger.Error("Load config from server error: ", err)
}
} }
func LoadConfigFromServer(conf *config.Config) (err error) { func LoadConfigFromServer() (err error) {
conf := config.Conf
conf.Mux.Lock() conf.Mux.Lock()
defer conf.Mux.Unlock() defer conf.Mux.Unlock()
err = authClient.Get(TerminalConfigURL, conf) err = authClient.Get(TerminalConfigURL, conf)
if err != nil {
logger.Warn("Sync config with server error: ", err)
}
return err return err
} }
func KeepSyncConfigWithServer() { func KeepSyncConfigWithServer() {
for { for {
err := LoadConfigFromServer(config.Conf) logger.Debug("Sync config from server")
err := LoadConfigFromServer()
if err != nil { if err != nil {
logger.Warn("Sync config with server error: ", err) logger.Warn("Sync config with server error: ", err)
} }
......
...@@ -84,6 +84,5 @@ func PushSessionReplay(sessionID, gZipFile string) { ...@@ -84,6 +84,5 @@ func PushSessionReplay(sessionID, gZipFile string) {
} }
func PushSessionCommand(commands []*model.Command) (err error) { func PushSessionCommand(commands []*model.Command) (err error) {
fmt.Println("Commands: ", commands)
return return
} }
...@@ -41,6 +41,6 @@ func WrapperTitle(text string) string { ...@@ -41,6 +41,6 @@ func WrapperTitle(text string) string {
} }
func WrapperWarn(text string) string { func WrapperWarn(text string) string {
text += "\r\n\r\n" text += "\r\n"
return WrapperString(text, Red) return WrapperString(text, Red)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment