Commit 2f4e004c authored by ibuler's avatar ibuler

[Update] 修改proxy

parent 5533c490
package parser
type SpecialRuler interface {
// 匹配规则
MatchRule([]byte) bool
// 进入状态
EnterStatus() bool
// 退出状态
ExitStatus() bool
}
package parser
import (
"bytes"
"sync"
)
func NewTerminalParser() *TerminalParser {
return &TerminalParser{
Once: sync.Once{},
Started: false,
InputStatus: false,
OutputStatus: false,
CmdInputBuf: new(bytes.Buffer),
CmdOutputBuf: new(bytes.Buffer),
}
}
type TerminalParser struct {
Once sync.Once
Started bool
InputStatus bool
OutputStatus bool
CmdInputBuf *bytes.Buffer // node对用户输入的回写数据
CmdOutputBuf *bytes.Buffer // node对用户按下enter按键之后,返回的数据
}
func (t *TerminalParser) Reset() {
t.CmdInputBuf.Reset()
t.CmdOutputBuf.Reset()
}
func (t *TerminalParser) ParseCommandInput() string {
return t.CmdInputBuf.String()
}
func (t *TerminalParser) ParseCommandResult() string {
return t.CmdOutputBuf.String()
}
func (t *TerminalParser) IsEnterKey(b []byte) bool {
return len(b) == 1 && b[0] == 13
}
package parser
import (
"regexp"
)
type RuleFilter interface {
// 判断是否是匹配当前规则
Match(string) bool
// 是否阻断命令
BlockCommand() bool
}
type Rule struct {
priority int
ruleType string
contents []string
action bool
}
func (r *Rule) Match(s string) bool {
switch r.ruleType {
case "command":
for _, content := range r.contents {
if content == s {
return true
}
}
return false
default:
for _, content := range r.contents {
if matched, _ := regexp.MatchString(content, s); matched {
return true
}
}
return false
}
}
func (r *Rule) BlockCommand() bool {
return r.action
}
......@@ -10,12 +10,10 @@ import (
)
type ServerConnection interface {
Writer() io.WriteCloser
Reader() io.Reader
io.ReadWriteCloser
Protocol() string
Connect(h, w int, term string) error
SetWinSize(w, h int) error
Close()
}
type SSHConnection struct {
......@@ -41,16 +39,16 @@ func (sc *SSHConnection) Protocol() string {
}
func (sc *SSHConnection) Config() (config *gossh.ClientConfig, err error) {
auths := make([]gossh.AuthMethod, 0)
authMethods := make([]gossh.AuthMethod, 0)
if sc.Password != "" {
auths = append(auths, gossh.Password(sc.Password))
authMethods = append(authMethods, gossh.Password(sc.Password))
}
if sc.PrivateKeyPath != "" {
if pubkey, err := GetPubKeyFromFile(sc.PrivateKeyPath); err != nil {
err = fmt.Errorf("parse private key from file error: %sc", err)
return config, err
} else {
auths = append(auths, gossh.PublicKeys(pubkey))
authMethods = append(authMethods, gossh.PublicKeys(pubkey))
}
}
if sc.PrivateKey != "" {
......@@ -58,12 +56,12 @@ func (sc *SSHConnection) Config() (config *gossh.ClientConfig, err error) {
err = fmt.Errorf("parse private key error: %sc", err)
return config, err
} else {
auths = append(auths, gossh.PublicKeys(signer))
authMethods = append(authMethods, gossh.PublicKeys(signer))
}
}
config = &gossh.ClientConfig{
User: sc.User,
Auth: auths,
Auth: authMethods,
HostKeyCallback: gossh.InsecureIgnoreHostKey(),
Timeout: sc.Timeout,
}
......@@ -140,26 +138,27 @@ func (sc *SSHConnection) Connect(h, w int, term string) (err error) {
return nil
}
func (sc *SSHConnection) Reader() (reader io.Reader) {
return sc.stdout
func (sc *SSHConnection) SetWinSize(h, w int) error {
return sc.Session.WindowChange(h, w)
}
func (sc *SSHConnection) Writer() (writer io.WriteCloser) {
return sc.stdin
func (sc *SSHConnection) Read(p []byte) (n int, err error) {
return sc.stdout.Read(p)
}
func (sc *SSHConnection) SetWinSize(h, w int) error {
return sc.Session.WindowChange(h, w)
func (sc *SSHConnection) Write(p []byte) (n int, err error) {
return sc.stdin.Write(p)
}
func (sc *SSHConnection) Close() {
func (sc *SSHConnection) Close() (err error) {
if sc.closed {
return
}
_ = sc.Session.Close()
_ = sc.client.Close()
err = sc.Session.Close()
err = sc.client.Close()
if sc.proxyConn != nil {
_ = sc.proxyConn.Close()
err = sc.proxyConn.Close()
}
sc.closed = true
return
}
......@@ -3,17 +3,12 @@ package proxy
import (
"bytes"
"fmt"
"os"
"strings"
"sync"
"cocogo/pkg/logger"
"cocogo/pkg/model"
"cocogo/pkg/utils"
)
type ParseRule func([]byte) bool
var (
// Todo: Vim过滤依然存在问题
vimEnterMark = []byte("\x1b[?25l\x1b[37;1H\x1b[1m")
......@@ -29,43 +24,20 @@ var (
charEnter = []byte("\r")
)
type CmdParser struct {
term *utils.Terminal
buf *bytes.Buffer
}
func (cp *CmdParser) Reset() {
cp.buf.Reset()
}
func (cp *CmdParser) Initial() {
cp.buf = new(bytes.Buffer)
cp.term = utils.NewTerminal(cp.buf, "")
cp.term.SetEcho(false)
}
func (cp *CmdParser) Parse(b []byte) string {
cp.buf.Write(b)
cp.buf.WriteString("\r")
lines, _ := cp.term.ReadLines()
return strings.TrimSpace(strings.Join(lines, "\r\n"))
}
// Parse 解析用户输入输出, 拦截过滤用户输入输出
type Parser struct {
inputBuf *bytes.Buffer
cmdBuf *bytes.Buffer
outputBuf *bytes.Buffer
filterRules []model.SystemUserFilterRule
cmdFilterRules []model.SystemUserFilterRule
inputInitial bool
inputPreState bool
inputState bool
multiInputState bool
zmodemState string
inVimState bool
once sync.Once
inputInitial bool
inputPreState bool
inputState bool
zmodemState string
inVimState bool
once sync.Once
command string
output string
......@@ -106,19 +78,16 @@ func (p *Parser) parseInputState(b []byte) {
}
}
var f, _ = os.Create("/tmp/cmd.text")
func (p *Parser) parseCmdInput() {
parser := CmdParser{}
parser.Initial()
data := p.cmdBuf.Bytes()
fmt.Printf("原始输入: %b\n", data)
line := parser.Parse(data)
data2 := fmt.Sprintf("[%d] 命令: %s\n", p.counter, line)
fmt.Printf(data2)
p.cmdBuf.Reset()
p.inputBuf.Reset()
f.WriteString(data2)
p.counter += 1
}
func (p *Parser) parseCmdOutput() {
......@@ -128,8 +97,6 @@ func (p *Parser) parseCmdOutput() {
_ = fmt.Sprintf("[%d] 结果: %s\n", p.counter, line)
fmt.Printf(data2)
p.outputBuf.Reset()
f.WriteString(data2)
p.counter += 1
}
func (p *Parser) parseInputNewLine(b []byte) []byte {
......@@ -161,8 +128,11 @@ func (p *Parser) parseVimState(b []byte) {
}
func (p *Parser) parseZmodemState(b []byte) {
if len(b) < 25 {
return
}
if p.zmodemState == "" {
if bytes.Contains(b[:50], zmodemRecvStartMark) {
if len(b) > 50 && bytes.Contains(b[:50], zmodemRecvStartMark) {
p.zmodemState = zmodemStateRecv
logger.Debug("Zmodem in recv state")
} else if bytes.Contains(b[:24], zmodemSendStartMark) {
......@@ -197,3 +167,7 @@ func (p *Parser) ParseServerOutput(b []byte) []byte {
p.parseCommand(b)
return b
}
func (p *Parser) SetCMDFilterRules(rules []model.SystemUserFilterRule) {
p.cmdFilterRules = rules
}
package proxy
import (
"bytes"
"strings"
"cocogo/pkg/utils"
)
type CmdParser struct {
term *utils.Terminal
buf *bytes.Buffer
}
func (cp *CmdParser) Reset() {
cp.buf.Reset()
}
func (cp *CmdParser) Initial() {
cp.buf = new(bytes.Buffer)
cp.term = utils.NewTerminal(cp.buf, "")
cp.term.SetEcho(false)
}
func (cp *CmdParser) Parse(b []byte) string {
cp.buf.Write(b)
cp.buf.WriteString("\r")
lines, _ := cp.term.ReadLines()
return strings.TrimSpace(strings.Join(lines, "\r\n"))
}
package proxy
import (
"context"
"github.com/ibuler/ssh"
"cocogo/pkg/logger"
......@@ -46,15 +45,15 @@ func (p *ProxyServer) sendConnectingMsg() {
}
func (p *ProxyServer) Proxy(ctx context.Context) {
func (p *ProxyServer) Proxy() {
if !p.checkProtocol() {
return
}
conn := SSHConnection{
Host: "127.0.0.1",
Port: "1337",
Host: "192.168.244.185",
Port: "22",
User: "root",
Password: "MyRootPW123",
Password: "redhat",
}
ptyReq, _, ok := p.Session.Pty()
if !ok {
......@@ -65,18 +64,12 @@ func (p *ProxyServer) Proxy(ctx context.Context) {
if err != nil {
return
}
rules, err := service.GetSystemUserFilterRules("")
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
parser := &Parser{
filterRules: rules,
}
parser.Initial()
sw := Switch{
userSession: p.Session,
serverConn: &conn,
parser: parser,
}
sw.Bridge(ctx)
_ = sw.Bridge()
_ = conn.Close()
}
package proxy
import (
"cocogo/pkg/service"
"context"
"fmt"
"sync"
"time"
"github.com/ibuler/ssh"
"github.com/satori/go.uuid"
"time"
"cocogo/pkg/logger"
)
type Switch struct {
func NewSwitch(userSess ssh.Session, serverConn ServerConnection) (sw *Switch) {
rules, err := service.GetSystemUserFilterRules("")
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
parser := &Parser{
cmdFilterRules: rules,
}
parser.Initial()
sw = &Switch{userSession: userSess, serverConn: serverConn, parser: parser}
return sw
}
type SwitchInfo struct {
Id string `json:"id"`
User string `json:"user"`
Asset string `json:"asset"`
......@@ -24,11 +36,20 @@ type Switch struct {
DateActive time.Time `json:"date_last_active"`
Finished bool `json:"is_finished"`
Closed bool
}
type Switch struct {
Info *SwitchInfo
parser *Parser
userSession ssh.Session
serverConn ServerConnection
closeChan chan struct{}
userTran Transport
serverTran Transport
cancelFunc context.CancelFunc
}
func (s *Switch) Initial() {
s.Id = uuid.NewV4().String()
}
func (s *Switch) preBridge() {
......@@ -39,8 +60,7 @@ func (s *Switch) postBridge() {
}
func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window, wg *sync.WaitGroup) {
defer wg.Done()
func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window) {
defer func() {
logger.Debug("Watch window change routine end")
}()
......@@ -62,56 +82,61 @@ func (s *Switch) watchWindowChange(ctx context.Context, winCh <-chan ssh.Window,
}
}
func (s *Switch) readUserToServer(wg *sync.WaitGroup) {
defer wg.Done()
func (s *Switch) readUserToServer(ctx context.Context) {
defer func() {
logger.Debug("Read user to server end")
}()
buf := make([]byte, 1024)
writer := s.serverConn.Writer()
for {
nr, err := s.userSession.Read(buf)
if err != nil {
return
}
buf2 := s.parser.ParseUserInput(buf[:nr])
_, err = writer.Write(buf2)
if err != nil {
select {
case <-ctx.Done():
_ = s.userTran.Close()
return
case p, ok := <-s.userTran.Chan():
if !ok {
s.cancelFunc()
}
buf2 := s.parser.ParseUserInput(p)
logger.Debug("Send to server: ", string(buf2))
_, err := s.serverTran.Write(buf2)
if err != nil {
return
}
}
}
}
func (s *Switch) readServerToUser(wg *sync.WaitGroup) {
defer wg.Done()
func (s *Switch) readServerToUser(ctx context.Context) {
defer func() {
logger.Debug("Read server to user end")
}()
buf := make([]byte, 1024)
reader := s.serverConn.Reader()
for {
nr, err := reader.Read(buf)
if err != nil {
logger.Errorf("Read from server error: %s", err)
break
}
buf2 := s.parser.ParseServerOutput(buf[:nr])
_, err = s.userSession.Write(buf2)
if err != nil {
break
select {
case <-ctx.Done():
_ = s.serverTran.Close()
return
case p, ok := <-s.serverTran.Chan():
if !ok {
s.cancelFunc()
}
buf2 := s.parser.ParseServerOutput(p)
_, err := s.userTran.Write(buf2)
if err != nil {
return
}
}
}
}
func (s *Switch) Bridge(ctx context.Context) (err error) {
func (s *Switch) Bridge() (err error) {
_, winCh, _ := s.userSession.Pty()
wg := sync.WaitGroup{}
wg.Add(3)
go s.watchWindowChange(ctx, winCh, &wg)
go s.readUserToServer(&wg)
go s.readServerToUser(&wg)
wg.Wait()
fmt.Println("Bride end")
ctx, cancel := context.WithCancel(context.Background())
s.cancelFunc = cancel
s.userTran = NewDirectTransport("", s.userSession)
s.serverTran = NewDirectTransport("", s.serverConn)
go s.watchWindowChange(ctx, winCh)
go s.readServerToUser(ctx)
s.readUserToServer(ctx)
logger.Debug("Switch bridge end")
return
}
package proxy
import (
"cocogo/pkg/logger"
"io"
)
type Transport interface {
io.WriteCloser
Name() string
Chan() <-chan []byte
}
type DirectTransport struct {
name string
readWriter io.ReadWriter
ch chan []byte
closed bool
}
func (dt *DirectTransport) Name() string {
return dt.name
}
func (dt *DirectTransport) Write(p []byte) (n int, err error) {
return dt.readWriter.Write(p)
}
func (dt *DirectTransport) Close() error {
logger.Debug("Close transport")
if dt.closed {
return nil
}
dt.closed = true
close(dt.ch)
return nil
}
func (dt *DirectTransport) Chan() <-chan []byte {
return dt.ch
}
func (dt *DirectTransport) Keep() {
buf := make([]byte, 1024)
for {
n, err := dt.readWriter.Read(buf)
if err != nil {
_ = dt.Close()
break
}
if !dt.closed {
dt.ch <- buf[:n]
} else {
logger.Debug("Transport ")
break
}
}
return
}
func NewDirectTransport(name string, readWriter io.ReadWriter) (tr Transport) {
ch := make(chan []byte, 1024)
dtr := DirectTransport{readWriter: readWriter, ch: ch}
go dtr.Keep()
return &dtr
}
package proxybak
import (
"context"
"fmt"
"io"
"time"
"github.com/ibuler/ssh"
gossh "golang.org/x/crypto/ssh"
"cocogo/pkg/logger"
"cocogo/pkg/parser"
"cocogo/pkg/record"
)
const maxBufferSize = 1024 * 4
type ServerAuth struct {
SessionID string
IP string
Port int
UserName string
Password string
PublicKey gossh.Signer
}
type Conn interface {
ReceiveRequest(context.Context, <-chan []byte, chan<- []byte)
SendResponse(context.Context, chan<- []byte)
}
func CreateNodeSession(authInfo ServerAuth) (c *gossh.Client, s *gossh.Session, err error) {
config := &gossh.ClientConfig{
User: authInfo.UserName,
Auth: []gossh.AuthMethod{
gossh.Password(authInfo.Password),
gossh.PublicKeys(authInfo.PublicKey),
},
HostKeyCallback: gossh.InsecureIgnoreHostKey(),
}
client, err := gossh.Dial("tcp", fmt.Sprintf("%s:%d", authInfo.IP, authInfo.Port), config)
if err != nil {
logger.Error(err)
return c, s, err
}
s, err = client.NewSession()
if err != nil {
logger.Error(err)
return c, s, err
}
return client, s, nil
}
func NewNodeConn(ctx context.Context, authInfo ServerAuth, ptyReq ssh.Pty, winCh <-chan ssh.Window) (*NodeConn, error) {
c, s, err := CreateNodeSession(authInfo)
if err != nil {
return nil, err
}
err = s.RequestPty(ptyReq.Term, ptyReq.Window.Height, ptyReq.Window.Width, gossh.TerminalModes{})
if err != nil {
return nil, err
}
nodeStdin, err := s.StdinPipe()
if err != nil {
return nil, err
}
nodeStdout, err := s.StdoutPipe()
if err != nil {
return nil, err
}
err = s.Shell()
if err != nil {
return nil, err
}
subCtx, cancelFunc := context.WithCancel(ctx)
replyRecord := record.NewReplyRecord(authInfo.SessionID)
replyRecord.StartRecord()
go replyRecord.EndRecord(subCtx)
nConn := &NodeConn{
SessionID: authInfo.SessionID,
client: c,
conn: s,
ctx: subCtx,
ctxCancelFunc: cancelFunc,
stdin: nodeStdin,
stdout: nodeStdout,
tParser: parser.NewTerminalParser(),
replyRecord: replyRecord,
StartTime: time.Now().UTC(),
}
go nConn.windowChangeHandler(winCh)
return nConn, nil
}
// coco连接远程Node的连接
type NodeConn struct {
SessionID string
client *gossh.Client
conn *gossh.Session
stdin io.Writer
stdout io.Reader
tParser *parser.TerminalParser
currentCommandInput string
currentCommandResult string
rulerFilters []parser.RuleFilter
specialCommands []parser.SpecialRuler
inSpecialStatus bool
ctx context.Context
ctxCancelFunc context.CancelFunc
replyRecord *record.Reply
cmdRecord *record.Command
StartTime time.Time
}
func (n *NodeConn) Wait() error {
return n.conn.Wait()
}
func (n *NodeConn) FilterSpecialCommand(b []byte) {
for _, specialCommand := range n.specialCommands {
if matched := specialCommand.MatchRule(b); matched {
switch {
case specialCommand.EnterStatus():
n.inSpecialStatus = true
case specialCommand.ExitStatus():
n.inSpecialStatus = false
}
}
}
}
func (n *NodeConn) FilterWhiteBlackRule(cmd string) bool {
for _, rule := range n.rulerFilters {
if rule.Match(cmd) {
return rule.BlockCommand()
}
}
return false
}
func (n *NodeConn) windowChangeHandler(winCH <-chan ssh.Window) {
for {
select {
case <-n.ctx.Done():
logger.Info("windowChangeHandler done")
return
case win, ok := <-winCH:
if !ok {
return
}
err := n.conn.WindowChange(win.Height, win.Width)
if err != nil {
logger.Error("windowChange err: ", win)
return
}
logger.Info("windowChange: ", win)
}
}
}
func (n *NodeConn) Close() {
select {
case <-n.ctx.Done():
return
default:
_ = n.conn.Close()
_ = n.client.Close()
n.ctxCancelFunc()
logger.Info("Close conn")
}
}
func (n *NodeConn) SendResponse(ctx context.Context, outChan chan<- []byte) {
buf := make([]byte, maxBufferSize)
defer close(outChan)
for {
nr, err := n.stdout.Read(buf)
if err != nil {
logger.Error("read conn err:", err)
return
}
if n.tParser.Started && nr > 0 {
n.FilterSpecialCommand(buf[:nr])
switch {
case n.inSpecialStatus:
// 进入特殊命令状态,
case n.tParser.InputStatus:
n.tParser.CmdInputBuf.Write(buf[:nr])
case n.tParser.OutputStatus:
n.tParser.CmdOutputBuf.Write(buf[:nr])
default:
}
}
select {
case <-ctx.Done():
logger.Info("SendResponse finish by context done")
return
default:
copyBuf := make([]byte, len(buf[:nr]))
copy(copyBuf, buf[:nr])
outChan <- copyBuf
n.replyRecord.Record(buf[:nr])
}
}
}
func (n *NodeConn) ReceiveRequest(ctx context.Context, inChan <-chan []byte, outChan chan<- []byte) {
defer n.Close()
for {
select {
case <-ctx.Done():
logger.Error("ReceiveRequest finish by context done")
return
case buf, ok := <-inChan:
if !ok {
logger.Error("ReceiveRequest finish by inChan close")
return
}
n.tParser.Once.Do(
func() {
n.tParser.Started = true
})
switch {
case n.inSpecialStatus:
// 特殊的命令 vim 或者 rz
case n.tParser.IsEnterKey(buf):
n.currentCommandInput = n.tParser.ParseCommandInput()
if n.FilterWhiteBlackRule(n.currentCommandInput) {
msg := fmt.Sprintf("\r\n cmd '%s' is forbidden \r\n", n.currentCommandInput)
outChan <- []byte(msg)
n.replyRecord.Record([]byte(msg))
ctrU := []byte{21, 13} // 清除行并换行
_, err := n.stdin.Write(ctrU)
if err != nil {
logger.Error(err)
}
n.tParser.InputStatus = false
n.tParser.OutputStatus = false
continue
}
n.tParser.InputStatus = false
n.tParser.OutputStatus = true
default:
// 1. 是否是一个命令的完整周期 是则解析命令,记录结果 并重置
// 2. 重置用户输入状态
if len(n.tParser.CmdOutputBuf.Bytes()) > 0 && n.currentCommandInput != "" {
n.currentCommandResult = n.tParser.ParseCommandResult()
n.tParser.Reset()
n.currentCommandInput = ""
n.currentCommandResult = ""
}
n.tParser.InputStatus = true
}
_, err := n.stdin.Write(buf)
if err != nil {
logger.Error("write conn err:", err)
return
}
}
}
}
package proxybak
import (
"cocogo/pkg/parser"
"cocogo/pkg/record"
"context"
"fmt"
"io"
"time"
"github.com/sirupsen/logrus"
"github.com/ibuler/ssh"
gossh "golang.org/x/crypto/ssh"
)
var log = logrus.New()
const maxBufferSize = 1024 * 4
type ServerAuth struct {
SessionID string
IP string
Port int
UserName string
Password string
PublicKey gossh.Signer
}
type Conn interface {
ReceiveRequest(context.Context, <-chan []byte, chan<- []byte)
SendResponse(context.Context, chan<- []byte)
}
func CreateNodeSession(authInfo ServerAuth) (c *gossh.Client, s *gossh.Session, err error) {
config := &gossh.ClientConfig{
User: authInfo.UserName,
Auth: []gossh.AuthMethod{
gossh.Password(authInfo.Password),
gossh.PublicKeys(authInfo.PublicKey),
},
HostKeyCallback: gossh.InsecureIgnoreHostKey(),
}
client, err := gossh.Dial("tcp", fmt.Sprintf("%s:%d", authInfo.IP, authInfo.Port), config)
if err != nil {
log.Error(err)
return c, s, err
}
s, err = client.NewSession()
if err != nil {
log.Error(err)
return c, s, err
}
return client, s, nil
}
func NewNodeConn(ctx context.Context, authInfo ServerAuth, ptyReq ssh.Pty, winCh <-chan ssh.Window) (*NodeConn, error) {
c, s, err := CreateNodeSession(authInfo)
if err != nil {
return nil, err
}
err = s.RequestPty(ptyReq.Term, ptyReq.Window.Height, ptyReq.Window.Width, gossh.TerminalModes{})
if err != nil {
return nil, err
}
nodeStdin, err := s.StdinPipe()
if err != nil {
return nil, err
}
nodeStdout, err := s.StdoutPipe()
if err != nil {
return nil, err
}
err = s.Shell()
if err != nil {
return nil, err
}
subCtx, cancelFunc := context.WithCancel(ctx)
replyRecord := record.NewReplyRecord(authInfo.SessionID)
replyRecord.StartRecord()
//go replyRecord.EndRecord(subCtx)
nConn := &NodeConn{
SessionID: authInfo.SessionID,
client: c,
conn: s,
ctx: subCtx,
ctxCancelFunc: cancelFunc,
stdin: nodeStdin,
stdout: nodeStdout,
tParser: parser.NewTerminalParser(),
replyRecord: replyRecord,
StartTime: time.Now().UTC(),
}
go nConn.windowChangeHandler(winCh)
return nConn, nil
}
// coco连接远程Node的连接
type NodeConn struct {
SessionID string
client *gossh.Client
conn *gossh.Session
stdin io.Writer
stdout io.Reader
tParser *parser.TerminalParser
currentCommandInput string
currentCommandResult string
rulerFilters []parser.RuleFilter
specialCommands []parser.SpecialRuler
inSpecialStatus bool
ctx context.Context
ctxCancelFunc context.CancelFunc
replyRecord *record.Reply
cmdRecord *record.Command
StartTime time.Time
}
func (n *NodeConn) Wait() error {
return n.conn.Wait()
}
func (n *NodeConn) FilterSpecialCommand(b []byte) {
for _, specialCommand := range n.specialCommands {
if matched := specialCommand.MatchRule(b); matched {
switch {
case specialCommand.EnterStatus():
n.inSpecialStatus = true
case specialCommand.ExitStatus():
n.inSpecialStatus = false
}
}
}
}
func (n *NodeConn) FilterWhiteBlackRule(cmd string) bool {
for _, rule := range n.rulerFilters {
if rule.Match(cmd) {
return rule.BlockCommand()
}
}
return false
}
func (n *NodeConn) windowChangeHandler(winCH <-chan ssh.Window) {
for {
select {
case <-n.ctx.Done():
log.Info("windowChangeHandler done")
return
case win, ok := <-winCH:
if !ok {
return
}
err := n.conn.WindowChange(win.Height, win.Width)
if err != nil {
log.Error("windowChange err: ", win)
return
}
log.Info("windowChange: ", win)
}
}
}
func (n *NodeConn) Close() {
select {
case <-n.ctx.Done():
return
default:
_ = n.conn.Close()
_ = n.client.Close()
n.ctxCancelFunc()
log.Info("Close conn")
}
}
func (n *NodeConn) SendResponse(ctx context.Context, outChan chan<- []byte) {
buf := make([]byte, maxBufferSize)
defer close(outChan)
for {
nr, err := n.stdout.Read(buf)
if err != nil {
log.Error("read conn err:", err)
return
}
if n.tParser.Started && nr > 0 {
n.FilterSpecialCommand(buf[:nr])
switch {
case n.inSpecialStatus:
// 进入特殊命令状态,
case n.tParser.InputStatus:
n.tParser.CmdInputBuf.Write(buf[:nr])
case n.tParser.OutputStatus:
n.tParser.CmdOutputBuf.Write(buf[:nr])
default:
}
}
select {
case <-ctx.Done():
log.Info("SendResponse finish by context done")
return
default:
copyBuf := make([]byte, len(buf[:nr]))
copy(copyBuf, buf[:nr])
outChan <- copyBuf
n.replyRecord.Record(buf[:nr])
}
}
}
func (n *NodeConn) ReceiveRequest(ctx context.Context, inChan <-chan []byte, outChan chan<- []byte) {
defer n.Close()
for {
select {
case <-ctx.Done():
log.Error("ReceiveRequest finish by context done")
return
case buf, ok := <-inChan:
if !ok {
log.Error("ReceiveRequest finish by inChan close")
return
}
n.tParser.Once.Do(
func() {
n.tParser.Started = true
})
switch {
case n.inSpecialStatus:
// 特殊的命令 vim 或者 rz
case n.tParser.IsEnterKey(buf):
n.currentCommandInput = n.tParser.ParseCommandInput()
if n.FilterWhiteBlackRule(n.currentCommandInput) {
msg := fmt.Sprintf("\r\n cmd '%s' is forbidden \r\n", n.currentCommandInput)
outChan <- []byte(msg)
n.replyRecord.Record([]byte(msg))
ctrU := []byte{21, 13} // 清除行并换行
_, err := n.stdin.Write(ctrU)
if err != nil {
log.Error(err)
}
n.tParser.InputStatus = false
n.tParser.OutputStatus = false
continue
}
n.tParser.InputStatus = false
n.tParser.OutputStatus = true
default:
// 1. 是否是一个命令的完整周期 是则解析命令,记录结果 并重置
// 2. 重置用户输入状态
if len(n.tParser.CmdOutputBuf.Bytes()) > 0 && n.currentCommandInput != "" {
n.currentCommandResult = n.tParser.ParseCommandResult()
n.tParser.Reset()
n.currentCommandInput = ""
n.currentCommandResult = ""
}
n.tParser.InputStatus = true
}
_, err := n.stdin.Write(buf)
if err != nil {
log.Error("write conn err:", err)
return
}
}
}
}
package proxybak
import (
"context"
"sync"
"github.com/ibuler/ssh"
"cocogo/pkg/logger"
"cocogo/pkg/userhome"
)
type UserSessionEndpoint struct {
UserSessions []ssh.Session
Transport Transport
}
type ServerSessionEndpoint struct {
ServerSession []ssh.Session
Transport Transport
}
type Switcher struct {
User sdk.User
Asset sdk.Asset
PrimarySession *ssh.Session
ShareSessions []*ssh.Session
WatcherSessions []*ssh.Session
ServerConn ServerSessionEndpoint
}
var Manager = &manager{
container: new(sync.Map),
}
type manager struct {
container *sync.Map
}
func (m *manager) add(uHome userhome.SessionHome) {
m.container.Store(uHome.SessionID(), uHome)
}
func (m *manager) delete(roomID string) {
m.container.Delete(roomID)
}
func (m *manager) search(roomID string) (userhome.SessionHome, bool) {
if uHome, ok := m.container.Load(roomID); ok {
return uHome.(userhome.SessionHome), ok
}
return nil, false
}
func (m *manager) JoinShareRoom(roomID string, uConn userhome.Conn) {
if userHome, ok := m.search(roomID); ok {
userHome.AddConnection(uConn)
}
}
func (m *manager) ExitShareRoom(roomID string, uConn userhome.Conn) {
if userHome, ok := m.search(roomID); ok {
userHome.RemoveConnection(uConn)
}
}
func (m *manager) Switch(ctx context.Context, uHome userhome.SessionHome, agent transport.Agent) error {
m.add(uHome)
defer m.delete(uHome.SessionID())
subCtx, cancelFunc := context.WithCancel(ctx)
userSendRequestStream := uHome.SendRequestChannel(subCtx)
userReceiveStream := uHome.ReceiveResponseChannel(subCtx)
nodeRequestChan := agent.ReceiveRequestChannel(subCtx)
nodeSendResponseStream := agent.SendResponseChannel(subCtx)
for userSendRequestStream != nil || nodeSendResponseStream != nil {
select {
case buf1, ok := <-userSendRequestStream:
if !ok {
logger.Warn("userSendRequestStream close")
userSendRequestStream = nil
close(nodeRequestChan)
continue
}
nodeRequestChan <- buf1
case buf2, ok := <-nodeSendResponseStream:
if !ok {
logger.Warn("nodeSendResponseStream close")
nodeSendResponseStream = nil
close(userReceiveStream)
cancelFunc()
continue
}
userReceiveStream <- buf2
case <-ctx.Done():
logger.Info("proxy end by context done")
cancelFunc()
return nil
}
}
logger.Info("proxy end")
return nil
}
package proxybak
import (
"context"
uuid "github.com/satori/go.uuid"
)
type Transport interface {
ReceiveChannel(ctx context.Context) chan<- []byte
SendChannel(ctx context.Context) <-chan []byte
}
type Agent interface {
ReceiveRequestChannel(ctx context.Context) chan<- []byte
SendResponseChannel(ctx context.Context) <-chan []byte
}
func NewMemoryAgent(nConn Conn) *memoryAgent {
m := &memoryAgent{
conn: nConn,
inChan: make(chan []byte),
outChan: make(chan []byte),
}
return m
}
type memoryAgent struct {
uuid uuid.UUID
conn Conn
inChan chan []byte
outChan chan []byte
}
func (m *memoryAgent) SendResponseChannel(ctx context.Context) <-chan []byte {
go m.conn.SendResponse(ctx, m.outChan)
return m.outChan
}
func (m *memoryAgent) ReceiveRequestChannel(ctx context.Context) chan<- []byte {
go m.conn.ReceiveRequest(ctx, m.inChan, m.outChan)
return m.inChan
}
package proxybak
import (
"context"
"sync"
)
type SessionHome interface {
SessionID() string
AddConnection(c Conn)
RemoveConnection(c Conn)
SendRequestChannel(ctx context.Context) <-chan []byte
ReceiveResponseChannel(ctx context.Context) chan<- []byte
}
func NewUserSessionHome(con Conn) *userSessionHome {
uHome := &userSessionHome{
readStream: make(chan []byte),
mainConn: con,
connMap: new(sync.Map),
cancelMap: new(sync.Map),
}
uHome.connMap.Store(con.SessionID(), con)
return uHome
}
type userSessionHome struct {
readStream chan []byte
mainConn Conn
connMap *sync.Map
cancelMap *sync.Map
}
func (r *userSessionHome) SessionID() string {
return r.mainConn.SessionID()
}
func (r *userSessionHome) AddConnection(c Conn) {
key := c.SessionID()
if _, ok := r.connMap.Load(key); !ok {
log.Info("add connection ", c)
r.connMap.Store(key, c)
} else {
log.Info("already add connection")
return
}
log.Info("add conn session room: ", r.SessionID())
ctx, cancelFunc := context.WithCancel(r.mainConn.Context())
r.cancelMap.Store(key, cancelFunc)
defer r.RemoveConnection(c)
buf := make([]byte, maxBufferSize)
for {
nr, err := c.Read(buf)
if err != nil {
log.Error("conn read err")
return
}
select {
case <-ctx.Done():
log.Info(" user conn cctx done")
return
default:
copyBuf := make([]byte, nr)
copy(copyBuf, buf[:nr])
r.readStream <- copyBuf
}
}
}
func (r *userSessionHome) RemoveConnection(c Conn) {
key := c.SessionID()
if cancelFunc, ok := r.cancelMap.Load(key); ok {
cancelFunc.(context.CancelFunc)()
}
r.connMap.Delete(key)
}
func (r *userSessionHome) SendRequestChannel(ctx context.Context) <-chan []byte {
go func() {
buf := make([]byte, 1024)
// 从发起的session这里关闭 接受的通道
defer close(r.readStream)
for {
nr, e := r.mainConn.Read(buf)
if e != nil {
log.Error("main Conn read err")
break
}
select {
case <-ctx.Done():
return
default:
var respCopy []byte
respCopy = append(respCopy, buf[:nr]...)
r.readStream <- respCopy
}
}
}()
return r.readStream
}
func (r *userSessionHome) ReceiveResponseChannel(ctx context.Context) chan<- []byte {
writeStream := make(chan []byte)
go func() {
defer func() {
r.cancelMap.Range(func(key, cancelFunc interface{}) bool {
cancelFunc.(context.CancelFunc)()
return true
})
}()
for {
select {
case <-ctx.Done():
return
case buf, ok := <-writeStream:
if !ok {
return
}
r.connMap.Range(func(key, connItem interface{}) bool {
nw, err := connItem.(Conn).Write(buf)
if err != nil || nw != len(buf) {
log.Error("Write Conn err", connItem)
r.RemoveConnection(connItem.(Conn))
}
return true
})
}
}
}()
return writeStream
}
package proxybak
type Slot interface {
Chan() chan<- []byte
Send([]byte)
}
package storage
package record
type Storage interface {
Upload(gZipFile, target string)
......
package storage
package record
//var client = service.Client
......
......@@ -7,16 +7,17 @@ import (
"cocogo/pkg/model"
)
func Authenticate(username, password, publicKey, remoteAddr, loginType string) (user model.User) {
func Authenticate(username, password, publicKey, remoteAddr, loginType string) (user *model.User) {
data := map[string]string{
"username": username,
"password": password,
"public_key": publicKey,
"remote_addr": remoteAddr,
"login_type": loginType}
"login_type": loginType,
}
var resp struct {
Token string `json:"token"`
User model.User `json:"user"`
Token string `json:"token"`
User *model.User `json:"user"`
}
Url := client.ParseUrlQuery(UserAuthURL, nil)
err := client.Post(Url, data, &resp)
......@@ -26,7 +27,7 @@ func Authenticate(username, password, publicKey, remoteAddr, loginType string) (
return resp.User
}
func GetUserProfile(userId string) (user model.User) {
func GetUserProfile(userId string) (user *model.User) {
Url := authClient.ParseUrlQuery(fmt.Sprintf(UserUserURL, userId), nil)
err := authClient.Get(Url, &user)
if err != nil {
......@@ -35,7 +36,7 @@ func GetUserProfile(userId string) (user model.User) {
return
}
func CheckUserCookie(sessionId, csrfToken string) (user model.User) {
func CheckUserCookie(sessionId, csrfToken string) (user *model.User) {
client.SetCookie("csrftoken", csrfToken)
client.SetCookie("sessionid", sessionId)
Url := client.ParseUrlQuery(UserProfileURL, nil)
......
......@@ -42,7 +42,6 @@ func (mi *MenuItem) Text() string {
type Menu []MenuItem
func init() {
fmt.Println("Init bnanner")
defaultTitle = i18n.T("Welcome to use Jumpserver open source fortress system")
menu = Menu{
{id: 1, instruct: "ID", helpText: i18n.T("directly login")},
......
......@@ -55,7 +55,7 @@ type InteractiveHandler struct {
searchResult model.AssetList
nodes model.NodeList
onceLoad sync.Once
sync.RWMutex
mu sync.RWMutex
}
func (i *InteractiveHandler) displayBanner() {
......@@ -96,7 +96,11 @@ func (i *InteractiveHandler) Dispatch(ctx cctx.Context) {
close(doneChan)
if err != nil {
logger.Error("Read line from user err:", err)
if err != io.EOF {
logger.Debug("User disconnected")
} else {
logger.Error("Read from user err: ", err)
}
break
}
......@@ -324,7 +328,7 @@ func (i *InteractiveHandler) Proxy(ctx context.Context) {
Asset: i.assetSelect,
SystemUser: i.systemUserSelect,
}
p.Proxy(ctx)
p.Proxy()
}
// /*
......
package userhome
import (
"context"
"io"
"github.com/sirupsen/logrus"
"github.com/ibuler/ssh"
uuid "github.com/satori/go.uuid"
)
var log = logrus.New()
const maxBufferSize = 1024 * 4
type Conn interface {
SessionID() string
User() string
UUID() uuid.UUID
Pty() (ssh.Pty, <-chan ssh.Window, bool)
Context() context.Context
io.Reader
io.WriteCloser
}
package userhome
import (
"context"
"github.com/ibuler/ssh"
uuid "github.com/satori/go.uuid"
)
func NewSSHConn(sess ssh.Session) *SSHConn {
return &SSHConn{
conn: sess,
uuid: uuid.NewV4(),
}
}
type SSHConn struct {
conn ssh.Session
uuid uuid.UUID
}
func (s *SSHConn) SessionID() string {
return s.uuid.String()
}
func (s *SSHConn) User() string {
return s.conn.User()
}
func (s *SSHConn) UUID() uuid.UUID {
return s.uuid
}
func (s *SSHConn) Pty() (ssh.Pty, <-chan ssh.Window, bool) {
return s.conn.Pty()
}
func (s *SSHConn) Context() context.Context {
return s.conn.Context()
}
func (s *SSHConn) Read(b []byte) (n int, err error) {
return s.conn.Read(b)
}
func (s *SSHConn) Write(b []byte) (n int, err error) {
return s.conn.Write(b)
}
func (s *SSHConn) Close() error {
return s.conn.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment