Unverified Commit b991c052 authored by Eric_Lee's avatar Eric_Lee Committed by GitHub

V52 bug (#82)

* 修改配置, 并移除无效的配置项

* 优化连接复用引用计数代码
parent 5976cc55
...@@ -24,16 +24,9 @@ BOOTSTRAP_TOKEN: <PleasgeChangeSameWithJumpserver> ...@@ -24,16 +24,9 @@ BOOTSTRAP_TOKEN: <PleasgeChangeSameWithJumpserver>
# ACCESS KEY 保存的地址, 默认注册后会保存到该文件中 # ACCESS KEY 保存的地址, 默认注册后会保存到该文件中
# ACCESS_KEY_FILE: data/keys/.access_key # ACCESS_KEY_FILE: data/keys/.access_key
# 加密密钥
# SECRET_KEY: null
# 设置日志级别 [DEBUG, INFO, WARN, ERROR, FATAL, CRITICAL] # 设置日志级别 [DEBUG, INFO, WARN, ERROR, FATAL, CRITICAL]
# LOG_LEVEL: INFO # LOG_LEVEL: INFO
# 和Jumpserver 保持心跳时间间隔 (seconds)
# HEARTBEAT_INTERVAL: 5
# SSH连接超时时间 (default 15 seconds) # SSH连接超时时间 (default 15 seconds)
# SSH_TIMEOUT: 15 # SSH_TIMEOUT: 15
......
...@@ -24,11 +24,11 @@ type Config struct { ...@@ -24,11 +24,11 @@ type Config struct {
SessionKeepDuration time.Duration `json:"TERMINAL_SESSION_KEEP_DURATION"` SessionKeepDuration time.Duration `json:"TERMINAL_SESSION_KEEP_DURATION"`
TelnetRegex string `json:"TERMINAL_TELNET_REGEX"` TelnetRegex string `json:"TERMINAL_TELNET_REGEX"`
MaxIdleTime time.Duration `json:"SECURITY_MAX_IDLE_TIME"` MaxIdleTime time.Duration `json:"SECURITY_MAX_IDLE_TIME"`
HeartbeatDuration time.Duration `json:"TERMINAL_HEARTBEAT_INTERVAL"`
SftpRoot string `json:"TERMINAL_SFTP_ROOT" yaml:"SFTP_ROOT"` SftpRoot string `json:"TERMINAL_SFTP_ROOT" yaml:"SFTP_ROOT"`
ShowHiddenFile bool `yaml:"SFTP_SHOW_HIDDEN_FILE"` ShowHiddenFile bool `yaml:"SFTP_SHOW_HIDDEN_FILE"`
ReuseConnection bool `yaml:"REUSE_CONNECTION"` ReuseConnection bool `yaml:"REUSE_CONNECTION"`
Name string `yaml:"NAME"` Name string `yaml:"NAME"`
SecretKey string `yaml:"SECRET_KEY"`
HostKeyFile string `yaml:"HOST_KEY_FILE"` HostKeyFile string `yaml:"HOST_KEY_FILE"`
CoreHost string `yaml:"CORE_HOST"` CoreHost string `yaml:"CORE_HOST"`
BootstrapToken string `yaml:"BOOTSTRAP_TOKEN"` BootstrapToken string `yaml:"BOOTSTRAP_TOKEN"`
...@@ -39,7 +39,6 @@ type Config struct { ...@@ -39,7 +39,6 @@ type Config struct {
AccessKey string `yaml:"ACCESS_KEY"` AccessKey string `yaml:"ACCESS_KEY"`
AccessKeyFile string `yaml:"ACCESS_KEY_FILE"` AccessKeyFile string `yaml:"ACCESS_KEY_FILE"`
LogLevel string `yaml:"LOG_LEVEL"` LogLevel string `yaml:"LOG_LEVEL"`
HeartbeatDuration time.Duration `yaml:"HEARTBEAT_INTERVAL"`
RootPath string `yaml:"ROOT_PATH"` RootPath string `yaml:"ROOT_PATH"`
Comment string `yaml:"COMMENT"` Comment string `yaml:"COMMENT"`
Language string `yaml:"LANG"` Language string `yaml:"LANG"`
...@@ -52,6 +51,9 @@ func (c *Config) EnsureConfigValid() { ...@@ -52,6 +51,9 @@ func (c *Config) EnsureConfigValid() {
if c.LanguageCode != "" && c.Language == "" { if c.LanguageCode != "" && c.Language == "" {
c.Language = c.LanguageCode c.Language = c.LanguageCode
} }
if c.Language == ""{
c.Language = "zh"
}
// 确保至少有一个认证 // 确保至少有一个认证
if !c.PublicKeyAuth && !c.PasswordAuth { if !c.PublicKeyAuth && !c.PasswordAuth {
c.PasswordAuth = true c.PasswordAuth = true
...@@ -126,7 +128,6 @@ var Conf = &Config{ ...@@ -126,7 +128,6 @@ var Conf = &Config{
HostKey: "", HostKey: "",
RootPath: rootPath, RootPath: rootPath,
Comment: "Coco", Comment: "Coco",
Language: "zh",
ReplayStorage: map[string]interface{}{"TYPE": "server"}, ReplayStorage: map[string]interface{}{"TYPE": "server"},
CommandStorage: map[string]interface{}{"TYPE": "server"}, CommandStorage: map[string]interface{}{"TYPE": "server"},
UploadFailedReplay: true, UploadFailedReplay: true,
......
...@@ -30,7 +30,7 @@ func (c *Coco) Start() { ...@@ -30,7 +30,7 @@ func (c *Coco) Start() {
func (c *Coco) Stop() { func (c *Coco) Stop() {
sshd.StopServer() sshd.StopServer()
httpd.StopHTTPServer() httpd.StopHTTPServer()
logger.Debug("Quit The Coco") logger.Info("Quit The Coco")
} }
func RunForever() { func RunForever() {
......
...@@ -19,7 +19,7 @@ func Initial() { ...@@ -19,7 +19,7 @@ func Initial() {
go uploadRemainReplay(conf.RootPath) go uploadRemainReplay(conf.RootPath)
} }
go keepHeartbeat(conf.HeartbeatDuration) go keepHeartbeat()
} }
// uploadRemainReplay 上传遗留的录像 // uploadRemainReplay 上传遗留的录像
...@@ -37,7 +37,7 @@ func uploadRemainReplay(rootPath string) { ...@@ -37,7 +37,7 @@ func uploadRemainReplay(rootPath string) {
} }
var sid string var sid string
filename := info.Name() filename := info.Name()
if len(filename) == 36{ if len(filename) == 36 {
sid = filename sid = filename
} }
if strings.HasSuffix(filename, ".replay.gz") { if strings.HasSuffix(filename, ".replay.gz") {
...@@ -46,8 +46,8 @@ func uploadRemainReplay(rootPath string) { ...@@ -46,8 +46,8 @@ func uploadRemainReplay(rootPath string) {
sid = sidName sid = sidName
} }
} }
if sid != ""{ if sid != "" {
data := map[string]interface{}{"id":sid,"date_end":info.ModTime().UTC().Format( data := map[string]interface{}{"id": sid, "date_end": info.ModTime().UTC().Format(
"2006-01-02 15:04:05 +0000")} "2006-01-02 15:04:05 +0000")}
service.FinishSession(data) service.FinishSession(data)
allRemainFiles[sid] = path allRemainFiles[sid] = path
...@@ -56,21 +56,21 @@ func uploadRemainReplay(rootPath string) { ...@@ -56,21 +56,21 @@ func uploadRemainReplay(rootPath string) {
return nil return nil
}) })
for sid, path := range allRemainFiles{ for sid, path := range allRemainFiles {
var absGzPath string var absGzPath string
if strings.HasSuffix(path, ".replay.gz") { if strings.HasSuffix(path, ".replay.gz") {
absGzPath = path absGzPath = path
}else if strings.HasSuffix(path, sid) { } else if strings.HasSuffix(path, sid) {
if err := ValidateRemainReplayFile(path); err != nil{ if err := ValidateRemainReplayFile(path); err != nil {
continue continue
} }
absGzPath = path + ".replay.gz" absGzPath = path + ".replay.gz"
if err := common.GzipCompressFile(path,absGzPath); err != nil{ if err := common.GzipCompressFile(path, absGzPath); err != nil {
continue continue
} }
_ = os.Remove(path) _ = os.Remove(path)
} }
relayRecord := &proxy.ReplyRecorder{SessionID:sid} relayRecord := &proxy.ReplyRecorder{SessionID: sid}
relayRecord.AbsGzFilePath = absGzPath relayRecord.AbsGzFilePath = absGzPath
relayRecord.Target, _ = filepath.Rel(path, rootPath) relayRecord.Target, _ = filepath.Rel(path, rootPath)
relayRecord.UploadGzipFile(3) relayRecord.UploadGzipFile(3)
...@@ -79,44 +79,41 @@ func uploadRemainReplay(rootPath string) { ...@@ -79,44 +79,41 @@ func uploadRemainReplay(rootPath string) {
} }
// keepHeartbeat 保持心跳 // keepHeartbeat 保持心跳
func keepHeartbeat(interval time.Duration) { func keepHeartbeat() {
tick := time.Tick(interval * time.Second)
for { for {
select { time.Sleep(config.GetConf().HeartbeatDuration * time.Second)
case <-tick: data := proxy.GetAliveSessions()
data := proxy.GetAliveSessions() tasks := service.TerminalHeartBeat(data)
tasks := service.TerminalHeartBeat(data) if len(tasks) != 0 {
if len(tasks) != 0 { for _, task := range tasks {
for _, task := range tasks { proxy.HandleSessionTask(task)
proxy.HandleSessionTask(task)
}
} }
} }
} }
} }
func ValidateRemainReplayFile(path string) error{ func ValidateRemainReplayFile(path string) error {
f, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND,os.ModePerm) f, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND, os.ModePerm)
if err != nil{ if err != nil {
return err return err
} }
defer f.Close() defer f.Close()
tmp := make([]byte,1) tmp := make([]byte, 1)
_, err = f.Seek(-1,2) _, err = f.Seek(-1, 2)
if err != nil{ if err != nil {
return err return err
} }
_, err = f.Read(tmp) _, err = f.Read(tmp)
if err != nil{ if err != nil {
return err return err
} }
switch string(tmp) { switch string(tmp) {
case "}": case "}":
return nil return nil
case ",": case ",":
_,err = f.Write([]byte(`"0":""}`)) _, err = f.Write([]byte(`"0":""}`))
default: default:
_,err = f.Write([]byte(`}`)) _, err = f.Write([]byte(`}`))
} }
return err return err
} }
\ No newline at end of file
...@@ -42,18 +42,27 @@ type SSHClient struct { ...@@ -42,18 +42,27 @@ type SSHClient struct {
} }
func (s *SSHClient) refCount() int { func (s *SSHClient) refCount() int {
if s.isClose() {
return 0
}
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
return s.ref return s.ref
} }
func (s *SSHClient) increaseRef() { func (s *SSHClient) increaseRef() {
if s.isClose() {
return
}
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.ref++ s.ref++
} }
func (s *SSHClient) decreaseRef() { func (s *SSHClient) decreaseRef() {
if s.isClose() {
return
}
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.ref-- s.ref--
...@@ -64,21 +73,28 @@ func (s *SSHClient) NewSession() (*gossh.Session, error) { ...@@ -64,21 +73,28 @@ func (s *SSHClient) NewSession() (*gossh.Session, error) {
} }
func (s *SSHClient) Close() error { func (s *SSHClient) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.ref > 1 {
return nil
}
select { select {
case <-s.closed: case <-s.closed:
return nil return nil
default: default:
close(s.closed) close(s.closed)
} }
s.mu.Lock()
s.ref = 0
s.mu.Unlock()
return s.client.Close() return s.client.Close()
} }
func KeepAlive(c *gossh.Client, closed <-chan struct{}, keepInterval time.Duration) { func (s *SSHClient) isClose() bool {
select {
case <-s.closed:
return true
default:
return false
}
}
func KeepAlive(c *SSHClient, closed <-chan struct{}, keepInterval time.Duration) {
t := time.NewTicker(keepInterval * time.Second) t := time.NewTicker(keepInterval * time.Second)
defer t.Stop() defer t.Stop()
logger.Debugf("SSH client %p keep alive start", c) logger.Debugf("SSH client %p keep alive start", c)
...@@ -88,9 +104,11 @@ func KeepAlive(c *gossh.Client, closed <-chan struct{}, keepInterval time.Durati ...@@ -88,9 +104,11 @@ func KeepAlive(c *gossh.Client, closed <-chan struct{}, keepInterval time.Durati
case <-closed: case <-closed:
return return
case <-t.C: case <-t.C:
_, _, err := c.SendRequest("keepalive@jumpserver.org", true, nil) _, _, err := c.client.SendRequest("keepalive@openssh.com", true, nil)
if err != nil { if err != nil {
logger.Error("SSH client %p keep alive err: ", c, err.Error()) logger.Errorf("SSH client %p keep alive err: %s", c, err.Error())
_ = c.Close()
RecycleClient(c)
return return
} }
} }
...@@ -236,12 +254,12 @@ func newClient(asset *model.Asset, systemUser *model.SystemUser, timeout time.Du ...@@ -236,12 +254,12 @@ func newClient(asset *model.Asset, systemUser *model.SystemUser, timeout time.Du
return nil, err return nil, err
} }
closed := make(chan struct{}) closed := make(chan struct{})
go KeepAlive(conn, closed, 60) client = &SSHClient{client: conn, username: systemUser.Username,
return &SSHClient{ mu: new(sync.RWMutex),
client: conn, ref: 1,
username: systemUser.Username, closed: closed}
mu: new(sync.RWMutex), go KeepAlive(client, closed, 60)
closed: closed,}, nil return client, nil
} }
func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUser, timeout time.Duration, func NewClient(user *model.User, asset *model.Asset, systemUser *model.SystemUser, timeout time.Duration,
...@@ -281,30 +299,27 @@ func getClientFromCache(key string) (client *SSHClient) { ...@@ -281,30 +299,27 @@ func getClientFromCache(key string) (client *SSHClient) {
func setClientCache(key string, client *SSHClient) { func setClientCache(key string, client *SSHClient) {
clientLock.Lock() clientLock.Lock()
sshClients[key] = client sshClients[key] = client
client.increaseRef()
client.key = key client.key = key
clientLock.Unlock() clientLock.Unlock()
} }
func RecycleClient(client *SSHClient) { func RecycleClient(client *SSHClient) {
// 0, 1: delete Cache, close client. // ref: 0 delete Cache, close client.
// default: client ref decrease. // default: client ref decrease.
if client == nil { if client == nil {
return return
} }
switch client.refCount() { client.decreaseRef()
case 0, 1: if client.refCount() == 0 {
clientLock.Lock() clientLock.Lock()
delete(sshClients, client.key) delete(sshClients, client.key)
clientLock.Unlock() clientLock.Unlock()
err := client.Close() if err := client.Close(); err != nil {
if err != nil { logger.Errorf("Close ssh client %p err: %s", client, err.Error())
logger.Error("Failed to close client err: ", err.Error())
} else { } else {
logger.Debug("Success to close client") logger.Infof("Close ssh client %p", client)
} }
default: }else {
client.decreaseRef() logger.Debugf("SSH client %p ref -1, current ref: %s", client, client.refCount())
logger.Debugf("Reuse client %p Current ref: %d", client, client.refCount())
} }
} }
...@@ -587,6 +587,7 @@ func (u *UserSftp) GetSftpClient(asset *model.Asset, sysUser *model.SystemUser) ...@@ -587,6 +587,7 @@ func (u *UserSftp) GetSftpClient(asset *model.Asset, sysUser *model.SystemUser)
} }
sftpClient, err := sftp.NewClient(sshClient.client) sftpClient, err := sftp.NewClient(sshClient.client)
if err != nil { if err != nil {
RecycleClient(sshClient)
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment