Commit 424f76bf authored by ibuler's avatar ibuler

[Update] 修改支持parser

parent 2b6f937d
package main
package proxy package proxy
import ( import (
"cocogo/pkg/logger"
"fmt" "fmt"
"io" "io"
"net" "net"
...@@ -14,8 +13,8 @@ type ServerConnection interface { ...@@ -14,8 +13,8 @@ type ServerConnection interface {
Writer() io.WriteCloser Writer() io.WriteCloser
Reader() io.Reader Reader() io.Reader
Protocol() string Protocol() string
Connect() error Connect(h, w int, term string) error
SetWinSize(w, h int) SetWinSize(w, h int) error
Close() Close()
} }
...@@ -102,9 +101,8 @@ func (sc *SSHConnection) connect() (client *gossh.Client, err error) { ...@@ -102,9 +101,8 @@ func (sc *SSHConnection) connect() (client *gossh.Client, err error) {
return client, nil return client, nil
} }
func (sc *SSHConnection) Connect(h, w int, term string) (err error) { func (sc *SSHConnection) invokeShell(h, w int, term string) (err error) {
client, err := sc.connect() sess, err := sc.client.NewSession()
sess, err := client.NewSession()
if err != nil { if err != nil {
return return
} }
...@@ -116,7 +114,6 @@ func (sc *SSHConnection) Connect(h, w int, term string) (err error) { ...@@ -116,7 +114,6 @@ func (sc *SSHConnection) Connect(h, w int, term string) (err error) {
} }
err = sess.RequestPty(term, h, w, modes) err = sess.RequestPty(term, h, w, modes)
if err != nil { if err != nil {
logger.Errorf("Request pty error: %s", err)
return return
} }
sc.stdin, err = sess.StdinPipe() sc.stdin, err = sess.StdinPipe()
...@@ -131,6 +128,18 @@ func (sc *SSHConnection) Connect(h, w int, term string) (err error) { ...@@ -131,6 +128,18 @@ func (sc *SSHConnection) Connect(h, w int, term string) (err error) {
return err return err
} }
func (sc *SSHConnection) Connect(h, w int, term string) (err error) {
_, err = sc.connect()
if err != nil {
return
}
err = sc.invokeShell(h, w, term)
if err != nil {
return
}
return nil
}
func (sc *SSHConnection) Reader() (reader io.Reader) { func (sc *SSHConnection) Reader() (reader io.Reader) {
return sc.stdout return sc.stdout
} }
......
package proxy
import (
"bytes"
"cocogo/pkg/logger"
"fmt"
"sync"
)
type ParseRule func([]byte) bool
var (
vimEnterMark = []byte("\x1b[?25l\x1b[37;1H\x1b[1m")
vimExitMark = []byte("\x1b[37;1H\x1b[K\x1b")
zmodemRecvStartMark = []byte("rz waiting to receive.**\x18B0100")
zmodemSendStartMark = []byte("**\x18B00000000000000")
zmodemCancelMark = []byte("\x18\x18\x18\x18\x18")
zmodemEndMark = []byte("**\x18B0800000000022d")
zmodemStateSend = "send"
zmodemStateRecv = "recv"
charEnter = []byte("\r")
)
// Parse 解析用户输入输出, 拦截过滤用户输入输出
type Parser struct {
inputBuf *bytes.Buffer
cmdBuf *bytes.Buffer
outputBuf *bytes.Buffer
userInputChan chan []byte
serverInputChan chan []byte
inputInitial bool
inputPreState bool
inputState bool
multiInputState bool
zmodemState string
inVimState bool
once sync.Once
}
// parseInputState 切换用户输入状态
func (p *Parser) parseInputState(b []byte) {
if p.inVimState || p.zmodemState != "" {
return
}
p.inputPreState = p.inputState
if bytes.Contains(b, charEnter) {
p.inputState = false
//fmt.Printf("Command: %s\n", p.inputBuf.String())
p.inputBuf.Reset()
} else {
p.inputState = true
if !p.inputPreState {
//fmt.Printf("Output: %s\n", p.outputBuf.String())
p.outputBuf.Reset()
}
}
}
func (p *Parser) parseInputNewLine(b []byte) []byte {
b = bytes.Replace(b, []byte{'\r', '\r', '\n'}, []byte{'\r'}, -1)
b = bytes.Replace(b, []byte{'\r', '\n'}, []byte{'\r'}, -1)
b = bytes.Replace(b, []byte{'\n'}, []byte{'\r'}, -1)
return b
}
func (p *Parser) ParseUserInput(b []byte) []byte {
p.once.Do(func() {
p.inputInitial = true
})
nb := p.parseInputNewLine(b)
p.inputBuf.Write(nb)
fmt.Printf("User input: %b\n", b)
p.parseInputState(nb)
return b
}
func (p *Parser) parseVimState(b []byte) {
if p.zmodemState == "" && !p.inVimState && bytes.Contains(b, vimEnterMark) {
p.inVimState = true
logger.Debug("In vim state: true")
}
if p.zmodemState == "" && p.inVimState && bytes.Contains(b, vimExitMark) {
p.inVimState = false
logger.Debug("In vim state: false")
}
}
func (p *Parser) parseZmodemState(b []byte) {
if p.zmodemState == "" {
if bytes.Contains(b[:50], zmodemRecvStartMark) {
p.zmodemState = zmodemStateRecv
logger.Debug("Zmodem in recv state")
} else if bytes.Contains(b[:24], zmodemSendStartMark) {
p.zmodemState = zmodemStateSend
logger.Debug("Zmodem in send state")
}
} else {
if bytes.Contains(b[:24], zmodemEndMark) {
logger.Debug("Zmodem end")
p.zmodemState = ""
} else if bytes.Contains(b[:24], zmodemCancelMark) {
logger.Debug("Zmodem cancel")
p.zmodemState = ""
}
}
}
func (p *Parser) parseCommand(b []byte) {
if p.inputState {
p.cmdBuf.Write(b)
} else {
p.outputBuf.Write(b)
}
}
func (p *Parser) ParseServerOutput(b []byte) []byte {
p.parseVimState(b)
p.parseZmodemState(b)
p.parseCommand(b)
fmt.Printf("Server output: %s\n", b)
return b
}
package proxy package proxy
import ( import (
"bytes"
"context"
"sync"
"github.com/ibuler/ssh"
"cocogo/pkg/logger" "cocogo/pkg/logger"
"cocogo/pkg/sdk" "cocogo/pkg/sdk"
"cocogo/pkg/service" "cocogo/pkg/service"
"fmt"
"github.com/ibuler/ssh"
) )
type ProxyServer struct { type ProxyServer struct {
...@@ -45,72 +49,35 @@ func (p *ProxyServer) sendConnectingMsg() { ...@@ -45,72 +49,35 @@ func (p *ProxyServer) sendConnectingMsg() {
} }
func (p *ProxyServer) Proxy() { func (p *ProxyServer) Proxy(ctx context.Context) {
if !p.checkProtocol() { if !p.checkProtocol() {
return return
} }
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>Proxy")
ptyReq, winCh, ok := p.Session.Pty()
if !ok {
logger.Error("Pty not ok")
return
}
conn := SSHConnection{ conn := SSHConnection{
Host: "192.168.244.185", Host: "192.168.244.185",
Port: "22", Port: "22",
User: "root", User: "root",
Password: "redhat", Password: "redhat",
} }
ptyReq, _, ok := p.Session.Pty()
if !ok {
logger.Error("Pty not ok")
return
}
err := conn.Connect(ptyReq.Window.Height, ptyReq.Window.Width, ptyReq.Term) err := conn.Connect(ptyReq.Window.Height, ptyReq.Window.Width, ptyReq.Term)
if err != nil { if err != nil {
return return
} }
sw := Switch{
go func() { userSession: p.Session,
for { serverConn: &conn,
select { parser: &Parser{
case win, ok := <-winCh: once: sync.Once{},
if !ok { userInputChan: make(chan []byte, 5),
return inputBuf: new(bytes.Buffer),
} outputBuf: new(bytes.Buffer),
err := conn.SetWinSize(win.Height, win.Width) cmdBuf: new(bytes.Buffer),
if err != nil { },
logger.Error("windowChange err: ", win) }
return sw.Bridge(ctx)
}
logger.Info("windowChange: ", win)
}
}
}()
go func() {
buf := make([]byte, 1024)
writer := conn.Writer()
for {
fmt.Println("Start read from user session")
nr, err := p.Session.Read(buf)
fmt.Printf("get ddata from user: %s\n", buf)
if err != nil {
logger.Error("...............")
}
writer.Write(buf[:nr])
}
}()
go func() {
buf := make([]byte, 1024)
reader := conn.Reader()
fmt.Printf("Go func stdout pip")
for {
fmt.Printf("Start read from server\n")
nr, err := reader.Read(buf)
fmt.Printf("Read data from server: %s\n", buf)
if err != nil {
logger.Error("Read error")
}
p.Session.Write(buf[:nr])
}
}()
conn.Session.Wait()
} }
package proxy
import (
"context"
"fmt"
"sync"
"time"
"github.com/ibuler/ssh"
"cocogo/pkg/logger"
)
type Switch struct {
Id string `json:"id"`
User string `json:"user"`
Asset string `json:"asset"`
SystemUser string `json:"system_user"`
Org string `json:"org_id"`
LoginFrom string `json:"login_from"`
RemoteAddr string `json:"remote_addr"`
DateStart time.Time `json:"date_start"`
DateEnd time.Time `json:"date_end"`
DateActive time.Time `json:"date_last_active"`
Finished bool `json:"is_finished"`
Closed bool
parser *Parser
userSession ssh.Session
serverConn ServerConnection
closeChan chan struct{}
}
func (s *Switch) preBridge() {
}
func (s *Switch) postBridge() {
}
func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window, wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
logger.Debug("Watch window change routine end")
}()
for {
select {
case <-ctx.Done():
return
case win, ok := <-winCh:
if !ok {
break
}
err := s.serverConn.SetWinSize(win.Height, win.Width)
if err != nil {
logger.Error("Change server win size err: ", err)
break
}
logger.Debugf("Window server change: %d*%d", win.Height, win.Width)
}
}
}
func (s *Switch) readUserToServer(wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
logger.Debug("Read user to server end")
}()
buf := make([]byte, 1024)
writer := s.serverConn.Writer()
for {
nr, err := s.userSession.Read(buf)
if err != nil {
return
}
buf2 := s.parser.ParseUserInput(buf[:nr])
_, err = writer.Write(buf2)
if err != nil {
return
}
}
}
func (s *Switch) readServerToUser(wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
logger.Debug("Read server to user end")
}()
buf := make([]byte, 1024)
reader := s.serverConn.Reader()
for {
nr, err := reader.Read(buf)
if err != nil {
logger.Errorf("Read from server error: %s", err)
break
}
buf2 := s.parser.ParseServerOutput(buf[:nr])
_, err = s.userSession.Write(buf2)
if err != nil {
break
}
}
}
func (s *Switch) Bridge(ctx context.Context) (err error) {
_, winCh, _ := s.userSession.Pty()
wg := sync.WaitGroup{}
wg.Add(3)
go s.watchWindowChange(ctx, winCh, &wg)
go s.readUserToServer(&wg)
go s.readServerToUser(&wg)
wg.Wait()
fmt.Println("Bride end")
return
}
...@@ -329,7 +329,7 @@ func (i *InteractiveHandler) searchNodeAssets(num int) (assets []sdk.Asset) { ...@@ -329,7 +329,7 @@ func (i *InteractiveHandler) searchNodeAssets(num int) (assets []sdk.Asset) {
func (i *InteractiveHandler) Proxy(ctx context.Context) { func (i *InteractiveHandler) Proxy(ctx context.Context) {
p := proxy.ProxyServer{Session: i.sess} p := proxy.ProxyServer{Session: i.sess}
p.Proxy() p.Proxy(ctx)
} }
// /* // /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment