Commit 3f681921 authored by Eric's avatar Eric

[Update] session结构调整

parent 7c032cdf
...@@ -138,8 +138,42 @@ func (p *ProxyServer) Proxy() { ...@@ -138,8 +138,42 @@ func (p *ProxyServer) Proxy() {
logger.Errorf(msg) logger.Errorf(msg)
return return
} }
sw := NewSwitchSession(p)
ok := p.createSession(sw)
if !ok {
msg := i18n.T("Connect with api server failed")
msg = utils.WrapperWarn(msg)
utils.IgnoreErrWriteString(p.UserConn, msg)
return
}
cmdRules := p.GetFilterRules()
sw.SetFilterRules(cmdRules)
_ = sw.Bridge(p.UserConn, srvConn)
p.finishSession(sw)
}
sw := NewSwitchSession(p.UserConn, srvConn) func (p *ProxyServer) createSession(s *SwitchSession) bool {
sw.SetFilterRules(p.SystemUser.Id) data := s.MapData()
_ = sw.Bridge() for i := 0; i < 5; i++ {
if service.CreateSession(data) {
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}
func (p *ProxyServer) finishSession(s *SwitchSession) {
data := s.MapData()
service.FinishSession(data)
service.FinishReply(s.Id)
logger.Debugf("finish Session: %s", s.Id)
}
func (p *ProxyServer) GetFilterRules() []model.SystemUserFilterRule {
cmdRules, err := service.GetSystemUserFilterRules(p.SystemUser.Id)
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
return cmdRules
} }
...@@ -15,19 +15,19 @@ import ( ...@@ -15,19 +15,19 @@ import (
) )
func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) { func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) {
recorder = &CommandRecorder{Session: sess} recorder = &CommandRecorder{sessionID: sess.Id}
recorder.initial() recorder.initial()
return recorder return recorder
} }
func NewReplyRecord(sess *SwitchSession) (recorder *ReplyRecorder) { func NewReplyRecord(sess *SwitchSession) (recorder *ReplyRecorder) {
recorder = &ReplyRecorder{Session: sess} recorder = &ReplyRecorder{sessionID: sess.Id}
recorder.initial() recorder.initial()
return recorder return recorder
} }
type CommandRecorder struct { type CommandRecorder struct {
Session *SwitchSession sessionID string
storage CommandStorage storage CommandStorage
queue chan *model.Command queue chan *model.Command
...@@ -42,41 +42,8 @@ func (c *CommandRecorder) initial() { ...@@ -42,41 +42,8 @@ func (c *CommandRecorder) initial() {
go c.record() go c.record()
} }
func (c *CommandRecorder) Record(command [2]string) { func (c *CommandRecorder) Record(command *model.Command) {
if command[0] == "" && command[1] == "" { c.queue <- command
return
}
if command[0] == "" {
fmt.Println("command kong=======")
}
var input string
var output string
if len(command[0]) > 128 {
input = command[0][:128]
} else {
input = command[0]
}
i := strings.LastIndexByte(command[1], '\r')
if i > 1024 {
output = output[:1024]
} else if i > 0 {
output = command[1][:i]
} else {
output = command[1]
}
cmd := &model.Command{
SessionId: c.Session.Id,
OrgId: c.Session.Org,
Input: input,
Output: output,
User: c.Session.User,
Server: c.Session.Server,
SystemUser: c.Session.SystemUser,
Timestamp: time.Now().Unix(),
}
c.queue <- cmd
} }
func (c *CommandRecorder) End() { func (c *CommandRecorder) End() {
...@@ -99,7 +66,7 @@ func (c *CommandRecorder) record() { ...@@ -99,7 +66,7 @@ func (c *CommandRecorder) record() {
} }
case p, ok := <-c.queue: case p, ok := <-c.queue:
if !ok { if !ok {
logger.Debug("Session command recorder close: ", c.Session.Id) logger.Debug("Session command recorder close: ", c.sessionID)
return return
} }
cmdList = append(cmdList, p) cmdList = append(cmdList, p)
...@@ -125,7 +92,7 @@ func (c *CommandRecorder) record() { ...@@ -125,7 +92,7 @@ func (c *CommandRecorder) record() {
} }
type ReplyRecorder struct { type ReplyRecorder struct {
Session *SwitchSession sessionID string
absFilePath string absFilePath string
absGzFilePath string absGzFilePath string
...@@ -152,7 +119,7 @@ func (r *ReplyRecorder) Record(b []byte) { ...@@ -152,7 +119,7 @@ func (r *ReplyRecorder) Record(b []byte) {
} }
func (r *ReplyRecorder) prepare() { func (r *ReplyRecorder) prepare() {
sessionId := r.Session.Id sessionId := r.sessionID
rootPath := config.GetConf().RootPath rootPath := config.GetConf().RootPath
today := time.Now().UTC().Format("2006-01-02") today := time.Now().UTC().Format("2006-01-02")
gzFileName := sessionId + ".replay.gz" gzFileName := sessionId + ".replay.gz"
......
...@@ -2,6 +2,8 @@ package proxy ...@@ -2,6 +2,8 @@ package proxy
import ( import (
"context" "context"
"fmt"
"strings"
"time" "time"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
...@@ -9,29 +11,24 @@ import ( ...@@ -9,29 +11,24 @@ import (
"cocogo/pkg/config" "cocogo/pkg/config"
"cocogo/pkg/i18n" "cocogo/pkg/i18n"
"cocogo/pkg/logger" "cocogo/pkg/logger"
"cocogo/pkg/service" "cocogo/pkg/model"
"cocogo/pkg/utils" "cocogo/pkg/utils"
) )
func NewSwitchSession(userConn UserConnection, serverConn ServerConnection) (sw *SwitchSession) { func NewSwitchSession(p *ProxyServer) (sw *SwitchSession) {
sw = &SwitchSession{userConn: userConn, srvConn: serverConn} sw = &SwitchSession{p: p}
sw.Initial() sw.Initial()
return sw return sw
} }
type SwitchSession struct { type SwitchSession struct {
Id string Id string
User string `json:"user"` p *ProxyServer
Server string `json:"asset"`
SystemUser string `json:"system_user"` DateStart string
Org string `json:"org_id"` DateEnd string
LoginFrom string `json:"login_from"` DateActive time.Time
RemoteAddr string `json:"remote_addr"` finished bool
DateStart string `json:"date_start"`
DateEnd string `json:"date_end"`
DateActive time.Time `json:"date_last_active"`
Finished bool `json:"is_finished"`
Closed bool
MaxIdleTime int MaxIdleTime int
...@@ -39,8 +36,6 @@ type SwitchSession struct { ...@@ -39,8 +36,6 @@ type SwitchSession struct {
replayRecorder *ReplyRecorder replayRecorder *ReplyRecorder
parser *Parser parser *Parser
userConn UserConnection
srvConn ServerConnection
userTran Transport userTran Transport
srvTran Transport srvTran Transport
...@@ -50,11 +45,6 @@ type SwitchSession struct { ...@@ -50,11 +45,6 @@ type SwitchSession struct {
func (s *SwitchSession) Initial() { func (s *SwitchSession) Initial() {
s.Id = uuid.NewV4().String() s.Id = uuid.NewV4().String()
s.User = s.userConn.User()
s.Server = s.srvConn.Name()
s.SystemUser = s.srvConn.User()
s.LoginFrom = s.userConn.LoginFrom()
s.RemoteAddr = s.userConn.RemoteAddr()
s.DateStart = time.Now().UTC().Format("2006-01-02 15:04:05 +0000") s.DateStart = time.Now().UTC().Format("2006-01-02 15:04:05 +0000")
s.MaxIdleTime = config.GetConf().MaxIdleTime s.MaxIdleTime = config.GetConf().MaxIdleTime
s.cmdRecorder = NewCommandRecorder(s) s.cmdRecorder = NewCommandRecorder(s)
...@@ -66,104 +56,96 @@ func (s *SwitchSession) Initial() { ...@@ -66,104 +56,96 @@ func (s *SwitchSession) Initial() {
} }
func (s *SwitchSession) Terminate() { func (s *SwitchSession) Terminate() {
if !s.Closed { select {
msg := i18n.T("Terminated by administrator") case <-s.ctx.Done():
utils.IgnoreErrWriteString(s.userConn, msg) return
s.cancel() default:
s.Closed = true
} }
s.cancel()
} }
func (s *SwitchSession) recordCmd() { func (s *SwitchSession) recordCommand() {
for cmd := range s.parser.cmdRecordChan { for command := range s.parser.cmdRecordChan {
if command[0] == "" && command[1] == "" {
continue
}
cmd := s.generateCommandResult(command)
s.cmdRecorder.Record(cmd) s.cmdRecorder.Record(cmd)
} }
} }
func (s *SwitchSession) MapData() map[string]interface{} { func (s *SwitchSession) generateCommandResult(command [2]string) *model.Command {
var dataEnd interface{} var input string
if s.DateEnd == "" { var output string
dataEnd = nil if len(command[0]) > 128 {
input = command[0][:128]
} else { } else {
dataEnd = s.DateEnd input = command[0]
} }
return map[string]interface{}{ i := strings.LastIndexByte(command[1], '\r')
"id": s.Id,
"user": s.User, if i > 1024 {
"asset": s.Server, output = output[:1024]
"org_id": s.Org, } else if i > 0 {
"login_from": s.LoginFrom, output = command[1][:i]
"system_user": s.SystemUser, } else {
"remote_addr": s.RemoteAddr, output = command[1]
"is_finished": s.Finished, }
"date_start": s.DateStart,
"date_end": dataEnd, return &model.Command{
SessionId: s.Id,
OrgId: s.p.Asset.OrgID,
Input: input,
Output: output,
User: s.p.User.Username,
Server: s.p.Asset.Hostname,
SystemUser: s.p.SystemUser.Username,
Timestamp: time.Now().Unix(),
} }
} }
func (s *SwitchSession) postBridge() { func (s *SwitchSession) postBridge() {
s.DateEnd = time.Now().UTC().Format("2006-01-02 15:04:05 +0000")
s.finished = true
_ = s.userTran.Close() _ = s.userTran.Close()
_ = s.srvTran.Close() _ = s.srvTran.Close()
s.parser.Close() s.parser.Close()
s.replayRecorder.End() s.replayRecorder.End()
s.cmdRecorder.End() s.cmdRecorder.End()
s.finishSession()
}
func (s *SwitchSession) finishSession() {
s.DateEnd = time.Now().UTC().Format("2006-01-02 15:04:05 +0000")
service.FinishSession(s.MapData())
service.FinishReply(s.Id)
logger.Debugf("finish Session: %s", s.Id)
} }
func (s *SwitchSession) creatSession() bool { func (s *SwitchSession) SetFilterRules(cmdRules []model.SystemUserFilterRule) {
for i := 0; i < 5; i++ {
if service.CreateSession(s.MapData()) {
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}
func (s *SwitchSession) SetFilterRules(systemUserId string) {
cmdRules, err := service.GetSystemUserFilterRules(systemUserId)
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
s.parser.SetCMDFilterRules(cmdRules) s.parser.SetCMDFilterRules(cmdRules)
} }
func (s *SwitchSession) Bridge() (err error) { func (s *SwitchSession) Bridge(userConn UserConnection, srvConn ServerConnection) (err error) {
if !s.creatSession() { winCh := userConn.WinCh()
msg := i18n.T("Connect with api server failed") s.srvTran = NewDirectTransport(s.Id, srvConn)
msg = utils.WrapperWarn(msg) s.userTran = NewDirectTransport(s.Id, userConn)
utils.IgnoreErrWriteString(s.userConn, msg)
return
}
winCh := s.userConn.WinCh()
s.userTran = NewDirectTransport("", s.userConn)
s.srvTran = NewDirectTransport("", s.srvConn)
defer func() { defer func() {
logger.Info("Session bridge done: ", s.Id) logger.Info("Session bridge done: ", s.Id)
}() }()
go s.parser.Parse() go s.parser.Parse()
go s.recordCmd() go s.recordCommand()
defer s.postBridge() defer s.postBridge()
for { for {
select { select {
// 检测是否超过最大空闲时间 // 检测是否超过最大空闲时间
case <-time.After(time.Duration(s.MaxIdleTime) * time.Minute): case <-time.After(time.Duration(s.MaxIdleTime) * time.Minute):
msg := i18n.T(fmt.Sprintf("Connect idle more than %d minutes, disconnect", s.MaxIdleTime))
msg = utils.WrapperWarn(msg)
utils.IgnoreErrWriteString(s.userTran, msg)
return return
// 手动结束 // 手动结束
case <-s.ctx.Done(): case <-s.ctx.Done():
msg := i18n.T("Terminated by administrator")
utils.IgnoreErrWriteString(userConn, msg)
return return
// 监控窗口大小变化 // 监控窗口大小变化
case win := <-winCh: case win := <-winCh:
_ = s.srvConn.SetWinSize(win.Height, win.Width) _ = srvConn.SetWinSize(win.Height, win.Width)
logger.Debugf("Window server change: %d*%d", win.Height, win.Width) logger.Debugf("Window server change: %d*%d", win.Height, win.Width)
// Server发来数据流入parser中 // Server发来数据流入parser中
case p, ok := <-s.srvTran.Chan(): case p, ok := <-s.srvTran.Chan():
...@@ -195,3 +177,22 @@ func (s *SwitchSession) Bridge() (err error) { ...@@ -195,3 +177,22 @@ func (s *SwitchSession) Bridge() (err error) {
} }
} }
} }
func (s *SwitchSession) MapData() map[string]interface{} {
var dataEnd interface{}
if s.DateEnd != "" {
dataEnd = s.DateEnd
}
return map[string]interface{}{
"id": s.Id,
"user": s.p.User.Name,
"asset": s.p.Asset.Hostname,
"org_id": s.p.Asset.OrgID,
"login_from": s.p.UserConn.LoginFrom(),
"system_user": s.p.SystemUser.Username,
"remote_addr": s.p.UserConn.RemoteAddr(),
"is_finished": s.finished,
"date_start": s.DateStart,
"date_end": dataEnd,
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment