Commit 2de9b63c authored by ibuler's avatar ibuler

[Update] 优化ws

parent bd3d0119
......@@ -5,6 +5,7 @@ ARG GOPROXY
ENV GOPROXY=$GOPROXY
ENV GO111MODULE=on
COPY . .
RUN cd cmd && go build koko.go
FROM alpine
......
......@@ -7,21 +7,16 @@ import (
"github.com/gliderlabs/ssh"
"github.com/kataras/neffos"
"github.com/jumpserver/koko/pkg/logger"
"github.com/jumpserver/koko/pkg/model"
)
type Client struct {
Uuid string
Cid string
user *model.User
addr string
WinChan chan ssh.Window
UserRead io.Reader
UserWrite io.WriteCloser
Conn *neffos.NSConn
Conn *neffos.NSConn
Closed bool
pty ssh.Pty
mu *sync.RWMutex
......@@ -49,18 +44,13 @@ func (c *Client) Write(p []byte) (n int, err error) {
if c.Closed {
return
}
data := DataMsg{Data: string(p)}
data := DataMsg{Data: string(p), Room: c.Uuid}
msg, err := json.Marshal(data)
if err != nil {
return
}
n = len(p)
room := c.Conn.Room(c.Uuid)
if room == nil {
logger.Error("room not found: ", c.Uuid)
return
}
room.Emit("data", msg)
c.Conn.Emit("data", msg)
return
}
......@@ -81,5 +71,8 @@ func (c *Client) Close() (err error) {
func (c *Client) SetWinSize(size ssh.Window) {
c.mu.RLock()
defer c.mu.RUnlock()
c.WinChan <- size
select {
case c.WinChan <- size:
default:
}
}
......@@ -20,6 +20,7 @@ type TokenMsg struct {
type DataMsg struct {
Data string `json:"data"`
Room string `json:"room"`
}
type RoomMsg struct {
......@@ -27,16 +28,11 @@ type RoomMsg struct {
Secret string `json:"secret"`
}
type EmitDataMsg struct {
Room string `json:"room"`
Data string `json:"data"`
}
type EmitLogoutMsg struct {
type LogoutMsg struct {
Room string `json:"room"`
}
type EmitDisconnectMsg struct {
type DisconnectMsg struct {
}
type EmitSidMsg struct {
......
......@@ -20,10 +20,6 @@ import (
"github.com/jumpserver/koko/pkg/service"
)
// OnConnectHandler 当websocket连接后触发
func OnNamespaceConnected(c *neffos.NSConn, msg neffos.Message) error {
// 首次连接 1.获取当前用户的信息
......@@ -61,9 +57,8 @@ func OnNamespaceConnected(c *neffos.NSConn, msg neffos.Message) error {
return nil
}
// OnDisconnect websocket断开后触发
func OnNamespaceDisconnect(c *neffos.NSConn, msg neffos.Message) (err error){
func OnNamespaceDisconnect(c *neffos.NSConn, msg neffos.Message) (err error) {
logger.Debug("On disconnect event trigger")
conns.DeleteClients(c.Conn.ID())
return nil
......@@ -83,7 +78,6 @@ func OnHostHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
if err != nil {
return
}
fmt.Println("Host msg: ", message)
win := ssh.Window{Height: 24, Width: 80}
assetID := message.Uuid
systemUserID := message.UserID
......@@ -97,46 +91,49 @@ func OnHostHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
}
roomID := uuid.NewV4().String()
emitMsg := RoomMsg{roomID, secret}
joinRoomMsg, _ := json.Marshal(emitMsg)
c.Emit("room", joinRoomMsg)
if err != nil {
logger.Debug("Join room error occur: ", err)
return
}
roomMsg, _ := json.Marshal(emitMsg)
c.Emit("room", roomMsg)
asset := service.GetAsset(assetID)
systemUser := service.GetSystemUser(systemUserID)
if asset.ID == "" || systemUser.ID == "" {
logger.Debug("No asset id or system user id found, exit")
msg := "No asset id or system user id found, exit"
logger.Debug(msg)
dataMsg := DataMsg{Room: roomID, Data: msg}
c.Emit("data", neffos.Marshal(dataMsg))
return
}
logger.Debug("Web terminal want to connect host: ", asset.Hostname)
currentUser, ok := cc.Get("currentUser").(*model.User)
if !ok {
return errors.New("not found current user")
err = errors.New("not found current user")
dataMsg := DataMsg{Room: roomID, Data: err.Error()}
c.Emit("data", neffos.Marshal(dataMsg))
return
}
userR, userW := io.Pipe()
addr, _, _ := net.SplitHostPort(cc.Socket().Request().RemoteAddr)
client := &Client{
Uuid: roomID, user: currentUser, addr: addr,
Uuid: roomID, addr: addr,
WinChan: make(chan ssh.Window, 100), Conn: c,
UserRead: userR, UserWrite: userW, mu: new(sync.RWMutex),
pty: ssh.Pty{Term: "xterm", Window: win},
}
user := cc.Get("currentUser").(*model.User)
client.WinChan <- win
clients.AddClient(roomID, client)
conns.AddClient(cc.ID(), roomID)
proxySrv := proxy.ProxyServer{
UserConn: client, User: user,
UserConn: client, User: currentUser,
Asset: &asset, SystemUser: &systemUser,
}
go func() {
defer logger.Debug("web proxy end")
logger.Debug("Start proxy")
defer logger.Debug("Web proxy process end")
logger.Debug("Start proxy to host")
proxySrv.Proxy()
logoutMsg, _ := json.Marshal(RoomMsg{Room: roomID})
// 服务器主动退出
c.Emit("logout", logoutMsg)
clients.DeleteClient(roomID)
}()
......@@ -154,21 +151,21 @@ func OnTokenHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
}
token := message.Token
secret := message.Secret
clientID := uuid.NewV4().String()
roomMsg := RoomMsg{clientID, secret}
roomID := uuid.NewV4().String()
roomMsg := RoomMsg{roomID, secret}
c.Emit("room", neffos.Marshal(roomMsg))
// check token
if token == "" || secret == "" {
msg := fmt.Sprintf("Token or secret is None: %s %s", token, secret)
dataMsg := EmitDataMsg{Data: msg, Room: clientID}
dataMsg := DataMsg{Data: msg, Room: roomID}
c.Emit("data", neffos.Marshal(dataMsg))
c.Emit("disconnect", nil)
}
tokenUser := service.GetTokenAsset(token)
if tokenUser.UserID == "" {
msg := "Token info is none, maybe token expired"
dataMsg := EmitDataMsg{Data: msg, Room: clientID}
dataMsg := DataMsg{Data: msg, Room: roomID}
c.Emit("data", neffos.Marshal(dataMsg))
c.Emit("disconnect", nil)
}
......@@ -177,7 +174,7 @@ func OnTokenHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
if currentUser == nil {
msg := "User id error"
dataMsg := EmitDataMsg{Data: msg, Room: clientID}
dataMsg := DataMsg{Data: msg, Room: roomID}
c.Emit("data", neffos.Marshal(dataMsg))
c.Emit("disconnect", nil)
}
......@@ -185,28 +182,27 @@ func OnTokenHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
cc.Set("currentUser", currentUser)
hostMsg := HostMsg{
Uuid: tokenUser.AssetID, UserID: tokenUser.SystemUserID,
Size: message.Size, Secret:secret,
Size: message.Size, Secret: secret,
}
fmt.Println("Host msg: ", hostMsg)
hostWsMsg := neffos.Message{
Body:neffos.Marshal(hostMsg),
Body: neffos.Marshal(hostMsg),
}
return OnHostHandler(c, hostWsMsg)
}
// OnDataHandler 收发数据时触发
func OnDataHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
roomID := msg.Room
client := clients.GetClient(roomID)
if client == nil {
return
}
var message DataMsg
err = msg.Unmarshal(&message)
if err != nil {
return
}
clientID := message.Room
client := clients.GetClient(clientID)
if client == nil {
return
}
_, err = client.UserWrite.Write([]byte(message.Data))
return err
}
......@@ -220,9 +216,8 @@ func OnResizeHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
}
logger.Debugf("Web terminal on resize event trigger: %d*%d", message.Width, message.Height)
winSize := ssh.Window{Height: message.Height, Width: message.Width}
for _, room := range c.Rooms() {
roomID := room.Name
client := clients.GetClient(roomID)
for _, clientID := range conns.GetClients(c.Conn.ID()) {
client := clients.GetClient(clientID)
if client != nil {
client.SetWinSize(winSize)
}
......@@ -230,16 +225,15 @@ func OnResizeHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
return nil
}
// OnLogoutHandler 用户登出一个会话时触发
func OnLogoutHandler(c *neffos.NSConn, msg neffos.Message) (err error){
// OnLogoutHandler 用户登出一个会话时触发, 用户主动退出
func OnLogoutHandler(c *neffos.NSConn, msg neffos.Message) (err error) {
logger.Debug("Web terminal on logout event trigger: ", msg.Room)
var message RoomMsg
var message LogoutMsg
err = msg.Unmarshal(&message)
if err != nil {
return
}
roomID := message.Room
clients.DeleteClient(roomID)
clientID := message.Room
clients.DeleteClient(clientID)
return
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment