Commit 9d7e92bd authored by ibuler's avatar ibuler

[Update] 修改一些内容

parent f2079123
......@@ -67,10 +67,8 @@ func (p *Parser) initial() {
p.once = sync.Once{}
p.cmdInputParser = &CmdParser{}
p.cmdOutputParser = &CmdParser{}
p.cmdInputParser.Initial()
p.cmdOutputParser.Initial()
p.cmdInputParser = NewCmdParser()
p.cmdOutputParser = NewCmdParser()
p.userInputChan = make(chan []byte, 1024)
p.userOutputChan = make(chan []byte, 1024)
......@@ -116,6 +114,7 @@ func (p *Parser) parseInputState(b []byte) []byte {
p.parseCmdInput()
if p.IsCommandForbidden() {
fbdMsg := utils.WrapperWarn(fmt.Sprintf(i18n.T("Command `%s` is forbidden"), p.command))
p.outputBuf.WriteString(fbdMsg)
p.srvOutputChan <- []byte("\r\n" + fbdMsg)
return []byte{utils.CharCleanLine, '\r'}
}
......@@ -144,9 +143,9 @@ func (p *Parser) parseCmdOutput() {
}
func (p *Parser) replaceInputNewLine(b []byte) []byte {
b = bytes.Replace(b, []byte{'\r', '\r', '\n'}, []byte{'\r'}, -1)
b = bytes.Replace(b, []byte{'\r', '\n'}, []byte{'\r'}, -1)
b = bytes.Replace(b, []byte{'\n'}, []byte{'\r'}, -1)
//b = bytes.Replace(b, []byte{'\r', '\r', '\n'}, []byte{'\r'}, -1)
//b = bytes.Replace(b, []byte{'\r', '\n'}, []byte{'\r'}, -1)
//b = bytes.Replace(b, []byte{'\n'}, []byte{'\r'}, -1)
return b
}
......@@ -202,7 +201,11 @@ func (p *Parser) splitCmdStream(b []byte) {
}
if p.inputState {
p.cmdBuf.Write(b)
} else {
return
}
// outputBuff 最大存储1024, 否则可能撑爆内存
// 如果最后一个字符不是ascii, 可以截断了某个中文字符的一部分,为了安全继续添加
if p.outputBuf.Len() < 1024 || p.outputBuf.Bytes()[p.outputBuf.Len()-1] > 128 {
p.outputBuf.Write(b)
}
}
......
......@@ -2,11 +2,19 @@ package proxy
import (
"bytes"
"strings"
"cocogo/pkg/utils"
"regexp"
"strings"
)
var ps1Pattern, _ = regexp.Compile("^\\[?.*@.*\\]?[\\$#]\\s|mysql>\\s")
func NewCmdParser() *CmdParser {
parser := &CmdParser{}
parser.initial()
return parser
}
type CmdParser struct {
term *utils.Terminal
buf *bytes.Buffer
......@@ -16,15 +24,22 @@ func (cp *CmdParser) Reset() {
cp.buf.Reset()
}
func (cp *CmdParser) Initial() {
func (cp *CmdParser) initial() {
cp.buf = new(bytes.Buffer)
cp.term = utils.NewTerminal(cp.buf, "")
cp.term.SetEcho(false)
}
func (cp *CmdParser) parsePS1(s string) string {
return ps1Pattern.ReplaceAllString(s, "")
}
func (cp *CmdParser) Parse(b []byte) string {
cp.buf.Write(b)
cp.buf.WriteString("\r")
lines, _ := cp.term.ReadLines()
return strings.TrimSpace(strings.Join(lines, "\r\n"))
cp.Reset()
output := strings.TrimSpace(strings.Join(lines, "\r\n"))
output = cp.parsePS1(output)
return output
}
package proxy
import (
"fmt"
"testing"
)
func TestCmdParser_Parse(t *testing.T) {
p := NewCmdParser()
var b = []byte("ifconfig \x08\x1b[K\x08\x1b[K\x08\x1b[K\x08\x1b[K\x08\x1b[K\x08\x1b[Konfig")
data := p.Parse(b)
if data != "ifconfig" {
t.Error("data should be ifconfig but not")
}
b = []byte("ifconfig\xe4\xbd\xa0")
data = p.Parse(b)
fmt.Println("line: ", data)
}
......@@ -16,6 +16,18 @@ import (
var conf = config.Conf
func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) {
recorder = &CommandRecorder{Session: sess}
recorder.initial()
return recorder
}
func NewReplyRecord(sess *SwitchSession) (recorder *ReplyRecorder) {
recorder = &ReplyRecorder{Session: sess}
recorder.initial()
return recorder
}
type CommandRecorder struct {
Session *SwitchSession
storage CommandStorage
......@@ -23,20 +35,11 @@ type CommandRecorder struct {
queue chan *model.Command
}
func NewCommandRecorder(sess *SwitchSession) (recorder *CommandRecorder) {
storage := NewCommandStorage()
recorder = &CommandRecorder{Session: sess, queue: make(chan *model.Command, 10), storage: storage}
go recorder.record()
recorder.Start()
return recorder
}
func NewReplyRecord(sess *SwitchSession) (recorder *ReplyRecorder) {
storage := NewReplayStorage()
srvStorage := &ServerReplayStorage{}
recorder = &ReplyRecorder{SessionID: sess.Id, storage: storage, backOffStorage: srvStorage}
recorder.Start()
return recorder
func (c *CommandRecorder) initial() {
c.queue = make(chan *model.Command, 10)
//c.storage = NewCommandStorage()
c.storage, _ = NewFileCommandStorage("/tmp/abc.log")
go c.record()
}
func (c *CommandRecorder) Record(command [2]string) {
......@@ -56,9 +59,6 @@ func (c *CommandRecorder) Record(command [2]string) {
c.queue <- cmd
}
func (c *CommandRecorder) Start() {
}
func (c *CommandRecorder) End() {
close(c.queue)
}
......@@ -87,14 +87,14 @@ func (c *CommandRecorder) record() {
cmdList = cmdList[:0]
continue
}
if len(cmdList) > 10 {
if len(cmdList) > 1024 {
cmdList = cmdList[1:]
}
}
}
type ReplyRecorder struct {
SessionID string
Session *SwitchSession
absFilePath string
absGzFilePath string
......@@ -106,21 +106,31 @@ type ReplyRecorder struct {
backOffStorage ReplayStorage
}
func (r *ReplyRecorder) initial() {
storage := NewReplayStorage()
backOffStorage := &ServerReplayStorage{}
r.storage = storage
r.backOffStorage = backOffStorage
r.prepare()
}
func (r *ReplyRecorder) Record(b []byte) {
if len(b) > 0 {
delta := float64(time.Now().UnixNano()-r.timeStartNano) / 1000 / 1000 / 1000
data, _ := json.Marshal(string(b))
_, _ = r.file.WriteString(fmt.Sprintf(`"%.3f":%s`, delta, data))
_, _ = r.file.WriteString(fmt.Sprintf(`"%.3f":%s,`, delta, data))
}
}
func (r *ReplyRecorder) Start() {
func (r *ReplyRecorder) prepare() {
sessionId := r.Session.Id
rootPath := conf.RootPath
today := time.Now().UTC().Format("2006-01-02")
gzFileName := r.SessionID + ".replay.gz"
gzFileName := sessionId + ".replay.gz"
replayDir := filepath.Join(rootPath, "data", "replays", today)
r.absFilePath = filepath.Join(replayDir, r.SessionID)
r.absFilePath = filepath.Join(replayDir, sessionId)
r.absGzFilePath = filepath.Join(replayDir, today, gzFileName)
r.target = strings.Join([]string{today, gzFileName}, "/")
r.timeStartNano = time.Now().UnixNano()
......@@ -140,6 +150,7 @@ func (r *ReplyRecorder) Start() {
}
func (r *ReplyRecorder) End() {
_, _ = r.file.WriteString(fmt.Sprintf(`"%.3f":%s}`, 0.0, ""))
_ = r.file.Close()
go r.uploadReplay()
}
......
......@@ -4,6 +4,8 @@ import (
"cocogo/pkg/config"
"cocogo/pkg/model"
"cocogo/pkg/service"
"fmt"
"os"
)
type ReplayStorage interface {
......@@ -45,6 +47,28 @@ func (s *ServerCommandStorage) BulkSave(commands []*model.Command) (err error) {
return service.PushSessionCommand(commands)
}
func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) {
file, err := os.Create(name)
if err != nil {
return
}
storage = &FileCommandStorage{file: file}
return
}
type FileCommandStorage struct {
file *os.File
}
func (f *FileCommandStorage) BulkSave(commands []*model.Command) (err error) {
for _, cmd := range commands {
f.file.WriteString(fmt.Sprintf("命令: %s\n", cmd.Input))
f.file.WriteString(fmt.Sprintf("结果: %s\n", cmd.Output))
f.file.WriteString("---\n")
}
return
}
type ServerReplayStorage struct {
StorageType string
}
......
......@@ -22,9 +22,8 @@ func TestSSHConnection_Config(t *testing.T) {
}
func TestSSHConnection_Connect(t *testing.T) {
client, err := testConnection.Connect()
err := testConnection.Connect(24, 80, "xterm")
if err != nil {
t.Errorf("Connect error %s", err)
}
fmt.Println(string(client.ServerVersion()))
}
......@@ -42,8 +42,8 @@ func (dt *DirectTransport) Chan() <-chan []byte {
}
func (dt *DirectTransport) Keep() {
buf := make([]byte, 1024)
for {
buf := make([]byte, 1024)
n, err := dt.readWriter.Read(buf)
if err != nil {
_ = dt.Close()
......@@ -60,7 +60,7 @@ func (dt *DirectTransport) Keep() {
}
func NewDirectTransport(name string, readWriter io.ReadWriter) Transport {
ch := make(chan []byte, 1024)
ch := make(chan []byte, 1024*32)
tr := DirectTransport{readWriter: readWriter, ch: ch}
go tr.Keep()
return &tr
......
......@@ -6,7 +6,6 @@ package utils
import (
"bytes"
"fmt"
"io"
"strconv"
"sync"
......@@ -796,9 +795,7 @@ func (t *Terminal) readLine() (line string, err error) {
lineOk := false
for !lineOk {
var key rune
fmt.Printf("Before: Rest %d\n", len(rest))
key, rest = bytesToKey(rest, t.pasteActive)
fmt.Printf("After: Key: %v Rest %d\n", key, len(rest))
time.Sleep(10 * time.Millisecond)
if key == utf8.RuneError {
break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment