Commit 6607637f authored by ibuler's avatar ibuler

[Update] 修改recorder

parent 6cbe3f88
......@@ -104,6 +104,14 @@
revision = "3ee7d812e62a0804a7d0a324e0249ca2db3476d3"
version = "v0.0.4"
[[projects]]
digest = "1:4a49346ca45376a2bba679ca0e83bec949d780d4e927931317904bad482943ec"
name = "github.com/mattn/go-sqlite3"
packages = ["."]
pruneopts = "UT"
revision = "c7c4067b79cc51e6dfdcef5c702e74b1e0fa7c75"
version = "v1.10.0"
[[projects]]
digest = "1:abcdbf03ca6ca13d3697e2186edc1f33863bbdac2b3a44dfa39015e8903f7409"
name = "github.com/olekukonko/tablewriter"
......@@ -207,6 +215,7 @@
"github.com/jarcoal/httpmock",
"github.com/kr/pty",
"github.com/leonelquinteros/gotext",
"github.com/mattn/go-sqlite3",
"github.com/olekukonko/tablewriter",
"github.com/pkg/errors",
"github.com/pkg/sftp",
......
......@@ -74,3 +74,7 @@
[[constraint]]
name = "gopkg.in/natefinch/lumberjack.v2"
version = "2.1.0"
[[constraint]]
name = "github.com/mattn/go-sqlite3"
version = "1.10.0"
......@@ -8,6 +8,7 @@ import (
"cocogo/pkg/cctx"
"cocogo/pkg/common"
"cocogo/pkg/config"
"cocogo/pkg/i18n"
"cocogo/pkg/logger"
"cocogo/pkg/service"
......@@ -56,10 +57,16 @@ func checkAuth(ctx ssh.Context, password, publicKey string) (res ssh.AuthResult)
}
func CheckUserPassword(ctx ssh.Context, password string) ssh.AuthResult {
if !config.Conf.PasswordAuth {
return ssh.AuthFailed
}
return checkAuth(ctx, password, "")
}
func CheckUserPublicKey(ctx ssh.Context, key ssh.PublicKey) ssh.AuthResult {
if !config.Conf.PublicKeyAuth {
return ssh.AuthFailed
}
b := key.Marshal()
publicKey := common.Base64Encode(string(b))
return checkAuth(ctx, "", publicKey)
......@@ -101,6 +108,6 @@ func CheckMFA(ctx ssh.Context, challenger gossh.KeyboardInteractiveChallenge) (r
return
}
func CheckUserNeedMFA(ctx ssh.Context) (methods []string) {
func MFAAuthMethods(ctx ssh.Context) (methods []string) {
return []string{"keyboard-interactive"}
}
package common
import "os"
import (
"compress/gzip"
"io"
"os"
"time"
)
func FileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
......@@ -10,3 +15,32 @@ func FileExists(name string) bool {
}
return true
}
func EnsureDirExist(name string) error {
if !FileExists(name) {
return os.MkdirAll(name, os.ModePerm)
}
return nil
}
func GzipCompressFile(srcPath, dstPath string) error {
sf, err := os.Open(srcPath)
if err != nil {
return err
}
df, err := os.Create(dstPath)
if err != nil {
return err
}
writer := gzip.NewWriter(df)
writer.Name = dstPath
writer.ModTime = time.Now().UTC()
_, err = io.Copy(writer, sf)
if err != nil {
return err
}
if err := writer.Close(); err != nil {
return err
}
return nil
}
package model
package model
type Command struct {
SessionId string `json:"session"`
OrgId string `json:"org_id"`
Input string `json:"input"`
Output string `json:"output"`
User string `json:"user"`
Server string `json:"asset"`
SystemUser string `json:"system_user"`
Timestamp int64 `json:"timestamp"`
}
......@@ -2,11 +2,12 @@ package proxy
import (
"bytes"
"fmt"
"sync"
"cocogo/pkg/logger"
"cocogo/pkg/model"
"cocogo/pkg/utils"
"fmt"
"sync"
)
var (
......@@ -35,7 +36,7 @@ type Parser struct {
srvInputChan chan []byte
srvOutputChan chan []byte
cmdCh chan *[2]string
cmdChan chan [2]string
inputInitial bool
inputPreState bool
......@@ -48,9 +49,9 @@ type Parser struct {
output string
cmdInputParser *CmdParser
cmdOutputParser *CmdParser
counter int
cmdFilterRules []model.SystemUserFilterRule
closed bool
}
func (p *Parser) Initial() {
......@@ -68,22 +69,22 @@ func (p *Parser) Initial() {
func (p *Parser) Parse() {
defer func() {
fmt.Println("Parse done")
logger.Debug("Parser parse routine done")
}()
for {
select {
case ub, ok := <-p.userInputChan:
case b, ok := <-p.userInputChan:
if !ok {
return
}
b := p.ParseUserInput(ub)
b = p.ParseUserInput(b)
p.userOutputChan <- b
case sb, ok := <-p.srvInputChan:
case b, ok := <-p.srvInputChan:
if !ok {
return
}
b := p.ParseServerOutput(sb)
b = p.ParseServerOutput(b)
p.srvOutputChan <- b
}
}
......@@ -110,6 +111,7 @@ func (p *Parser) parseInputState(b []byte) []byte {
// 用户又开始输入,并上次不处于输入状态,开始结算上次命令的结果
if !p.inputPreState {
p.parseCmdOutput()
p.cmdChan <- [2]string{p.command, p.output}
}
}
return b
......@@ -118,7 +120,6 @@ func (p *Parser) parseInputState(b []byte) []byte {
func (p *Parser) parseCmdInput() {
data := p.cmdBuf.Bytes()
p.command = p.cmdInputParser.Parse(data)
fmt.Println("parse Command is ", p.command)
p.cmdBuf.Reset()
p.inputBuf.Reset()
}
......@@ -186,7 +187,6 @@ func (p *Parser) splitCmdStream(b []byte) {
if p.zmodemState != "" || p.inVimState || !p.inputInitial {
return
}
fmt.Println("Input state: ", p.inputState)
if p.inputState {
p.cmdBuf.Write(b)
} else {
......@@ -210,3 +210,15 @@ func (p *Parser) IsCommandForbidden() bool {
}
return false
}
func (p *Parser) Close() {
if p.closed {
return
}
close(p.userInputChan)
close(p.userOutputChan)
close(p.srvInputChan)
close(p.srvOutputChan)
close(p.cmdChan)
p.closed = true
}
package proxy
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"cocogo/pkg/common"
"cocogo/pkg/logger"
"os"
"path/filepath"
"strings"
"time"
"cocogo/pkg/config"
"cocogo/pkg/logger"
"cocogo/pkg/model"
)
var conf = config.Conf
type CommandRecorder struct {
Session *SwitchSession
storage CommandStorage
queue chan *model.Command
}
func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) {
return &CommandRecorder{Session: sess}
storage := NewCommandStorage()
recorder = &CommandRecorder{Session: sess, queue: make(chan *model.Command, 10), storage: storage}
go recorder.record()
return recorder
}
type Command struct {
SessionId string `json:"session"`
OrgId string `json:"org_id"`
Input string `json:"input"`
Output string `json:"output"`
User string `json:"user"`
Server string `json:"asset"`
SystemUser string `json:"system_user"`
Timestamp time.Time `json:"timestamp"`
func NewReplyRecord(sess *SwitchSession) *ReplyRecorder {
storage := NewReplayStorage()
srvStorage := &ServerReplayStorage{}
return &ReplyRecorder{SessionID: sess.Id, storage: storage, backOffStorage: srvStorage}
}
func (c *CommandRecorder) Record(cmd *Command) {
data, err := json.MarshalIndent(cmd, "", " ")
if err != nil {
logger.Error("Marshal command error: ", err)
func (c *CommandRecorder) Record(command [2]string) {
if command[0] == "" && command[1] == "" {
return
}
fmt.Printf("Record cmd: %s\n", data)
cmd := &model.Command{
SessionId: c.Session.Id,
OrgId: c.Session.Org,
Input: command[0],
Output: command[1],
User: c.Session.User,
Server: c.Session.Server,
SystemUser: c.Session.SystemUser,
Timestamp: time.Now().Unix(),
}
c.queue <- cmd
}
var conf = config.Conf
func (c *CommandRecorder) Start() {
}
func NewReplyRecord(sessionID string) *ReplyRecorder {
rootPath := conf.RootPath
currentData := time.Now().UTC().Format("2006-01-02")
gzFileName := sessionID + ".replay.gz"
absFilePath := filepath.Join(rootPath, "data", "replays", currentData, sessionID)
absGzFilePath := filepath.Join(rootPath, "data", "replays", currentData, gzFileName)
target := strings.Join([]string{currentData, gzFileName}, "/")
return &ReplyRecorder{
SessionID: sessionID,
FileName: sessionID,
absFilePath: absFilePath,
gzFileName: gzFileName,
absGzFilePath: absGzFilePath,
StartTime: time.Now().UTC(),
target: target,
func (c *CommandRecorder) End() {
close(c.queue)
}
func (c *CommandRecorder) record() {
cmdList := make([]*model.Command, 0)
for {
select {
case p := <-c.queue:
cmdList = append(cmdList, p)
if len(cmdList) < 5 {
continue
}
case <-time.After(time.Second * 5):
if len(cmdList) == 0 {
continue
}
}
err := c.storage.BulkSave(cmdList)
if err == nil {
cmdList = cmdList[:0]
continue
}
if len(cmdList) > 10 {
cmdList = cmdList[1:]
}
}
}
type ReplyRecorder struct {
SessionID string
FileName string
gzFileName string
SessionID string
absFilePath string
absGzFilePath string
target string
WriteF *os.File
file *os.File
StartTime time.Time
storage ReplayStorage
backOffStorage ReplayStorage
}
func (r *ReplyRecorder) Record(b []byte) {
interval := time.Now().UTC().Sub(r.StartTime).Seconds()
data, _ := json.Marshal(string(b))
_, _ = r.WriteF.WriteString(fmt.Sprintf("\"%0.6f\":%s,", interval, data))
}
func (r *ReplyRecorder) Start() {
//auth.MakeSureDirExit(r.absFilePath)
//r.WriteF, _ = os.Create(r.absFilePath)
//_, _ = r.WriteF.Write([]byte("{"))
}
rootPath := conf.RootPath
today := time.Now().UTC().Format("2006-01-02")
gzFileName := r.SessionID + ".replay.gz"
replayDir := filepath.Join(rootPath, "data", "replays", today)
func (r *ReplyRecorder) End(ctx context.Context) {
select {
case <-ctx.Done():
_, _ = r.WriteF.WriteString(`"0":""}`)
_ = r.WriteF.Close()
}
r.uploadReplay()
}
r.absFilePath = filepath.Join(replayDir, r.SessionID)
r.absGzFilePath = filepath.Join(replayDir, today, gzFileName)
r.target = strings.Join([]string{today, gzFileName}, "/")
func (r *ReplyRecorder) uploadReplay() {
_ = GzipCompressFile(r.absFilePath, r.absGzFilePath)
if store := NewStorageServer(); store != nil {
store.Upload(r.absGzFilePath, r.target)
err := common.EnsureDirExist(replayDir)
if err != nil {
logger.Errorf("Create dir %s error: %s\n", replayDir, err)
return
}
_ = os.Remove(r.absFilePath)
_ = os.Remove(r.absGzFilePath)
}
func GzipCompressFile(srcPath, dstPath string) error {
srcf, err := os.Open(srcPath)
r.file, err = os.Create(r.absFilePath)
if err != nil {
return err
logger.Errorf("Create file %s error: %s\n", r.absFilePath, err)
}
dstf, err := os.Create(dstPath)
if err != nil {
return err
_, _ = r.file.Write([]byte("{"))
}
func (r *ReplyRecorder) End() {
_ = r.file.Close()
if !common.FileExists(r.absFilePath) {
return
}
zw := gzip.NewWriter(dstf)
zw.Name = dstPath
zw.ModTime = time.Now().UTC()
_, err = io.Copy(zw, srcf)
if err != nil {
return err
if stat, err := os.Stat(r.absGzFilePath); err == nil && stat.Size() == 0 {
_ = os.Remove(r.absFilePath)
return
}
if err := zw.Close(); err != nil {
return err
go r.uploadReplay()
if !common.FileExists(r.absGzFilePath) {
_ = common.GzipCompressFile(r.absFilePath, r.absGzFilePath)
_ = os.Remove(r.absFilePath)
}
}
return nil
func (r *ReplyRecorder) uploadReplay() {
maxRetry := 3
for i := 0; i <= maxRetry; i++ {
logger.Debug("Upload replay file: ", r.absGzFilePath)
err := r.storage.Upload(r.absGzFilePath, r.target)
if err == nil {
_ = os.Remove(r.absGzFilePath)
break
}
// 如果还是失败,使用备用storage再传一次
if i == maxRetry {
logger.Errorf("Using back off storage retry upload")
r.storage = r.backOffStorage
r.uploadReplay()
break
}
}
}
package proxy
import (
"cocogo/pkg/config"
"cocogo/pkg/model"
"cocogo/pkg/service"
)
type ReplayStorage interface {
Upload(gZipFile, target string)
Upload(gZipFile, target string) error
}
type CommandStorage interface {
BulkSave(commands []*model.Command) error
}
func NewReplayStorage() ReplayStorage {
cf := config.Conf.ReplayStorage
tp, ok := cf["TYPE"]
if !ok {
tp = "server"
}
switch tp {
default:
return &ServerReplayStorage{}
}
}
func NewCommandStorage() CommandStorage {
cf := config.Conf.CommandStorage
tp, ok := cf["TYPE"]
if !ok {
tp = "server"
}
switch tp {
default:
return &ServerCommandStorage{}
}
}
func NewStorageServer() ReplayStorage {
return nil
type ServerCommandStorage struct {
}
func NewJmsStorage() ReplayStorage {
//appService := auth.GetGlobalService()
//return &Server{
// StorageType: "jms",
// service: appService,
//}
return &Server{}
func (s *ServerCommandStorage) BulkSave(commands []*model.Command) (err error) {
return service.PushSessionCommand(commands)
}
type Server struct {
type ServerReplayStorage struct {
StorageType string
}
func (s *Server) Upload(gZipFilePath, target string) {
func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) {
//sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0]
//_ = client.PushSessionReplay(gZipFilePath, sessionID)
return
}
package proxy
import (
"cocogo/pkg/model"
"context"
"time"
......@@ -12,14 +11,8 @@ import (
)
func NewSwitchSession(userConn UserConnection, serverConn ServerConnection) (sw *SwitchSession) {
parser := &Parser{
userInputChan: make(chan []byte, 1024),
userOutputChan: make(chan []byte, 1024),
srvInputChan: make(chan []byte, 1024),
srvOutputChan: make(chan []byte, 1024),
}
parser.Initial()
sw = &SwitchSession{userConn: userConn, serverConn: serverConn, parser: parser}
sw = &SwitchSession{userConn: userConn, srvConn: serverConn}
sw.Initial()
return sw
}
......@@ -37,37 +30,40 @@ type SwitchSession struct {
Finished bool `json:"is_finished"`
Closed bool
srvChan chan []byte
userChan chan []byte
cmdFilterRules []model.SystemUserFilterRule
cmdRecorder *CommandRecorder
replayRecorder *ReplayStorage
replayRecorder *ReplyRecorder
parser *Parser
userConn UserConnection
serverConn ServerConnection
userTran Transport
serverTran Transport
cancelFunc context.CancelFunc
cmdRecordChan chan [2]string
userConn UserConnection
srvConn ServerConnection
userChan Transport
srvChan Transport
cancelFunc context.CancelFunc
}
func (s *SwitchSession) Initial() {
s.Id = uuid.NewV4().String()
s.User = s.userConn.User()
s.Server = s.serverConn.Name()
s.SystemUser = s.serverConn.User()
s.Server = s.srvConn.Name()
s.SystemUser = s.srvConn.User()
s.LoginFrom = s.userConn.LoginFrom()
s.RemoteAddr = s.userConn.RemoteAddr()
s.DateStart = time.Now()
}
func (s *SwitchSession) preBridge() {
}
func (s *SwitchSession) postBridge() {
s.cmdRecordChan = make(chan [2]string, 1024)
s.cmdRecorder = NewCommandRecorder(s)
s.replayRecorder = NewReplyRecord(s)
parser := &Parser{
userInputChan: make(chan []byte, 1024),
userOutputChan: make(chan []byte, 1024),
srvInputChan: make(chan []byte, 1024),
srvOutputChan: make(chan []byte, 1024),
cmdChan: s.cmdRecordChan,
}
parser.Initial()
s.parser = parser
}
func (s *SwitchSession) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) {
......@@ -82,7 +78,7 @@ func (s *SwitchSession) watchWindowChange(ctx context.Context, winCh <-chan ssh.
if !ok {
return
}
err := s.serverConn.SetWinSize(win.Height, win.Width)
err := s.srvConn.SetWinSize(win.Height, win.Width)
if err != nil {
logger.Error("Change server win size err: ", err)
return
......@@ -92,63 +88,104 @@ func (s *SwitchSession) watchWindowChange(ctx context.Context, winCh <-chan ssh.
}
}
func (s *SwitchSession) readUserToServer(ctx context.Context) {
func (s *SwitchSession) readParserToServer(ctx context.Context) {
defer func() {
logger.Debug("Read user to server end")
logger.Debug("Read parser to server end")
}()
for {
select {
case <-ctx.Done():
_ = s.userTran.Close()
return
case p, ok := <-s.userTran.Chan():
case p, ok := <-s.parser.userOutputChan:
if !ok {
s.cancelFunc()
}
s.parser.userInputChan <- p
case p, ok := <-s.parser.userOutputChan:
_, _ = s.srvChan.Write(p)
}
}
}
func (s *SwitchSession) readUserToParser(ctx context.Context) {
defer func() {
logger.Debug("Read user to server end")
}()
for {
select {
case <-ctx.Done():
_ = s.userChan.Close()
return
case p, ok := <-s.userChan.Chan():
if !ok {
s.cancelFunc()
}
_, _ = s.serverTran.Write(p)
s.parser.userInputChan <- p
}
}
}
func (s *SwitchSession) readServerToUser(ctx context.Context) {
func (s *SwitchSession) readServerToParser(ctx context.Context) {
defer func() {
logger.Debug("Read server to user end")
logger.Debug("Read server to parser end")
}()
for {
select {
case <-ctx.Done():
_ = s.serverTran.Close()
_ = s.srvChan.Close()
return
case p, ok := <-s.serverTran.Chan():
case p, ok := <-s.srvChan.Chan():
if !ok {
s.cancelFunc()
}
s.parser.srvInputChan <- p
}
}
}
func (s *SwitchSession) readParserToUser(ctx context.Context) {
defer func() {
logger.Debug("Read parser to user end")
}()
for {
select {
case <-ctx.Done():
return
case p, ok := <-s.parser.srvOutputChan:
if !ok {
s.cancelFunc()
}
_, _ = s.userConn.Write(p)
s.replayRecorder.Record(p)
_, _ = s.userChan.Write(p)
}
}
}
func (s *SwitchSession) recordCmd() {
for cmd := range s.cmdRecordChan {
s.cmdRecorder.Record(cmd)
}
}
func (s *SwitchSession) postBridge() {
s.cmdRecorder.End()
s.replayRecorder.End()
s.parser.Close()
}
func (s *SwitchSession) Bridge() (err error) {
winCh := s.userConn.WinCh()
ctx, cancel := context.WithCancel(context.Background())
s.cancelFunc = cancel
s.userTran = NewDirectTransport("", s.userConn)
s.serverTran = NewDirectTransport("", s.serverConn)
s.userChan = NewDirectTransport("", s.userConn)
s.srvChan = NewDirectTransport("", s.srvConn)
go s.parser.Parse()
go s.watchWindowChange(ctx, winCh)
go s.readServerToUser(ctx)
s.readUserToServer(ctx)
go s.readServerToParser(ctx)
go s.readParserToUser(ctx)
go s.readParserToServer(ctx)
go s.recordCmd()
defer s.postBridge()
s.readUserToParser(ctx)
logger.Debug("Session bridge end")
return
}
......@@ -82,3 +82,8 @@ func FinishTask(tid string) bool {
func PushSessionReplay(sessionID, gZipFile string) {
}
func PushSessionCommand(commands []*model.Command) (err error) {
fmt.Println("Commands: ", commands)
return
}
......@@ -25,21 +25,14 @@ func StartServer() {
srv := ssh.Server{
Addr: conf.BindHost + ":" + strconv.Itoa(conf.SSHPort),
KeyboardInteractiveHandler: auth.CheckMFA,
NextAuthMethodsHandler: auth.CheckUserNeedMFA,
PasswordHandler: auth.CheckUserPassword,
PublicKeyHandler: auth.CheckUserPublicKey,
NextAuthMethodsHandler: auth.MFAAuthMethods,
HostSigners: []ssh.Signer{signer},
Handler: handler.SessionHandler,
SubsystemHandlers: map[string]ssh.SubsystemHandler{},
}
// Set Auth Handler
if conf.PasswordAuth {
srv.PasswordHandler = auth.CheckUserPassword
}
if conf.PublicKeyAuth {
srv.PublicKeyHandler = auth.CheckUserPublicKey
}
if !conf.PasswordAuth && !conf.PublicKeyAuth {
srv.PasswordHandler = auth.CheckUserPassword
}
srv.SetSubsystemHandler("sftp", handler.SftpHandler)
logger.Fatal(srv.ListenAndServe())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment