Commit 7c032cdf authored by Eric's avatar Eric

[Update] recorders(command and replay) upload

parent dd78d954
...@@ -154,7 +154,7 @@ ...@@ -154,7 +154,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:940277eb8ecf4938e0760fecd917bdc24d45d78c22b32b83267682c3409075d2" digest = "1:f8c5898dd07a4ac155a612d8f9479f0b504a278322396db2290708c2a096710f"
name = "golang.org/x/crypto" name = "golang.org/x/crypto"
packages = [ packages = [
"curve25519", "curve25519",
...@@ -164,7 +164,6 @@ ...@@ -164,7 +164,6 @@
"internal/subtle", "internal/subtle",
"poly1305", "poly1305",
"ssh", "ssh",
"ssh/terminal",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "a7099eef26a7fdc97f3ac5f5b2b61f9f136dd16f" revision = "a7099eef26a7fdc97f3ac5f5b2b61f9f136dd16f"
...@@ -172,12 +171,11 @@ ...@@ -172,12 +171,11 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:1a1855ef6bc1338dd3870260716214046cefd69855c5a5a772d44d2791478abc" digest = "1:00a3a12527dd7a3af0d24260fd14887ebd69aaf924303b6a4a67f35eb9b6c012"
name = "golang.org/x/sys" name = "golang.org/x/sys"
packages = [ packages = [
"cpu", "cpu",
"unix", "unix",
"windows",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "3a4b5fb9f71f5874b2374ae059bc0e0bcb52e145" revision = "3a4b5fb9f71f5874b2374ae059bc0e0bcb52e145"
...@@ -214,7 +212,6 @@ ...@@ -214,7 +212,6 @@
"github.com/sirupsen/logrus", "github.com/sirupsen/logrus",
"github.com/xlab/treeprint", "github.com/xlab/treeprint",
"golang.org/x/crypto/ssh", "golang.org/x/crypto/ssh",
"golang.org/x/crypto/ssh/terminal",
"gopkg.in/natefinch/lumberjack.v2", "gopkg.in/natefinch/lumberjack.v2",
"gopkg.in/yaml.v2", "gopkg.in/yaml.v2",
] ]
......
...@@ -7,8 +7,11 @@ import ( ...@@ -7,8 +7,11 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"mime/multipart"
"net/http" "net/http"
neturl "net/url" neturl "net/url"
"os"
"path/filepath"
"reflect" "reflect"
"strings" "strings"
"time" "time"
...@@ -224,3 +227,53 @@ func (c *Client) PostForm(url string, data interface{}, res interface{}) (err er ...@@ -224,3 +227,53 @@ func (c *Client) PostForm(url string, data interface{}, res interface{}) (err er
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
return nil return nil
} }
func (c *Client) UploadFile(url string, gFile string, res interface{}, params ...map[string]string) (err error) {
f, err := os.Open(gFile)
if err != nil {
return err
}
buf := new(bytes.Buffer)
bodyWriter := multipart.NewWriter(buf)
gName := filepath.Base(gFile)
part, err := bodyWriter.CreateFormFile("file", gName)
if err != nil {
return err
}
_, err = io.Copy(part, f)
err = bodyWriter.Close()
if err != nil {
return err
}
url = c.parseUrl(url, params)
req, err := http.NewRequest("POST", url, buf)
req.Header.Set("Content-Type", bodyWriter.FormDataContentType())
c.SetReqHeaders(req)
resp, err := c.http.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
msg := fmt.Sprintf("%s %s failed, get code: %d, %s", req.Method, req.URL, resp.StatusCode, string(body))
err = errors.New(msg)
return
}
// If is buffer return the raw response body
if buf, ok := res.(*bytes.Buffer); ok {
buf.Write(body)
return
}
// Unmarshal response body to result struct
if res != nil {
err = json.Unmarshal(body, res)
if err != nil {
msg := fmt.Sprintf("%s %s failed, unmarshal '%s' response failed: %s", req.Method, req.URL, body, err)
err = errors.New(msg)
return
}
}
return
}
...@@ -49,7 +49,8 @@ type Parser struct { ...@@ -49,7 +49,8 @@ type Parser struct {
inputState bool inputState bool
zmodemState string zmodemState string
inVimState bool inVimState bool
once sync.Once once *sync.Once
lock *sync.RWMutex
command string command string
output string output string
...@@ -65,7 +66,8 @@ func (p *Parser) initial() { ...@@ -65,7 +66,8 @@ func (p *Parser) initial() {
p.cmdBuf = new(bytes.Buffer) p.cmdBuf = new(bytes.Buffer)
p.outputBuf = new(bytes.Buffer) p.outputBuf = new(bytes.Buffer)
p.once = sync.Once{} p.once = new(sync.Once)
p.lock = new(sync.RWMutex)
p.cmdInputParser = NewCmdParser() p.cmdInputParser = NewCmdParser()
p.cmdOutputParser = NewCmdParser() p.cmdOutputParser = NewCmdParser()
...@@ -162,6 +164,8 @@ func (p *Parser) parseZmodemState(b []byte) { ...@@ -162,6 +164,8 @@ func (p *Parser) parseZmodemState(b []byte) {
if len(b) < 25 { if len(b) < 25 {
return return
} }
p.lock.Lock()
defer p.lock.Unlock()
if p.zmodemState == "" { if p.zmodemState == "" {
if len(b) > 50 && bytes.Contains(b[:50], zmodemRecvStartMark) { if len(b) > 50 && bytes.Contains(b[:50], zmodemRecvStartMark) {
p.zmodemState = zmodemStateRecv p.zmodemState = zmodemStateRecv
...@@ -227,6 +231,12 @@ func (p *Parser) IsCommandForbidden() bool { ...@@ -227,6 +231,12 @@ func (p *Parser) IsCommandForbidden() bool {
return false return false
} }
func (p *Parser) IsRecvState() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.zmodemState == zmodemStateRecv
}
func (p *Parser) Close() { func (p *Parser) Close() {
if p.closed { if p.closed {
return return
......
...@@ -140,10 +140,6 @@ func (p *ProxyServer) Proxy() { ...@@ -140,10 +140,6 @@ func (p *ProxyServer) Proxy() {
} }
sw := NewSwitchSession(p.UserConn, srvConn) sw := NewSwitchSession(p.UserConn, srvConn)
cmdRules, err := service.GetSystemUserFilterRules(p.SystemUser.Id) sw.SetFilterRules(p.SystemUser.Id)
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
sw.parser.SetCMDFilterRules(cmdRules)
_ = sw.Bridge() _ = sw.Bridge()
} }
...@@ -46,11 +46,31 @@ func (c *CommandRecorder) Record(command [2]string) { ...@@ -46,11 +46,31 @@ func (c *CommandRecorder) Record(command [2]string) {
if command[0] == "" && command[1] == "" { if command[0] == "" && command[1] == "" {
return return
} }
if command[0] == "" {
fmt.Println("command kong=======")
}
var input string
var output string
if len(command[0]) > 128 {
input = command[0][:128]
} else {
input = command[0]
}
i := strings.LastIndexByte(command[1], '\r')
if i > 1024 {
output = output[:1024]
} else if i > 0 {
output = command[1][:i]
} else {
output = command[1]
}
cmd := &model.Command{ cmd := &model.Command{
SessionId: c.Session.Id, SessionId: c.Session.Id,
OrgId: c.Session.Org, OrgId: c.Session.Org,
Input: command[0], Input: input,
Output: command[1], Output: output,
User: c.Session.User, User: c.Session.User,
Server: c.Session.Server, Server: c.Session.Server,
SystemUser: c.Session.SystemUser, SystemUser: c.Session.SystemUser,
...@@ -70,6 +90,7 @@ func (c *CommandRecorder) End() { ...@@ -70,6 +90,7 @@ func (c *CommandRecorder) End() {
func (c *CommandRecorder) record() { func (c *CommandRecorder) record() {
cmdList := make([]*model.Command, 0) cmdList := make([]*model.Command, 0)
maxRetry := 0
for { for {
select { select {
case <-c.closed: case <-c.closed:
...@@ -95,9 +116,11 @@ func (c *CommandRecorder) record() { ...@@ -95,9 +116,11 @@ func (c *CommandRecorder) record() {
cmdList = cmdList[:0] cmdList = cmdList[:0]
continue continue
} }
if len(cmdList) > 1024 {
if maxRetry > 5 {
cmdList = cmdList[1:] cmdList = cmdList[1:]
} }
maxRetry++
} }
} }
...@@ -115,11 +138,8 @@ type ReplyRecorder struct { ...@@ -115,11 +138,8 @@ type ReplyRecorder struct {
} }
func (r *ReplyRecorder) initial() { func (r *ReplyRecorder) initial() {
storage := NewReplayStorage() r.storage = NewReplayStorage()
backOffStorage := &ServerReplayStorage{} r.backOffStorage = defaultReplayStorage
r.storage = storage
r.backOffStorage = backOffStorage
r.prepare() r.prepare()
} }
...@@ -139,7 +159,7 @@ func (r *ReplyRecorder) prepare() { ...@@ -139,7 +159,7 @@ func (r *ReplyRecorder) prepare() {
replayDir := filepath.Join(rootPath, "data", "replays", today) replayDir := filepath.Join(rootPath, "data", "replays", today)
r.absFilePath = filepath.Join(replayDir, sessionId) r.absFilePath = filepath.Join(replayDir, sessionId)
r.absGzFilePath = filepath.Join(replayDir, today, gzFileName) r.absGzFilePath = filepath.Join(replayDir, gzFileName)
r.target = strings.Join([]string{today, gzFileName}, "/") r.target = strings.Join([]string{today, gzFileName}, "/")
r.timeStartNano = time.Now().UnixNano() r.timeStartNano = time.Now().UnixNano()
...@@ -164,7 +184,6 @@ func (r *ReplyRecorder) End() { ...@@ -164,7 +184,6 @@ func (r *ReplyRecorder) End() {
} }
func (r *ReplyRecorder) uploadReplay() { func (r *ReplyRecorder) uploadReplay() {
maxRetry := 3
if !common.FileExists(r.absFilePath) { if !common.FileExists(r.absFilePath) {
logger.Debug("Replay file not found, passed: ", r.absFilePath) logger.Debug("Replay file not found, passed: ", r.absFilePath)
return return
...@@ -179,7 +198,11 @@ func (r *ReplyRecorder) uploadReplay() { ...@@ -179,7 +198,11 @@ func (r *ReplyRecorder) uploadReplay() {
_ = common.GzipCompressFile(r.absFilePath, r.absGzFilePath) _ = common.GzipCompressFile(r.absFilePath, r.absGzFilePath)
_ = os.Remove(r.absFilePath) _ = os.Remove(r.absFilePath)
} }
r.uploadGzipFile(3)
}
func (r *ReplyRecorder) uploadGzipFile(maxRetry int) {
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
logger.Debug("Upload replay file: ", r.absGzFilePath) logger.Debug("Upload replay file: ", r.absGzFilePath)
err := r.storage.Upload(r.absGzFilePath, r.target) err := r.storage.Upload(r.absGzFilePath, r.target)
...@@ -189,9 +212,12 @@ func (r *ReplyRecorder) uploadReplay() { ...@@ -189,9 +212,12 @@ func (r *ReplyRecorder) uploadReplay() {
} }
// 如果还是失败,使用备用storage再传一次 // 如果还是失败,使用备用storage再传一次
if i == maxRetry { if i == maxRetry {
if r.storage == r.backOffStorage {
break
}
logger.Errorf("Using back off storage retry upload") logger.Errorf("Using back off storage retry upload")
r.storage = r.backOffStorage r.storage = r.backOffStorage
r.uploadReplay() r.uploadGzipFile(3)
break break
} }
} }
......
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"cocogo/pkg/service" "cocogo/pkg/service"
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings"
) )
type ReplayStorage interface { type ReplayStorage interface {
...@@ -16,6 +18,9 @@ type CommandStorage interface { ...@@ -16,6 +18,9 @@ type CommandStorage interface {
BulkSave(commands []*model.Command) error BulkSave(commands []*model.Command) error
} }
var defaultCommandStorage = &ServerCommandStorage{}
var defaultReplayStorage = &ServerReplayStorage{StorageType: "server"}
func NewReplayStorage() ReplayStorage { func NewReplayStorage() ReplayStorage {
cf := config.GetConf().ReplayStorage cf := config.GetConf().ReplayStorage
tp, ok := cf["TYPE"] tp, ok := cf["TYPE"]
...@@ -24,7 +29,7 @@ func NewReplayStorage() ReplayStorage { ...@@ -24,7 +29,7 @@ func NewReplayStorage() ReplayStorage {
} }
switch tp { switch tp {
default: default:
return &ServerReplayStorage{} return defaultReplayStorage
} }
} }
...@@ -36,7 +41,7 @@ func NewCommandStorage() CommandStorage { ...@@ -36,7 +41,7 @@ func NewCommandStorage() CommandStorage {
} }
switch tp { switch tp {
default: default:
return &ServerCommandStorage{} return defaultCommandStorage
} }
} }
...@@ -74,7 +79,6 @@ type ServerReplayStorage struct { ...@@ -74,7 +79,6 @@ type ServerReplayStorage struct {
} }
func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) { func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) {
//sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0] sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0]
//_ = client.PushSessionReplay(gZipFilePath, sessionID) return service.PushSessionReplay(sessionID, gZipFilePath)
return
} }
...@@ -6,8 +6,10 @@ import ( ...@@ -6,8 +6,10 @@ import (
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
"cocogo/pkg/config"
"cocogo/pkg/i18n" "cocogo/pkg/i18n"
"cocogo/pkg/logger" "cocogo/pkg/logger"
"cocogo/pkg/service"
"cocogo/pkg/utils" "cocogo/pkg/utils"
) )
...@@ -25,12 +27,14 @@ type SwitchSession struct { ...@@ -25,12 +27,14 @@ type SwitchSession struct {
Org string `json:"org_id"` Org string `json:"org_id"`
LoginFrom string `json:"login_from"` LoginFrom string `json:"login_from"`
RemoteAddr string `json:"remote_addr"` RemoteAddr string `json:"remote_addr"`
DateStart time.Time `json:"date_start"` DateStart string `json:"date_start"`
DateEnd time.Time `json:"date_end"` DateEnd string `json:"date_end"`
DateActive time.Time `json:"date_last_active"` DateActive time.Time `json:"date_last_active"`
Finished bool `json:"is_finished"` Finished bool `json:"is_finished"`
Closed bool Closed bool
MaxIdleTime int
cmdRecorder *CommandRecorder cmdRecorder *CommandRecorder
replayRecorder *ReplyRecorder replayRecorder *ReplyRecorder
parser *Parser parser *Parser
...@@ -51,8 +55,8 @@ func (s *SwitchSession) Initial() { ...@@ -51,8 +55,8 @@ func (s *SwitchSession) Initial() {
s.SystemUser = s.srvConn.User() s.SystemUser = s.srvConn.User()
s.LoginFrom = s.userConn.LoginFrom() s.LoginFrom = s.userConn.LoginFrom()
s.RemoteAddr = s.userConn.RemoteAddr() s.RemoteAddr = s.userConn.RemoteAddr()
s.DateStart = time.Now().UTC() s.DateStart = time.Now().UTC().Format("2006-01-02 15:04:05 +0000")
s.MaxIdleTime = config.GetConf().MaxIdleTime
s.cmdRecorder = NewCommandRecorder(s) s.cmdRecorder = NewCommandRecorder(s)
s.replayRecorder = NewReplyRecord(s) s.replayRecorder = NewReplyRecord(s)
...@@ -76,17 +80,68 @@ func (s *SwitchSession) recordCmd() { ...@@ -76,17 +80,68 @@ func (s *SwitchSession) recordCmd() {
} }
} }
func (s *SwitchSession) MapData() map[string]interface{} {
var dataEnd interface{}
if s.DateEnd == "" {
dataEnd = nil
} else {
dataEnd = s.DateEnd
}
return map[string]interface{}{
"id": s.Id,
"user": s.User,
"asset": s.Server,
"org_id": s.Org,
"login_from": s.LoginFrom,
"system_user": s.SystemUser,
"remote_addr": s.RemoteAddr,
"is_finished": s.Finished,
"date_start": s.DateStart,
"date_end": dataEnd,
}
}
func (s *SwitchSession) postBridge() { func (s *SwitchSession) postBridge() {
s.DateEnd = time.Now().UTC()
_ = s.userTran.Close() _ = s.userTran.Close()
_ = s.srvTran.Close() _ = s.srvTran.Close()
s.parser.Close() s.parser.Close()
s.replayRecorder.End() s.replayRecorder.End()
s.cmdRecorder.End() s.cmdRecorder.End()
s.finishSession()
}
func (s *SwitchSession) finishSession() {
s.DateEnd = time.Now().UTC().Format("2006-01-02 15:04:05 +0000")
service.FinishSession(s.MapData())
service.FinishReply(s.Id)
logger.Debugf("finish Session: %s", s.Id)
}
func (s *SwitchSession) creatSession() bool {
for i := 0; i < 5; i++ {
if service.CreateSession(s.MapData()) {
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}
func (s *SwitchSession) SetFilterRules(systemUserId string) {
cmdRules, err := service.GetSystemUserFilterRules(systemUserId)
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
s.parser.SetCMDFilterRules(cmdRules)
} }
func (s *SwitchSession) Bridge() (err error) { func (s *SwitchSession) Bridge() (err error) {
if !s.creatSession() {
msg := i18n.T("Connect with api server failed")
msg = utils.WrapperWarn(msg)
utils.IgnoreErrWriteString(s.userConn, msg)
return
}
winCh := s.userConn.WinCh() winCh := s.userConn.WinCh()
s.userTran = NewDirectTransport("", s.userConn) s.userTran = NewDirectTransport("", s.userConn)
s.srvTran = NewDirectTransport("", s.srvConn) s.srvTran = NewDirectTransport("", s.srvConn)
...@@ -100,6 +155,9 @@ func (s *SwitchSession) Bridge() (err error) { ...@@ -100,6 +155,9 @@ func (s *SwitchSession) Bridge() (err error) {
defer s.postBridge() defer s.postBridge()
for { for {
select { select {
// 检测是否超过最大空闲时间
case <-time.After(time.Duration(s.MaxIdleTime) * time.Minute):
return
// 手动结束 // 手动结束
case <-s.ctx.Done(): case <-s.ctx.Done():
return return
...@@ -115,16 +173,25 @@ func (s *SwitchSession) Bridge() (err error) { ...@@ -115,16 +173,25 @@ func (s *SwitchSession) Bridge() (err error) {
s.parser.srvInputChan <- p s.parser.srvInputChan <- p
// Server流入parser数据,经处理发给用户 // Server流入parser数据,经处理发给用户
case p := <-s.parser.srvOutputChan: case p := <-s.parser.srvOutputChan:
_, _ = s.userTran.Write(p) nw, err := s.userTran.Write(p)
if !s.parser.IsRecvState() {
s.replayRecorder.Record(p[:nw])
}
if err != nil {
return err
}
// User发来的数据流流入parser // User发来的数据流流入parser
case p, ok := <-s.userTran.Chan(): case p, ok := <-s.userTran.Chan():
if !ok { if !ok {
return return
} }
s.parser.userInputChan <- p s.parser.userInputChan <- p
// User发来的数据经parser初六,发给Server // User发来的数据经parser处理,发给Server
case p := <-s.parser.userOutputChan: case p := <-s.parser.userOutputChan:
_, _ = s.srvTran.Write(p) _, err = s.srvTran.Write(p)
if err != nil {
return err
}
} }
} }
} }
...@@ -38,7 +38,7 @@ func (uc *UserSSHConnection) WinCh() (winch <-chan ssh.Window) { ...@@ -38,7 +38,7 @@ func (uc *UserSSHConnection) WinCh() (winch <-chan ssh.Window) {
} }
func (uc *UserSSHConnection) LoginFrom() string { func (uc *UserSSHConnection) LoginFrom() string {
return "T" return "ST"
} }
func (uc *UserSSHConnection) RemoteAddr() string { func (uc *UserSSHConnection) RemoteAddr() string {
......
...@@ -42,17 +42,21 @@ func CreateSession(data map[string]interface{}) bool { ...@@ -42,17 +42,21 @@ func CreateSession(data map[string]interface{}) bool {
return false return false
} }
func FinishSession(sid, dataEnd string) { func FinishSession(data map[string]interface{}) {
var res map[string]interface{} var res map[string]interface{}
data := map[string]interface{}{ if sid, ok := data["id"]; ok {
"is_finished": true, playborad := map[string]interface{}{
"date_end": dataEnd, "is_finished": true,
} "date_end": data["date_end"],
Url := fmt.Sprintf(SessionDetailURL, sid) }
err := authClient.Patch(Url, data, &res) Url := fmt.Sprintf(SessionDetailURL, sid)
if err != nil { err := authClient.Patch(Url, playborad, &res)
logger.Error(err) if err != nil {
logger.Error(err)
}
} }
} }
func FinishReply(sid string) bool { func FinishReply(sid string) bool {
...@@ -79,8 +83,14 @@ func FinishTask(tid string) bool { ...@@ -79,8 +83,14 @@ func FinishTask(tid string) bool {
return true return true
} }
func PushSessionReplay(sessionID, gZipFile string) { func PushSessionReplay(sessionID, gZipFile string) (err error) {
var res map[string]interface{}
Url := fmt.Sprintf(SessionReplayURL, sessionID)
err = authClient.UploadFile(Url, gZipFile, &res)
if err != nil {
logger.Error(err)
}
return
} }
func PushSessionCommand(commands []*model.Command) (err error) { func PushSessionCommand(commands []*model.Command) (err error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment