Commit 216880ee authored by Eric's avatar Eric

restruct code

parent 7ca4f893
......@@ -4,10 +4,9 @@ go 1.12
require (
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
github.com/gdamore/tcell v1.1.1
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect
github.com/gliderlabs/ssh v0.1.3
github.com/kr/pty v1.1.4 // indirect
github.com/mattn/go-runewidth v0.0.4
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.1
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.0
......
package core
import (
"github.com/sirupsen/logrus"
)
var log *logrus.Logger
const maxBufferSize = 1024 * 4
func init() {
log = logrus.New()
}
package core
import (
"context"
uuid "github.com/satori/go.uuid"
)
type ProxyChannel interface {
UUID() string
ReceiveRequestChannel(ctx context.Context) chan<- []byte
SendResponseChannel(ctx context.Context) <-chan []byte
Wait() error
}
func NewMemoryChannel(nConn *NodeConn, useS Conn) *memoryChannel {
m := &memoryChannel{
uuid: nConn.UUID(),
conn: nConn,
}
return m
}
type memoryChannel struct {
uuid uuid.UUID
conn *NodeConn
}
func (m *memoryChannel) UUID() string {
return m.uuid.String()
}
func (m *memoryChannel) SendResponseChannel(ctx context.Context) <-chan []byte {
go m.conn.handleResponse(ctx)
return m.conn.outChan
}
func (m *memoryChannel) ReceiveRequestChannel(ctx context.Context) chan<- []byte {
go m.conn.handleRequest(ctx)
return m.conn.inChan
}
func (m *memoryChannel) Wait() error {
return m.conn.Wait()
}
package core
package parser
type SpecialRuler interface {
......
package core
package parser
import (
"bytes"
......
package core
package proxy
import (
"cocogo/pkg/transport"
"cocogo/pkg/userhome"
"context"
"sync"
"github.com/sirupsen/logrus"
)
var log = logrus.New()
var Manager = &manager{
container: new(sync.Map),
}
......@@ -13,7 +19,7 @@ type manager struct {
container *sync.Map
}
func (m *manager) add(uHome SessionHome) {
func (m *manager) add(uHome userhome.SessionHome) {
m.container.Store(uHome.SessionID(), uHome)
}
......@@ -23,35 +29,35 @@ func (m *manager) delete(roomID string) {
}
func (m *manager) search(roomID string) (SessionHome, bool) {
func (m *manager) search(roomID string) (userhome.SessionHome, bool) {
if uHome, ok := m.container.Load(roomID); ok {
return uHome.(SessionHome), ok
return uHome.(userhome.SessionHome), ok
}
return nil, false
}
func (m *manager) JoinShareRoom(roomID string, uConn Conn) {
func (m *manager) JoinShareRoom(roomID string, uConn userhome.Conn) {
if userHome, ok := m.search(roomID); ok {
userHome.AddConnection(uConn)
}
}
func (m *manager) ExitShareRoom(roomID string, uConn Conn) {
func (m *manager) ExitShareRoom(roomID string, uConn userhome.Conn) {
if userHome, ok := m.search(roomID); ok {
userHome.RemoveConnection(uConn)
}
}
func (m *manager) Switch(ctx context.Context, userHome SessionHome, pChannel ProxyChannel) error {
m.add(userHome)
defer m.delete(userHome.SessionID())
func (m *manager) Switch(ctx context.Context, uHome userhome.SessionHome, agent transport.Agent) error {
m.add(uHome)
defer m.delete(uHome.SessionID())
subCtx, cancelFunc := context.WithCancel(ctx)
userSendRequestStream := userHome.SendRequestChannel(subCtx)
userReceiveStream := userHome.ReceiveResponseChannel(subCtx)
nodeRequestChan := pChannel.ReceiveRequestChannel(subCtx)
nodeSendResponseStream := pChannel.SendResponseChannel(subCtx)
userSendRequestStream := uHome.SendRequestChannel(subCtx)
userReceiveStream := uHome.ReceiveResponseChannel(subCtx)
nodeRequestChan := agent.ReceiveRequestChannel(subCtx)
nodeSendResponseStream := agent.SendResponseChannel(subCtx)
for userSendRequestStream != nil || nodeSendResponseStream != nil {
select {
......@@ -59,6 +65,7 @@ func (m *manager) Switch(ctx context.Context, userHome SessionHome, pChannel Pro
if !ok {
log.Warn("userSendRequestStream close")
userSendRequestStream = nil
close(nodeRequestChan)
continue
}
nodeRequestChan <- buf1
......@@ -72,9 +79,11 @@ func (m *manager) Switch(ctx context.Context, userHome SessionHome, pChannel Pro
}
userReceiveStream <- buf2
case <-ctx.Done():
log.Info("proxy end by context done")
cancelFunc()
return nil
}
}
log.Info("switch end")
log.Info("proxy end")
return nil
}
package sshd
import (
"cocogo/pkg/core"
"cocogo/pkg/model"
"cocogo/pkg/proxy"
"cocogo/pkg/transport"
"cocogo/pkg/userhome"
"context"
"fmt"
"io"
......@@ -178,10 +180,7 @@ func (s *sshInteractive) changeLanguage() {
}
func (s *sshInteractive) JoinShareRoom(roomID string) {
sshConn := &SSHConn{
conn: s.sess,
uuid: generateNewUUID(),
}
sshConn := userhome.NewSSHConn(s.sess)
ctx, cancelFuc := context.WithCancel(s.sess.Context())
_, winCh, _ := s.sess.Pty()
......@@ -198,7 +197,7 @@ func (s *sshInteractive) JoinShareRoom(roomID string) {
}
}
}()
core.Manager.JoinShareRoom(roomID, sshConn)
proxy.Manager.JoinShareRoom(roomID, sshConn)
log.Info("exit room id:", roomID)
cancelFuc()
......@@ -359,30 +358,28 @@ func (s *sshInteractive) Proxy(asset model.Asset, systemUser model.SystemUserAut
3. 创建一个NodeConn,及相关的channel 可以是MemoryChannel 或者是redisChannel
4. session Home 与 proxy channel 交换数据
*/
sshConn := &SSHConn{
conn: s.sess,
uuid: generateNewUUID(),
}
serverAuth := core.ServerAuth{
ptyReq, winChan, _ := s.sess.Pty()
sshConn := userhome.NewSSHConn(s.sess)
serverAuth := transport.ServerAuth{
IP: asset.Ip,
Port: asset.Port,
UserName: systemUser.UserName,
Password: systemUser.Password,
PublicKey: parsePrivateKey(systemUser.PrivateKey)}
nodeConn, err := core.NewNodeConn(serverAuth, sshConn)
nodeConn, err := transport.NewNodeConn(s.sess.Context(), serverAuth, ptyReq, winChan)
if err != nil {
log.Error(err)
return err
}
defer nodeConn.Close()
memChan := core.NewMemoryChannel(nodeConn, sshConn)
memChan := transport.NewMemoryAgent(nodeConn)
userHome := core.NewUserSessionHome(sshConn)
log.Info("session Home ID: ", userHome.SessionID())
Home := userhome.NewUserSessionHome(sshConn)
log.Info("session Home ID: ", Home.SessionID())
err = core.Manager.Switch(context.TODO(), userHome, memChan)
err = proxy.Manager.Switch(s.sess.Context(), Home, memChan)
if err != nil {
log.Error(err)
}
......
......@@ -5,6 +5,7 @@ import (
"cocogo/pkg/config"
"cocogo/pkg/model"
"io"
"runtime"
"strconv"
"sync"
"text/template"
......@@ -75,6 +76,8 @@ func connectHandler(sess ssh.Session) {
log.Info("accept one session")
userInteractive.displayHelpInfo()
userInteractive.StartDispatch()
log.Info("finish one session")
runtime.GC()
} else {
_, err := io.WriteString(sess, "No PTY requested.\n")
......
package sshd
import (
uuid "github.com/satori/go.uuid"
gossh "golang.org/x/crypto/ssh"
)
func parsePrivateKey(privateKey string) gossh.Signer {
private, err := gossh.ParsePrivateKey([]byte(privateKey))
if err != nil {
log.Info("Failed to parse private key: ", err)
log.Error("Failed to parse private key: ", err)
}
return private
}
func generateNewUUID() uuid.UUID {
return uuid.NewV4()
}
......@@ -7,7 +7,7 @@ const welcomeTemplate = `
{{.Tab}}3) Enter {{.ColorCode}}p{{.ColorEnd}} to display the host you have permission.{{.EndLine}}
{{.Tab}}4) Enter {{.ColorCode}}g{{.ColorEnd}} to display the node that you have permission.{{.EndLine}}
{{.Tab}}5) Enter {{.ColorCode}}g{{.ColorEnd}} + {{.ColorCode}}NodeID{{.ColorEnd}} to display the host under the node, such as g1. {{.EndLine}}
{{.Tab}}6) Enter {{.ColorCode}}s{{.ColorEnd}} Chinese-english switch.{{.EndLine}}
{{.Tab}}6) Enter {{.ColorCode}}s{{.ColorEnd}} Chinese-english proxy.{{.EndLine}}
{{.Tab}}7) Enter {{.ColorCode}}h{{.ColorEnd}} help.{{.EndLine}}
{{.Tab}}8) Enter {{.ColorCode}}r{{.ColorEnd}} to refresh your assets and nodes.{{.EndLine}}
{{.Tab}}0) Enter {{.ColorCode}}q{{.ColorEnd}} exit.{{.EndLine}}
......
package transport
import "context"
type Conn interface {
ReceiveRequest(context.Context, <-chan []byte, chan<- []byte)
SendResponse(context.Context, chan<- []byte)
}
package core
package transport
import (
"cocogo/pkg/parser"
"context"
"fmt"
"io"
"github.com/sirupsen/logrus"
"github.com/gliderlabs/ssh"
uuid "github.com/satori/go.uuid"
gossh "golang.org/x/crypto/ssh"
)
type Conn interface {
SessionID() string
User() string
UUID() uuid.UUID
Pty() (ssh.Pty, <-chan ssh.Window, bool)
var log = logrus.New()
Context() context.Context
io.Reader
io.WriteCloser
}
const maxBufferSize = 1024 * 4
type ServerAuth struct {
IP string
......@@ -57,13 +49,12 @@ func CreateNodeSession(authInfo ServerAuth) (c *gossh.Client, s *gossh.Session,
return client, s, nil
}
func NewNodeConn(authInfo ServerAuth, userS Conn) (*NodeConn, error) {
func NewNodeConn(ctx context.Context, authInfo ServerAuth, ptyReq ssh.Pty, winCh <-chan ssh.Window) (*NodeConn, error) {
c, s, err := CreateNodeSession(authInfo)
if err != nil {
return nil, err
}
ptyReq, winCh, _ := userS.Pty()
err = s.RequestPty(ptyReq.Term, ptyReq.Window.Height, ptyReq.Window.Width, gossh.TerminalModes{})
if err != nil {
return nil, err
......@@ -81,7 +72,7 @@ func NewNodeConn(authInfo ServerAuth, userS Conn) (*NodeConn, error) {
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(userS.Context())
ctx, cancelFunc := context.WithCancel(ctx)
nConn := &NodeConn{
uuid: uuid.NewV4(),
......@@ -91,9 +82,7 @@ func NewNodeConn(authInfo ServerAuth, userS Conn) (*NodeConn, error) {
ctxCancelFunc: cancelFunc,
stdin: nodeStdin,
stdout: nodeStdout,
tParser: NewTerminalParser(),
inChan: make(chan []byte),
outChan: make(chan []byte),
tParser: parser.NewTerminalParser(),
}
go nConn.windowChangeHandler(winCh)
......@@ -107,16 +96,14 @@ type NodeConn struct {
conn *gossh.Session
stdin io.Writer
stdout io.Reader
tParser *TerminalParser
tParser *parser.TerminalParser
currentCommandInput string
currentCommandResult string
rulerFilters []RuleFilter
specialCommands []SpecialRuler
rulerFilters []parser.RuleFilter
specialCommands []parser.SpecialRuler
inSpecialStatus bool
ctx context.Context
ctxCancelFunc context.CancelFunc
inChan chan []byte
outChan chan []byte
}
func (n *NodeConn) UUID() uuid.UUID {
......@@ -171,13 +158,69 @@ func (n *NodeConn) windowChangeHandler(winCH <-chan ssh.Window) {
}
func (n *NodeConn) handleRequest(ctx context.Context) {
func (n *NodeConn) Close() {
select {
case <-n.ctx.Done():
return
default:
_ = n.conn.Close()
_ = n.client.Close()
n.ctxCancelFunc()
log.Info("Close conn")
}
}
func (n *NodeConn) SendResponse(ctx context.Context, outChan chan<- []byte) {
buf := make([]byte, maxBufferSize)
defer close(outChan)
for {
nr, err := n.stdout.Read(buf)
if err != nil {
log.Error("read conn err:", err)
return
}
if n.tParser.Started && nr > 0 {
n.FilterSpecialCommand(buf[:nr])
switch {
case n.inSpecialStatus:
// 进入特殊命令状态,
case n.tParser.InputStatus:
n.tParser.CmdInputBuf.Write(buf[:nr])
case n.tParser.OutputStatus:
n.tParser.CmdOutputBuf.Write(buf[:nr])
default:
}
}
select {
case <-ctx.Done():
log.Info("SendResponse finish by context done")
return
case buf, ok := <-n.inChan:
default:
copyBuf := make([]byte, len(buf[:nr]))
copy(copyBuf, buf[:nr])
outChan <- copyBuf
}
}
}
func (n *NodeConn) ReceiveRequest(ctx context.Context, inChan <-chan []byte, outChan chan<- []byte) {
defer n.Close()
for {
select {
case <-ctx.Done():
log.Error("ReceiveRequest finish by context done")
return
case buf, ok := <-inChan:
if !ok {
log.Error("ReceiveRequest finish by inChan close")
return
}
......@@ -194,7 +237,7 @@ func (n *NodeConn) handleRequest(ctx context.Context) {
n.currentCommandInput = n.tParser.ParseCommandInput()
if n.FilterWhiteBlackRule(n.currentCommandInput) {
msg := fmt.Sprintf("\r\n cmd '%s' is forbidden \r\n", n.currentCommandInput)
n.outChan <- []byte(msg)
outChan <- []byte(msg)
ctrU := []byte{21, 13} // 清除行并换行
_, err := n.stdin.Write(ctrU)
if err != nil {
......@@ -219,56 +262,12 @@ func (n *NodeConn) handleRequest(ctx context.Context) {
n.tParser.InputStatus = true
}
_, _ = n.stdin.Write(buf)
}
}
}
func (n *NodeConn) handleResponse(ctx context.Context) {
buf := make([]byte, maxBufferSize)
defer close(n.outChan)
for {
nr, err := n.stdout.Read(buf)
if err != nil {
return
}
if n.tParser.Started && nr > 0 {
n.FilterSpecialCommand(buf[:nr])
switch {
case n.inSpecialStatus:
// 进入特殊命令状态,
case n.tParser.InputStatus:
n.tParser.CmdInputBuf.Write(buf[:nr])
case n.tParser.OutputStatus:
n.tParser.CmdOutputBuf.Write(buf[:nr])
default:
_, err := n.stdin.Write(buf)
if err != nil {
log.Error("write conn err:", err)
return
}
}
select {
case <-ctx.Done():
return
default:
copyBuf := make([]byte, len(buf[:nr]))
copy(copyBuf, buf[:nr])
n.outChan <- copyBuf
}
}
}
func (n *NodeConn) Close() {
select {
case <-n.ctx.Done():
return
default:
_ = n.conn.Close()
_ = n.client.Close()
n.ctxCancelFunc()
}
}
package transport
import (
"context"
uuid "github.com/satori/go.uuid"
)
type Agent interface {
ReceiveRequestChannel(ctx context.Context) chan<- []byte
SendResponseChannel(ctx context.Context) <-chan []byte
}
func NewMemoryAgent(nConn Conn) *memoryAgent {
m := &memoryAgent{
conn: nConn,
inChan: make(chan []byte),
outChan: make(chan []byte),
}
return m
}
type memoryAgent struct {
uuid uuid.UUID
conn Conn
inChan chan []byte
outChan chan []byte
}
func (m *memoryAgent) SendResponseChannel(ctx context.Context) <-chan []byte {
go m.conn.SendResponse(ctx, m.outChan)
return m.outChan
}
func (m *memoryAgent) ReceiveRequestChannel(ctx context.Context) chan<- []byte {
go m.conn.ReceiveRequest(ctx, m.inChan, m.outChan)
return m.inChan
}
package userhome
import (
"context"
"io"
"github.com/sirupsen/logrus"
"github.com/gliderlabs/ssh"
uuid "github.com/satori/go.uuid"
)
var log = logrus.New()
const maxBufferSize = 1024 * 4
type Conn interface {
SessionID() string
User() string
UUID() uuid.UUID
Pty() (ssh.Pty, <-chan ssh.Window, bool)
Context() context.Context
io.Reader
io.WriteCloser
}
package sshd
package userhome
import (
"context"
......@@ -7,7 +7,13 @@ import (
uuid "github.com/satori/go.uuid"
)
// ssh方式连接coco的用户request 将实现Conn接口
func NewSSHConn(sess ssh.Session) *SSHConn {
return &SSHConn{
conn: sess,
uuid: uuid.NewV4(),
}
}
type SSHConn struct {
conn ssh.Session
uuid uuid.UUID
......@@ -44,7 +50,3 @@ func (s *SSHConn) Write(b []byte) (n int, err error) {
func (s *SSHConn) Close() error {
return s.conn.Close()
}
// ws方式连接coco的用户request 将实现Conn接口
type WSConn struct {
}
package core
package userhome
import (
"context"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment