Commit 8f5d95aa authored by Eric's avatar Eric

support join session home

parent b3f07dfb
......@@ -14,7 +14,7 @@ func NewService() *Service {
}
func (s *Service) SSHPassword(ctx ssh.Context, password string) bool {
ctx.SessionID()
Username := "softwareuser1"
Password := "123456"
......
package core
import (
"github.com/sirupsen/logrus"
)
var log *logrus.Logger
const maxBufferSize = 1024 * 4
func init() {
log = logrus.New()
}
package core
import (
"context"
"fmt"
"io"
uuid "github.com/satori/go.uuid"
gossh "golang.org/x/crypto/ssh"
)
func NewNodeConn(c *gossh.Client, s *gossh.Session, useS Conn) (*NodeConn, error) {
ptyReq, winCh, _ := useS.Pty()
err := s.RequestPty(ptyReq.Term, ptyReq.Window.Height, ptyReq.Window.Width, gossh.TerminalModes{})
if err != nil {
return nil, err
}
nodeStdin, err := s.StdinPipe()
if err != nil {
return nil, err
}
nodeStdout, err := s.StdoutPipe()
if err != nil {
return nil, err
}
err = s.Shell()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(useS.Context())
Out, In := io.Pipe()
go func() {
for {
select {
case <-ctx.Done():
log.Info("NewNodeConn goroutine closed")
err = s.Close()
err = c.Close()
if err != io.EOF && err != nil {
log.Info(" sess Close():", err)
}
return
case win, ok := <-winCh:
if !ok {
return
}
err = s.WindowChange(win.Height, win.Width)
if err != nil {
log.Info("windowChange err: ", win)
return
}
log.Info("windowChange: ", win)
}
}
}()
go func() {
nr, err := io.Copy(In, nodeStdout)
if err != nil {
log.Info("io copy err:", err)
}
err = In.Close()
if err != nil {
log.Info("io copy c.Close():", err)
}
cancel()
log.Info("io copy int:", nr)
}()
nConn := &NodeConn{
uuid: uuid.NewV4(),
client: c,
conn: s,
stdin: nodeStdin,
stdout: nodeStdout,
cusOut: Out,
cusIn: In,
tParser: NewTerminalParser(),
}
return nConn, nil
}
// coco连接远程Node的连接
type NodeConn struct {
uuid uuid.UUID
client *gossh.Client
conn *gossh.Session
stdin io.Writer
stdout io.Reader
cusIn io.WriteCloser
cusOut io.ReadCloser
tParser *TerminalParser
currentCommandInput string
currentCommandResult string
rulerFilters []RuleFilter
specialCommands []SpecialRuler
inSpecialStatus bool
}
func (n *NodeConn) UUID() uuid.UUID {
return n.uuid
}
func (n *NodeConn) Read(b []byte) (nr int, err error) {
nr, err = n.cusOut.Read(b)
if n.tParser.Started && nr > 0 {
n.FilterSpecialCommand(b[:nr])
switch {
case n.inSpecialStatus:
// 进入特殊命令状态,
case n.tParser.InputStatus:
n.tParser.CmdInputBuf.Write(b[:nr])
case n.tParser.OutputStatus:
n.tParser.CmdOutputBuf.Write(b[:nr])
default:
}
}
return nr, err
}
func (n *NodeConn) Write(b []byte) (nw int, err error) {
n.tParser.Once.Do(
func() {
n.tParser.Started = true
})
switch {
case n.inSpecialStatus:
// 特殊的命令 vim 或者 rz
case n.tParser.IsEnterKey(b):
n.currentCommandInput = n.tParser.ParseCommandInput()
if n.FilterWhiteBlackRule(n.currentCommandInput) {
msg := fmt.Sprintf("\r\n cmd '%s' is forbidden \r\n", n.currentCommandInput)
nw, err = n.cusIn.Write([]byte(msg))
if err != nil {
return nw, err
}
ctrU := []byte{21, 13} // 清除行并换行
nw, err = n.stdin.Write(ctrU)
if err != nil {
return nw, err
}
n.tParser.InputStatus = false
n.tParser.OutputStatus = false
return len(b), nil
}
n.tParser.InputStatus = false
n.tParser.OutputStatus = true
default:
// 1. 是否是一个命令的完整周期 是则解析命令,记录结果 并重置
// 2. 重置用户输入状态
if len(n.tParser.CmdOutputBuf.Bytes()) > 0 && n.currentCommandInput != "" {
n.currentCommandResult = n.tParser.ParseCommandResult()
n.tParser.Reset()
n.currentCommandInput = ""
n.currentCommandResult = ""
}
n.tParser.InputStatus = true
}
return n.stdin.Write(b)
}
func (n *NodeConn) Close() error {
return n.cusOut.Close()
}
func (n *NodeConn) Wait() error {
return n.conn.Wait()
}
func (n *NodeConn) FilterSpecialCommand(b []byte) {
for _, specialCommand := range n.specialCommands {
if matched := specialCommand.MatchRule(b); matched {
switch {
case specialCommand.EnterStatus():
n.inSpecialStatus = true
case specialCommand.ExitStatus():
n.inSpecialStatus = false
}
}
}
}
func (n *NodeConn) FilterWhiteBlackRule(cmd string) bool {
for _, rule := range n.rulerFilters {
if rule.Match(cmd) {
return rule.BlockCommand()
}
}
return false
}
package core
import (
"context"
uuid "github.com/satori/go.uuid"
)
type ProxyChannel interface {
UUID() string
ReceiveRequestChannel(ctx context.Context) chan<- []byte
SendResponseChannel(ctx context.Context) <-chan []byte
Wait() error
}
func NewMemoryChannel(n *NodeConn) *memoryChannel {
m := &memoryChannel{
uuid: n.UUID(),
conn: n,
}
return m
}
type memoryChannel struct {
uuid uuid.UUID
conn *NodeConn
}
func (m *memoryChannel) UUID() string {
return m.uuid.String()
}
func (m *memoryChannel) SendResponseChannel(ctx context.Context) <-chan []byte {
// 传入context, 可以从外层进行取消
sendChannel := make(chan []byte)
go func() {
defer close(sendChannel)
resp := make([]byte, maxBufferSize)
for {
nr, e := m.conn.Read(resp)
if e != nil {
log.Info("m.conn.Read(resp) err: ", e)
break
}
select {
case <-ctx.Done():
return
default:
sendChannel <- resp[:nr]
}
}
}()
return sendChannel
}
func (m *memoryChannel) ReceiveRequestChannel(ctx context.Context) chan<- []byte {
// 传入context, 可以从外层进行取消
receiveChannel := make(chan []byte)
go func() {
defer m.conn.Close()
for {
select {
case <-ctx.Done():
log.Info("ReceiveRequestChannel ctx done")
return
case reqBuf, ok := <-receiveChannel:
if !ok {
return
}
nw, e := m.conn.Write(reqBuf)
if e != nil && nw != len(reqBuf) {
return
}
}
}
}()
return receiveChannel
}
func (m *memoryChannel) Wait() error {
return m.conn.Wait()
}
package sshd
package core
type SpecialRuler interface {
......
package core
import (
"context"
"sync"
)
type room struct {
sessionID string
uHome SessionHome
pChan ProxyChannel
}
var Manager = &manager{container: map[string]room{}}
type manager struct {
container map[string]room
sync.RWMutex
}
func (m *manager) add(uHome SessionHome, pChan ProxyChannel) {
m.Lock()
m.container[uHome.SessionID()] = room{
sessionID: uHome.SessionID(),
uHome: uHome,
pChan: pChan,
}
m.Unlock()
}
func (m *manager) delete(roomID string) {
m.Lock()
delete(m.container, roomID)
m.Unlock()
}
func (m *manager) search(roomID string) (SessionHome, bool) {
m.RLock()
defer m.RUnlock()
if room, ok := m.container[roomID]; ok {
return room.uHome, ok
}
return nil, false
}
func JoinShareRoom(roomID string, uConn Conn) {
if userHome, ok := Manager.search(roomID); ok {
userHome.AddConnection(uConn)
}
}
func ExitShareRoom(roomID string, uConn Conn) {
if userHome, ok := Manager.search(roomID); ok {
userHome.RemoveConnection(uConn)
}
}
func Switch(ctx context.Context, userHome SessionHome, pChannel ProxyChannel) error {
Manager.add(userHome, pChannel)
defer Manager.delete(userHome.SessionID())
subCtx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
userSendRequestStream := userHome.SendRequestChannel(ctx)
nodeRequestChan := pChannel.ReceiveRequestChannel(ctx)
for reqFromUser := range userSendRequestStream {
nodeRequestChan <- reqFromUser
}
log.Info("userSendRequestStream close")
close(nodeRequestChan)
}(subCtx, &wg)
go func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
userReceiveStream := userHome.ReceiveResponseChannel(ctx)
nodeSendResponseStream := pChannel.SendResponseChannel(ctx)
for resFromNode := range nodeSendResponseStream {
userReceiveStream <- resFromNode
}
log.Info("nodeSendResponseStream close")
close(userReceiveStream)
}(subCtx, &wg)
err := pChannel.Wait()
if err != nil {
log.Info("pChannel err:", err)
}
cancel()
wg.Wait()
log.Info("switch end")
return err
}
package core
import (
"bytes"
"sync"
)
func NewTerminalParser() *TerminalParser {
return &TerminalParser{
Once: sync.Once{},
Started: false,
InputStatus: false,
OutputStatus: false,
CmdInputBuf: new(bytes.Buffer),
CmdOutputBuf: new(bytes.Buffer),
}
}
type TerminalParser struct {
Once sync.Once
Started bool
InputStatus bool
OutputStatus bool
CmdInputBuf *bytes.Buffer // node对用户输入的回写数据
CmdOutputBuf *bytes.Buffer // node对用户按下enter按键之后,返回的数据
}
func (t *TerminalParser) Reset() {
t.CmdInputBuf.Reset()
t.CmdOutputBuf.Reset()
}
func (t *TerminalParser) ParseCommandInput() string {
return t.CmdInputBuf.String()
}
func (t *TerminalParser) ParseCommandResult() string {
return t.CmdOutputBuf.String()
}
func (t *TerminalParser) IsEnterKey(b []byte) bool {
return len(b) == 1 && b[0] == 13
}
package core
import (
"context"
"io"
"sync"
"github.com/gliderlabs/ssh"
uuid "github.com/satori/go.uuid"
)
type Conn interface {
SessionID() string
User() string
UUID() uuid.UUID
Pty() (ssh.Pty, <-chan ssh.Window, bool)
Context() context.Context
io.Reader
io.WriteCloser
}
type SessionHome interface {
SessionID() string
AddConnection(c Conn)
RemoveConnection(c Conn)
SendRequestChannel(ctx context.Context) <-chan []byte
ReceiveResponseChannel(ctx context.Context) chan<- []byte
}
func NewUserSessionHome(con Conn) *userSessionHome {
return &userSessionHome{
readStream: make(chan []byte),
mainConn: con,
connMap: map[string]Conn{con.UUID().String(): con},
cancelMap: map[string]context.CancelFunc{},
}
}
type userSessionHome struct {
readStream chan []byte
mainConn Conn
connMap map[string]Conn
cancelMap map[string]context.CancelFunc
sync.RWMutex
}
func (r *userSessionHome) SessionID() string {
return r.mainConn.SessionID()
}
func (r *userSessionHome) AddConnection(c Conn) {
key := c.SessionID()
if _, ok := r.connMap[key]; !ok {
log.Info("add connection ", c)
r.connMap[key] = c
} else {
log.Info("already add connection")
return
}
log.Info("add conn session room: ", r.SessionID())
ctx, cancelFunc := context.WithCancel(r.mainConn.Context())
r.cancelMap[key] = cancelFunc
defer r.RemoveConnection(c)
buf := make([]byte, maxBufferSize)
for {
nr, err := c.Read(buf)
if err != nil {
log.Error("conn read err")
return
}
select {
case <-ctx.Done():
log.Info("conn ctx done")
return
default:
r.readStream <- buf[:nr]
}
}
}
func (r *userSessionHome) RemoveConnection(c Conn) {
r.Lock()
defer r.Unlock()
key := c.SessionID()
if _, ok := r.connMap[key]; ok {
delete(r.connMap, key)
delete(r.cancelMap, key)
}
}
func (r *userSessionHome) SendRequestChannel(ctx context.Context) <-chan []byte {
go func() {
buf := make([]byte, 1024)
// 从发起的session这里关闭 接受的通道
defer close(r.readStream)
for {
nr, e := r.mainConn.Read(buf)
if e != nil {
log.Error("main Conn read err")
break
}
select {
case <-ctx.Done():
return
default:
r.readStream <- buf[:nr]
}
}
}()
return r.readStream
}
func (r *userSessionHome) ReceiveResponseChannel(ctx context.Context) chan<- []byte {
writeStream := make(chan []byte)
go func() {
defer func() {
r.RLock()
for _, cancel := range r.cancelMap {
cancel()
}
r.RUnlock()
}()
for {
select {
case <-ctx.Done():
return
case buf, ok := <-writeStream:
if !ok {
return
}
for _, c := range r.connMap {
nw, err := c.Write(buf)
if err != nil || nw != len(buf) {
log.Error("Write Conn err", c)
r.cancelMap[c.SessionID()]()
}
}
}
}
}()
return writeStream
}
package sshd
package core
import (
"regexp"
)
const (
actionDeny = true
actionAllow = false
)
type RuleFilter interface {
// 判断是否是匹配当前规则
......@@ -31,7 +26,6 @@ type Rule struct {
func (w *Rule) Match(s string) bool {
switch w.ruleType {
case "command":
for _, content := range w.contents {
if content == s {
return true
......
package sshd
import (
"bytes"
"cocogo/pkg/asset"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/gliderlabs/ssh"
uuid "github.com/satori/go.uuid"
gossh "golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/terminal"
)
const (
maxBufferSize = 1024 * 4
)
type NodeProxy struct {
uuid string
userSess ssh.Session
nodeSess *gossh.Session
nodeClient *gossh.Client
started bool // 记录开始
inputStatus bool // 是否是用户的输入状态
userInputBuf *bytes.Buffer // 用户输入的数据
nodeCmdInputBuf *bytes.Buffer // node对用户输入的回写数据
nodeCmdOutputBuf *bytes.Buffer // node对用户按下enter按键之后,返回的数据
nodeResponseCmdInputBuf []byte
nodeResponseCmdOutputBuf []byte
sendUserStream chan []byte
sendNodeStream chan []byte
sync.Mutex
ctx context.Context
nodeClosed bool
userClosed bool
term *terminal.Terminal
rulerFilters []RuleFilter
specialCommands []SpecialRuler
forbiddenSignal bool
inSpecialStatus bool
}
func NewNodeProxy(nodec *gossh.Client, nodes *gossh.Session, uers ssh.Session) *NodeProxy {
userInputBuf := new(bytes.Buffer)
return &NodeProxy{
uuid: uuid.NewV4().String(),
userSess: uers,
nodeSess: nodes,
nodeClient: nodec,
started: false,
inputStatus: false,
ctx: context.Background(),
nodeClosed: false,
userClosed: false,
userInputBuf: userInputBuf,
nodeCmdInputBuf: new(bytes.Buffer),
nodeCmdOutputBuf: new(bytes.Buffer),
sendUserStream: make(chan []byte),
sendNodeStream: make(chan []byte),
specialCommands: []SpecialRuler{},
term: terminal.NewTerminal(userInputBuf, ""),
}
}
func (n *NodeProxy) receiveNodeResponse(wg *sync.WaitGroup) {
defer wg.Done()
nodeStdout, err := n.nodeSess.StdoutPipe()
if err != nil {
log.Info(err)
return
}
readBuf := make([]byte, maxBufferSize)
for {
nr, err := nodeStdout.Read(readBuf)
if err != nil {
break
}
if nr > 0 {
/*
是否是特殊命令状态:
直接放过;
是否是命令输入:
是:
放入nodeCmdInputBuf
否:
放入nodeCmdOutputBuf
*/
//开始输入之后,才开始记录输入的内容
if n.started {
// 对返回值进行解析,是否进入了特殊命令状态
n.SpecialCommandFilter(readBuf[:nr])
switch {
case n.InSpecialCommandStatus():
// 进入特殊命令状态,
case n.forbiddenSignal:
// 阻断命令的返回值
case n.inputStatus:
n.nodeCmdInputBuf.Write(readBuf[:nr])
default:
n.nodeCmdOutputBuf.Write(readBuf[:nr])
}
}
n.sendUserStream <- readBuf[:nr]
}
}
n.nodeClosed = true
close(n.sendUserStream)
}
func (n *NodeProxy) sendUserResponse(wg *sync.WaitGroup) {
defer wg.Done()
for resBytes := range n.sendUserStream {
nw, err := n.userSess.Write(resBytes)
if nw != len(resBytes) || err != nil {
break
}
}
}
func (n *NodeProxy) receiveUserRequest(wg *sync.WaitGroup) {
defer wg.Done()
readBuf := make([]byte, 1024)
once := sync.Once{}
path := filepath.Join("log", n.uuid)
cmdRecord, _ := os.Create(path)
defer cmdRecord.Close()
var currentCommandInput string
var currentCommandResult string
for {
nr, err := n.userSess.Read(readBuf)
once.Do(func() {
n.started = true
})
if err != nil {
break
}
if n.nodeClosed {
break
}
if nr > 0 {
// 当inputStatus 为false
/*
enter之后
是否需要解析
是:
解析用户真实执行的命令
过滤命令:
1、阻断则发送阻断msg 向node发送清除命令 和换行
否:
直接放过
*/
switch {
case n.InSpecialCommandStatus():
// vim 或者 rz 等状态
case isEnterKey(readBuf[:nr]):
currentCommandInput = n.ParseCommandInput()
if currentCommandInput != "" && n.FilterCommand(currentCommandInput) {
log.Info("cmd forbidden------>", currentCommandInput)
msg := fmt.Sprintf("\r\n cmd '%s' is forbidden \r\n", currentCommandInput)
n.sendUserStream <- []byte(msg)
ctrU := []byte{21, 13} // 清除所有的输入
n.inputStatus = true
n.sendNodeStream <- ctrU
n.forbiddenSignal = true
data := CommandData{
Input: currentCommandInput,
Output: string(msg),
Timestamp: time.Now().UTC().UnixNano(),
}
b, _ := json.Marshal(data)
log.Info("write json data to file.")
cmdRecord.Write(b)
cmdRecord.Write([]byte("\r\n"))
currentCommandInput = ""
currentCommandResult = ""
n.resetNodeInputOutBuf()
continue
}
n.nodeCmdInputBuf.Reset()
n.inputStatus = false
default:
fmt.Println(readBuf[:nr])
if len(n.nodeCmdOutputBuf.Bytes()) > 0 && currentCommandInput != "" {
log.Info("write cmd and result")
currentCommandResult = n.ParseCommandResult()
data := CommandData{
Input: currentCommandInput,
Output: currentCommandResult,
Timestamp: time.Now().UTC().UnixNano(),
}
b, _ := json.Marshal(data)
log.Info("write json data to file.")
cmdRecord.Write(b)
n.resetNodeInputOutBuf()
currentCommandInput = ""
currentCommandResult = ""
}
n.inputStatus = true
n.forbiddenSignal = false
}
//解析命令且过滤命令
n.sendNodeStream <- readBuf[:nr]
}
}
close(n.sendNodeStream)
n.nodeSess.Close()
log.Info("receiveUserRequest exit---->")
}
func (n *NodeProxy) sendNodeRequest(wg *sync.WaitGroup) {
defer wg.Done()
nodeStdin, err := n.nodeSess.StdinPipe()
if err != nil {
return
}
for reqBytes := range n.sendNodeStream {
nw, err := nodeStdin.Write(reqBytes)
if nw != len(reqBytes) || err != nil {
n.nodeClosed = true
break
}
}
log.Info("sendNodeStream closed")
}
// 匹配特殊命令,
func (n *NodeProxy) SpecialCommandFilter(b []byte) {
for _, specialCommand := range n.specialCommands {
if matched := specialCommand.MatchRule(b); matched {
switch {
case specialCommand.EnterStatus():
n.inSpecialStatus = true
case specialCommand.ExitStatus():
n.inSpecialStatus = false
}
}
}
}
func (n *NodeProxy) MatchedSpecialCommand() (SpecialRuler, bool) {
return nil, false
}
func (n *NodeProxy) InSpecialCommandStatus() bool {
return n.inSpecialStatus
}
// 解析命令
func (n *NodeProxy) ParseCommandInput() string {
// 解析用户输入的命令
return n.nodeCmdInputBuf.String()
}
// 解析命令结果
func (n *NodeProxy) ParseCommandResult() string {
return n.nodeCmdOutputBuf.String()
}
// 过滤所有的规则,判断是否阻止命令;如果是空字符串直接返回false
func (n *NodeProxy) FilterCommand(cmd string) bool {
if strings.TrimSpace(cmd) == "" {
return false
}
n.Lock()
defer n.Unlock()
for _, rule := range n.rulerFilters {
if rule.Match(cmd) {
log.Info("match rule", rule)
return rule.BlockCommand()
}
}
return false
}
func (n *NodeProxy) replayFileName() string {
return fmt.Sprintf("%s.replay", n.uuid)
}
// 加载该资产的过滤规则
func (n *NodeProxy) LoadRuleFilters() {
r1 := &Rule{
priority: 10,
action: actionDeny,
contents: []string{"ls"},
ruleType: "command",
}
r2 := &Rule{
priority: 10,
action: actionDeny,
contents: []string{"pwd"},
ruleType: "command",
}
n.Lock()
defer n.Unlock()
n.rulerFilters = []RuleFilter{r1, r2}
}
func (n *NodeProxy) resetNodeInputOutBuf() {
n.nodeCmdInputBuf.Reset()
n.nodeCmdOutputBuf.Reset()
}
func (n *NodeProxy) Start() error {
var (
err error
wg sync.WaitGroup
)
winChangeDone := make(chan struct{})
ptyreq, winCh, _ := n.userSess.Pty()
err = n.nodeSess.RequestPty(ptyreq.Term, ptyreq.Window.Height, ptyreq.Window.Width, gossh.TerminalModes{})
if err != nil {
return err
}
wg.Add(5)
go func() {
defer wg.Done()
for {
select {
case <-winChangeDone:
return
case win := <-winCh:
err = n.nodeSess.WindowChange(win.Height, win.Width)
if err != nil {
return
}
log.Info("windowChange: ", win)
}
}
}()
go n.receiveUserRequest(&wg)
go n.sendNodeRequest(&wg)
go n.receiveNodeResponse(&wg)
go n.sendUserResponse(&wg)
err = n.nodeSess.Shell()
if err != nil {
return err
}
err = n.nodeSess.Wait()
winChangeDone <- struct{}{}
wg.Wait()
log.Info("wg done --->")
if err != nil {
return err
}
return nil
}
func CreateAssetNodeSession(node asset.Node) (c *gossh.Client, s *gossh.Session, err error) {
config := &gossh.ClientConfig{
User: node.UserName,
Auth: []gossh.AuthMethod{
gossh.Password(node.PassWord),
gossh.PublicKeys(node.PublicKey),
},
HostKeyCallback: gossh.InsecureIgnoreHostKey(),
}
client, err := gossh.Dial("tcp", node.IP+":"+node.Port, config)
if err != nil {
log.Info(err)
return c, s, err
}
s, err = client.NewSession()
if err != nil {
log.Error(err)
return c, s, err
}
return client, s, nil
}
func Proxy(userSess ssh.Session, node asset.Node) error {
nodeclient, nodeSess, err := CreateAssetNodeSession(node)
if err != nil {
return err
}
nproxy := NewNodeProxy(nodeclient, nodeSess, userSess)
log.Info("session_uuid_id:", nproxy.uuid)
nproxy.LoadRuleFilters()
err = nproxy.Start()
if err != nil {
log.Error("nproxy err:", err)
return err
}
fmt.Println("exit-------> Proxy")
return nil
}
func isEnterKey(b []byte) bool {
return b[0] == 13
}
......@@ -2,10 +2,15 @@ package sshd
import (
"cocogo/pkg/asset"
"cocogo/pkg/core"
"context"
"fmt"
"io"
"runtime"
"strings"
"github.com/gliderlabs/ssh"
gossh "golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/terminal"
)
......@@ -38,7 +43,7 @@ func (d HelpInfo) displayHelpInfo(sess ssh.Session) {
}
func InteractiveHandler(sess ssh.Session) {
_, _, ptyOk := sess.Pty()
_, winCh, ptyOk := sess.Pty()
if ptyOk {
helpInfo := HelpInfo{
......@@ -52,10 +57,29 @@ func InteractiveHandler(sess ssh.Session) {
log.Info("accept one session")
helpInfo.displayHelpInfo(sess)
term := terminal.NewTerminal(sess, "Opt>")
for {
fmt.Println("start g num:", runtime.NumGoroutine())
ctx, cancelFuc := context.WithCancel(sess.Context())
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("ctx done")
return
case win, ok := <-winCh:
if !ok {
return
}
fmt.Println("InteractiveHandler term change:", win)
_ = term.SetSize(win.Width, win.Height)
}
}
}()
line, err := term.ReadLine()
cancelFuc()
if err != nil {
log.Error(err)
log.Error("ReadLine done", err)
break
}
switch line {
......@@ -87,6 +111,7 @@ func InteractiveHandler(sess ssh.Session) {
return
default:
searchNodeAndProxy(line, sess)
fmt.Println("end g num:", runtime.NumGoroutine())
}
}
......@@ -100,9 +125,39 @@ func InteractiveHandler(sess ssh.Session) {
}
func searchNodeAndProxy(line string, sess ssh.Session) {
searchKey := strings.TrimPrefix(line, "/")
if node, ok := searchNode(searchKey); ok {
err := Proxy(sess, node)
searchWord := strings.TrimPrefix(line, "/")
if strings.Contains(searchWord, "join") {
roomID := strings.TrimSpace(strings.Join(strings.Split(searchWord, "join"), ""))
sshConn := &SSHConn{
conn: sess,
uuid: generateNewUUID(),
}
log.Info("join room id: ", roomID)
ctx, cancelFuc := context.WithCancel(context.Background())
_, winCh, _ := sess.Pty()
go func() {
for {
select {
case <-ctx.Done():
return
case win, ok := <-winCh:
if !ok {
return
}
fmt.Println("join term change:", win)
}
}
}()
core.JoinShareRoom(roomID, sshConn)
log.Info("exit room id:", roomID)
cancelFuc()
return
}
if node, ok := searchNode(searchWord); ok {
err := MyProxy(sess, node)
if err != nil {
log.Info("proxy err ", err)
}
......@@ -120,3 +175,56 @@ func searchNode(key string) (asset.Node, bool) {
}
return asset.Node{}, false
}
func MyProxy(userSess ssh.Session, node asset.Node) error {
/*
1. 创建SSHConn,符合core.Conn接口
2. 创建一个session Home
3. 创建一个NodeConn,及相关的channel 可以是MemoryChannel 或者是redisChannel
4. session Home 与 proxy channel 交换数据
*/
sshConn := &SSHConn{
conn: userSess,
uuid: generateNewUUID(),
}
userHome := core.NewUserSessionHome(sshConn)
log.Info("session room id:", userHome.SessionID())
c, s, err := CreateNodeSession(node)
if err != nil {
return err
}
nodeC, err := core.NewNodeConn(c, s, sshConn)
if err != nil {
return err
}
mc := core.NewMemoryChannel(nodeC)
err = core.Switch(sshConn.Context(), userHome, mc)
return err
}
func CreateNodeSession(node asset.Node) (c *gossh.Client, s *gossh.Session, err error) {
config := &gossh.ClientConfig{
User: node.UserName,
Auth: []gossh.AuthMethod{
gossh.Password(node.PassWord),
gossh.PublicKeys(node.PublicKey),
},
HostKeyCallback: gossh.InsecureIgnoreHostKey(),
}
client, err := gossh.Dial("tcp", node.IP+":"+node.Port, config)
if err != nil {
log.Info(err)
return c, s, err
}
s, err = client.NewSession()
if err != nil {
log.Error(err)
return c, s, err
}
return client, s, nil
}
package sshd
import (
"context"
"github.com/gliderlabs/ssh"
uuid "github.com/satori/go.uuid"
)
// ssh方式连接coco的用户request 将实现Conn接口
type SSHConn struct {
conn ssh.Session
uuid uuid.UUID
}
func (s *SSHConn) SessionID() string {
return s.uuid.String()
}
func (s *SSHConn) User() string {
return s.conn.User()
}
func (s *SSHConn) UUID() uuid.UUID {
return s.uuid
}
func (s *SSHConn) Pty() (ssh.Pty, <-chan ssh.Window, bool) {
return s.conn.Pty()
}
func (s *SSHConn) Context() context.Context {
return s.conn.Context()
}
func (s *SSHConn) Read(b []byte) (n int, err error) {
return s.conn.Read(b)
}
func (s *SSHConn) Write(b []byte) (n int, err error) {
return s.conn.Write(b)
}
func (s *SSHConn) Close() error {
return s.conn.Close()
}
// ws方式连接coco的用户request 将实现Conn接口
type WSConn struct {
}
......@@ -3,6 +3,8 @@ package sshd
import (
"io/ioutil"
uuid "github.com/satori/go.uuid"
gossh "golang.org/x/crypto/ssh"
)
......@@ -18,3 +20,7 @@ func getPrivateKey(keyPath string) gossh.Signer {
}
return private
}
func generateNewUUID() uuid.UUID {
return uuid.NewV4()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment