Unverified Commit 6d4e69b2 authored by Eric_Lee's avatar Eric_Lee Committed by GitHub

修复bug,增加复用连接选项 (#68)

* fix bugs

* fix ip addr bug

* fix addr bugs

* fix download many files bug

* fix upload chunk file bugs

* support folder cut and paste

* update go.sum file

* 修复web文件管理按钮和页面背景重叠

* 优化命令过滤匹配

* make ssh reuse connection configurable
parent 9ab4ea69
This diff is collapsed.
......@@ -5,6 +5,7 @@
<script type="text/javascript" src="/coco/static/js/neffos.min.js"></script>
<script type="text/javascript" src="/coco/static/plugins/elfinder/elfinder.full.js"></script>
<script type="text/javascript" src="/coco/static/plugins/elfinder/i18n/elfinder.pl.js"></script>
<link rel="stylesheet" type="text/css" media="screen" href="/coco/static/js/jquery-ui-1.10.4.min.css">
<link rel="stylesheet" type="text/css" media="screen" href="/coco/static/plugins/elfinder/css/elfinder.min.css">
<link rel="stylesheet" type="text/css" media="screen" href="/coco/static/plugins/elfinder/css/theme-gray.css">
<script type="text/javascript" charset="utf-8">
......
......@@ -6,7 +6,7 @@ require (
github.com/Azure/azure-pipeline-go v0.1.9 // indirect
github.com/Azure/azure-storage-blob-go v0.6.0
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/LeeEirc/elfinder v0.0.1
github.com/LeeEirc/elfinder v0.0.2
github.com/aliyun/aliyun-oss-go-sdk v1.9.8
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
github.com/aws/aws-sdk-go v1.19.46
......
......@@ -9,6 +9,8 @@ github.com/LeeEirc/elfinder v0.0.0-20190718023636-5679c8bdb7bf h1:dZipr1cwienSKN
github.com/LeeEirc/elfinder v0.0.0-20190718023636-5679c8bdb7bf/go.mod h1:ApL/XFs34Gvqinex9Z1sZdsp3Jeu26nNuEsf1wQFx8s=
github.com/LeeEirc/elfinder v0.0.1 h1:fFVy2xddwB2qQxLxJOGl+1Lj686pnRFnySsjPr7luZ0=
github.com/LeeEirc/elfinder v0.0.1/go.mod h1:VSfmUhE4Fvv+4Dfyo7Wmi44bdyDuIQgJtyi5EDcDSxE=
github.com/LeeEirc/elfinder v0.0.2 h1:OnsOkZ3FVVKk91JxQQGwAULo78BSwPRN0yXaYcUK6Yk=
github.com/LeeEirc/elfinder v0.0.2/go.mod h1:VSfmUhE4Fvv+4Dfyo7Wmi44bdyDuIQgJtyi5EDcDSxE=
github.com/aliyun/aliyun-oss-go-sdk v1.9.8 h1:BOflvK0Zs/zGmoabyFIzTg5c3kguktWTXEwewwbuba0=
github.com/aliyun/aliyun-oss-go-sdk v1.9.8/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA=
......
......@@ -26,6 +26,7 @@ type Config struct {
MaxIdleTime time.Duration `json:"SECURITY_MAX_IDLE_TIME"`
SftpRoot string `json:"TERMINAL_SFTP_ROOT" yaml:"SFTP_ROOT"`
ShowHiddenFile bool `yaml:"SFTP_SHOW_HIDDEN_FILE"`
ReuseConnection bool `yaml:"REUSE_CONNECTION"`
Name string `yaml:"NAME"`
SecretKey string `yaml:"SECRET_KEY"`
HostKeyFile string `yaml:"HOST_KEY_FILE"`
......@@ -131,6 +132,7 @@ var Conf = &Config{
UploadFailedReplay: true,
SftpRoot: "/tmp",
ShowHiddenFile: false,
ReuseConnection: true,
}
func SetConf(conf *Config) {
......
......@@ -173,6 +173,8 @@ func (u *UserVolume) UploadChunk(cid int, dirPath, uploadPath, filename string,
switch {
case strings.Contains(uploadPath,filename):
path = filepath.Join(dirPath, TrimPrefix(uploadPath))
case uploadPath != "":
path = filepath.Join(dirPath, TrimPrefix(uploadPath), filename)
default:
path = filepath.Join(dirPath, filename)
......@@ -204,6 +206,8 @@ func (u *UserVolume) MergeChunk(cid, total int, dirPath, uploadPath, filename st
switch {
case strings.Contains(uploadPath,filename):
path = filepath.Join(dirPath, TrimPrefix(uploadPath))
case uploadPath != "":
path = filepath.Join(dirPath, TrimPrefix(uploadPath), filename)
default:
path = filepath.Join(dirPath, filename)
......
......@@ -6,6 +6,8 @@ import (
"sort"
"strconv"
"strings"
"unicode"
"unicode/utf8"
)
type AssetList []Asset
......@@ -315,8 +317,14 @@ func (sf *SystemUserFilterRule) Pattern() *regexp.Regexp {
if sf.Type.Value == TypeCmd {
var regex []string
for _, cmd := range strings.Split(sf.Content, "\r\n") {
cmd = regexp.QuoteMeta(cmd)
cmd = strings.Replace(cmd, " ", "\\s+", 1)
regex = append(regex, fmt.Sprintf("\\b%s\\b", cmd))
regexItem := fmt.Sprintf(`\b%s\b`, cmd)
lastRune, _ := utf8.DecodeLastRuneInString(cmd)
if lastRune != utf8.RuneError && !unicode.IsLetter(lastRune) {
regexItem = fmt.Sprintf(`\b%s`, cmd)
}
regex = append(regex, regexItem)
}
regexs = strings.Join(regex, "|")
} else {
......
......@@ -4,6 +4,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"
"github.com/jumpserver/koko/pkg/config"
......@@ -93,19 +94,18 @@ func (p *ProxyServer) validatePermission() bool {
}
// getSSHConn 获取ssh连接
func (p *ProxyServer) getSSHConn(fromCache ...bool) (srvConn *srvconn.ServerSSHConnection, err error) {
func (p *ProxyServer) getSSHConn() (srvConn *srvconn.ServerSSHConnection, err error) {
pty := p.UserConn.Pty()
conf := config.GetConf()
srvConn = &srvconn.ServerSSHConnection{
User: p.User,
Asset: p.Asset,
SystemUser: p.SystemUser,
Overtime: time.Duration(config.GetConf().SSHTimeout) * time.Second,
}
if len(fromCache) > 0 && fromCache[0] {
err = srvConn.TryConnectFromCache(pty.Window.Height, pty.Window.Width, pty.Term)
} else {
err = srvConn.Connect(pty.Window.Height, pty.Window.Width, pty.Term)
}
User: p.User,
Asset: p.Asset,
SystemUser: p.SystemUser,
Overtime: conf.SSHTimeout * time.Second,
ReuseConnection: conf.ReuseConnection,
CloseOnce: new(sync.Once),
}
err = srvConn.Connect(pty.Window.Height, pty.Window.Width, pty.Term)
return
}
......@@ -127,14 +127,6 @@ func (p *ProxyServer) getTelnetConn() (srvConn *srvconn.ServerTelnetConnection,
return
}
// getServerConnFromCache 从cache中获取ssh server连接
func (p *ProxyServer) getServerConnFromCache() (srvConn srvconn.ServerConnection, err error) {
if p.SystemUser.Protocol == "ssh" {
srvConn, err = p.getSSHConn(true)
}
return
}
// getServerConn 获取获取server连接
func (p *ProxyServer) getServerConn() (srvConn srvconn.ServerConnection, err error) {
err = p.getSystemUserUsernameIfNeed()
......@@ -154,7 +146,7 @@ func (p *ProxyServer) getServerConn() (srvConn srvconn.ServerConnection, err err
if p.SystemUser.Protocol == "telnet" {
return p.getTelnetConn()
} else {
return p.getSSHConn(false)
return p.getSSHConn()
}
}
......@@ -220,11 +212,7 @@ func (p *ProxyServer) Proxy() {
if !p.preCheckRequisite() {
return
}
// 先从cache中获取srv连接, 如果没有获得,则连接
srvConn, err := p.getServerConnFromCache()
if err != nil || srvConn == nil {
srvConn, err = p.getServerConn()
}
srvConn, err := p.getServerConn()
// 连接后端服务器失败
if err != nil {
p.sendConnectErrorMsg(err)
......
......@@ -16,9 +16,8 @@ import (
)
var (
sshClients = make(map[string]*SSHClient)
clientsRefCounter = make(map[*SSHClient]int)
clientLock = new(sync.RWMutex)
sshClients = make(map[string]*SSHClient)
clientLock = new(sync.RWMutex)
)
var (
......@@ -32,8 +31,43 @@ var (
)
type SSHClient struct {
Client *gossh.Client
Username string
client *gossh.Client
username string
ref int
key string
mu *sync.RWMutex
}
func (s *SSHClient) refCount() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.ref
}
func (s *SSHClient) increaseRef() {
s.mu.Lock()
defer s.mu.Unlock()
s.ref++
}
func (s *SSHClient) decreaseRef() {
s.mu.Lock()
defer s.mu.Unlock()
s.ref--
}
func (s *SSHClient) NewSession() (*gossh.Session, error) {
return s.client.NewSession()
}
func (s *SSHClient) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.ref > 1 {
return nil
}
return s.client.Close()
}
type SSHClientConfig struct {
......@@ -150,7 +184,7 @@ func MakeConfig(asset *model.Asset, systemUser *model.SystemUser, timeout time.D
}
}
}
if systemUser.Password == "" && systemUser.PrivateKey == "" && systemUser.LoginMode != model.LoginModeManual{
if systemUser.Password == "" && systemUser.PrivateKey == "" && systemUser.LoginMode != model.LoginModeManual {
info := service.GetSystemUserAssetAuthInfo(systemUser.ID, asset.ID)
systemUser.Password = info.Password
systemUser.PrivateKey = info.PrivateKey
......@@ -173,78 +207,69 @@ func newClient(asset *model.Asset, systemUser *model.SystemUser, timeout time.Du
if err != nil {
return nil, err
}
return &SSHClient{Client: conn, Username: systemUser.Username}, err
return &SSHClient{client: conn, username: systemUser.Username, mu: new(sync.RWMutex)}, err
}
func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUser, timeout time.Duration) (client *SSHClient, err error) {
client = GetClientFromCache(user, asset, systemUser)
if client != nil {
return client, nil
}
func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUser, timeout time.Duration,
useCache bool) (client *SSHClient, err error) {
key := fmt.Sprintf("%s_%s_%s", user.ID, asset.ID, systemUser.ID)
switch {
case useCache:
client = getClientFromCache(key)
if client != nil {
if systemUser.Username == "" {
systemUser.Username = client.username
}
logger.Infof("Reuse connection: %s->%s@%s ref: %d",
user.Username, client.username, asset.IP, client.refCount())
return client, nil
}
}
client, err = newClient(asset, systemUser, timeout)
if err == nil {
clientLock.Lock()
sshClients[key] = client
clientsRefCounter[client] = 1
clientLock.Unlock()
if err == nil && useCache {
setClientCache(key, client)
}
return
}
func GetClientFromCache(user *model.User, asset *model.Asset, systemUser *model.SystemUser) (client *SSHClient) {
key := fmt.Sprintf("%s_%s_%s", user.ID, asset.ID, systemUser.ID)
func getClientFromCache(key string) (client *SSHClient) {
clientLock.Lock()
defer clientLock.Unlock()
client, ok := sshClients[key]
if !ok {
return
}
if systemUser.Username == "" {
systemUser.Username = client.Username
return nil
}
var u = user.Username
var ip = asset.IP
clientsRefCounter[client]++
var counter = clientsRefCounter[client]
logger.Infof("Reuse connection: %s->%s@%s ref: %d", u, client.Username, ip, counter)
client.increaseRef()
return
}
func RecycleClient(client *SSHClient) {
clientLock.RLock()
counter, ok := clientsRefCounter[client]
clientLock.RUnlock()
if ok {
if counter == 1 {
logger.Debug("Recycle client: close it")
CloseClient(client)
} else {
clientLock.Lock()
clientsRefCounter[client]--
clientLock.Unlock()
logger.Debugf("Recycle client: ref -1: %d", clientsRefCounter[client])
}
}
}
func CloseClient(client *SSHClient) {
func setClientCache(key string, client *SSHClient) {
clientLock.Lock()
defer clientLock.Unlock()
sshClients[key] = client
client.increaseRef()
client.key = key
clientLock.Unlock()
}
delete(clientsRefCounter, client)
var key string
for k, v := range sshClients {
if v == client {
key = k
break
}
func RecycleClient(client *SSHClient) {
// 0, 1: delete Cache, close client.
// default: client ref decrease.
if client == nil {
return
}
if key != "" {
delete(sshClients, key)
switch client.refCount() {
case 0, 1:
clientLock.Lock()
delete(sshClients, client.key)
clientLock.Unlock()
err := client.Close()
if err != nil {
logger.Info("Failed to close client err: ", err.Error())
}else {
logger.Debug("Success to close client")
}
default:
client.decreaseRef()
}
_ = client.Client.Close()
}
......@@ -27,13 +27,15 @@ func NewUserSFTP(user *model.User, addr string, assets ...model.Asset) *UserSftp
}
type UserSftp struct {
User *model.User
Addr string
User *model.User
Addr string
RootPath string
ShowHidden bool
hosts map[string]*HostnameDir // key hostname or hostname.orgName
sftpClients map[string]*SftpConn // key %s@%s suName hostName
RootPath string
ShowHidden bool
ReuseConnection bool
Overtime time.Duration
hosts map[string]*HostnameDir // key hostname or hostname.orgName
sftpClients map[string]*SftpConn // key %s@%s suName hostName
LogChan chan *model.FTPLog
}
......@@ -42,6 +44,8 @@ func (u *UserSftp) initial(assets []model.Asset) {
conf := config.GetConf()
u.RootPath = conf.SftpRoot
u.ShowHidden = conf.ShowHiddenFile
u.ReuseConnection = conf.ReuseConnection
u.Overtime = conf.SSHTimeout * time.Second
u.hosts = make(map[string]*HostnameDir)
u.sftpClients = make(map[string]*SftpConn)
u.LogChan = make(chan *model.FTPLog, 10)
......@@ -92,9 +96,9 @@ func (u *UserSftp) ReadDir(path string) (res []os.FileInfo, err error) {
res, err = conn.client.ReadDir(realPath)
if !u.ShowHidden {
noHiddenFiles := make([]os.FileInfo, 0, len(res))
for i:=0; i<len(res);i++ {
for i := 0; i < len(res); i++ {
if !strings.HasPrefix(res[i].Name(), ".") {
noHiddenFiles = append(noHiddenFiles,res[i])
noHiddenFiles = append(noHiddenFiles, res[i])
}
}
return noHiddenFiles, err
......@@ -577,11 +581,11 @@ func (u *UserSftp) SendFTPLog(dataChan <-chan *model.FTPLog) {
}
func (u *UserSftp) GetSftpClient(asset *model.Asset, sysUser *model.SystemUser) (conn *SftpConn, err error) {
sshClient, err := NewClient(u.User, asset, sysUser, config.GetConf().SSHTimeout*time.Second)
sshClient, err := NewClient(u.User, asset, sysUser, u.Overtime, u.ReuseConnection)
if err != nil {
return
}
sftpClient, err := sftp.NewClient(sshClient.Client)
sftpClient, err := sftp.NewClient(sshClient.client)
if err != nil {
return
}
......
package srvconn
import (
"errors"
"io"
"sync"
"time"
gossh "golang.org/x/crypto/ssh"
......@@ -11,29 +11,25 @@ import (
)
type ServerSSHConnection struct {
User *model.User
Asset *model.Asset
SystemUser *model.SystemUser
Overtime time.Duration
client *SSHClient
session *gossh.Session
stdin io.WriteCloser
stdout io.Reader
closed bool
connected bool
User *model.User
Asset *model.Asset
SystemUser *model.SystemUser
Overtime time.Duration
CloseOnce *sync.Once
ReuseConnection bool
client *SSHClient
session *gossh.Session
stdin io.WriteCloser
stdout io.Reader
}
func (sc *ServerSSHConnection) Protocol() string {
return "ssh"
}
func (sc *ServerSSHConnection) Username() string {
return sc.client.Username
}
func (sc *ServerSSHConnection) invokeShell(h, w int, term string) (err error) {
sess, err := sc.client.Client.NewSession()
sess, err := sc.client.NewSession()
if err != nil {
return
}
......@@ -60,32 +56,15 @@ func (sc *ServerSSHConnection) invokeShell(h, w int, term string) (err error) {
}
func (sc *ServerSSHConnection) Connect(h, w int, term string) (err error) {
sc.client, err = NewClient(sc.User, sc.Asset, sc.SystemUser, sc.Timeout())
sc.client, err = NewClient(sc.User, sc.Asset, sc.SystemUser, sc.Timeout(), sc.ReuseConnection)
if err != nil {
return
}
err = sc.invokeShell(h, w, term)
if err != nil {
return
}
sc.connected = true
return nil
}
func (sc *ServerSSHConnection) TryConnectFromCache(h, w int, term string) (err error) {
sc.client = GetClientFromCache(sc.User, sc.Asset, sc.SystemUser)
if sc.client == nil {
return errors.New("no client in cache")
}
err = sc.invokeShell(h, w, term)
if err != nil {
RecycleClient(sc.client)
return
}
sc.connected = true
return nil
return
}
func (sc *ServerSSHConnection) SetWinSize(h, w int) error {
......@@ -108,13 +87,9 @@ func (sc *ServerSSHConnection) Timeout() time.Duration {
}
func (sc *ServerSSHConnection) Close() (err error) {
RecycleClient(sc.client)
if sc.closed || !sc.connected {
return
}
err = sc.session.Close()
if err != nil {
return
}
return
sc.CloseOnce.Do(func() {
RecycleClient(sc.client)
})
return sc.session.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment