Commit 88f5215a authored by Eric's avatar Eric

[update] 调整结构

parent 1461c7d1
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"cocogo/pkg/config" "cocogo/pkg/config"
"cocogo/pkg/logger" "cocogo/pkg/logger"
"cocogo/pkg/proxy"
"cocogo/pkg/service" "cocogo/pkg/service"
"cocogo/pkg/sshd" "cocogo/pkg/sshd"
) )
...@@ -45,5 +44,5 @@ func loadingBoot() { ...@@ -45,5 +44,5 @@ func loadingBoot() {
config.Initial() config.Initial()
logger.Initial() logger.Initial()
service.Initial() service.Initial()
proxy.Initial() Initial()
} }
package proxy package coco
import ( import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
"cocogo/pkg/common" "cocogo/pkg/common"
"cocogo/pkg/config" "cocogo/pkg/config"
"cocogo/pkg/logger" "cocogo/pkg/logger"
"cocogo/pkg/model" "cocogo/pkg/proxy"
"cocogo/pkg/service" "cocogo/pkg/service"
) )
var sessionMap = make(map[string]*SwitchSession)
var lock = new(sync.RWMutex)
func Initial() { func Initial() {
conf := config.GetConf() conf := config.GetConf()
if conf.UploadFailedReplay { if conf.UploadFailedReplay {
go uploadFailedReplay(conf.RootPath) go uploadFailedReplay(conf.RootPath)
} }
go KeepHeartbeat(conf.HeartbeatDuration) go keepHeartbeat(conf.HeartbeatDuration)
} }
func uploadFailedReplay(rootPath string) { func uploadFailedReplay(rootPath string) {
...@@ -43,10 +39,10 @@ func uploadFailedReplay(rootPath string) { ...@@ -43,10 +39,10 @@ func uploadFailedReplay(rootPath string) {
if strings.HasSuffix(filename, ".replay.gz") { if strings.HasSuffix(filename, ".replay.gz") {
sid := strings.Split(filename, ".")[0] sid := strings.Split(filename, ".")[0]
if len(sid) == 36 { if len(sid) == 36 {
relayRecord := NewReplyRecord(sid) relayRecord := proxy.NewReplyRecord(sid)
relayRecord.absGzFilePath = path relayRecord.AbsGzFilePath = path
relayRecord.target, _ = filepath.Rel(path, rootPath) relayRecord.Target, _ = filepath.Rel(path, rootPath)
go relayRecord.uploadGzipFile(3) go relayRecord.UploadGzipFile(3)
} }
} }
return nil return nil
...@@ -54,61 +50,19 @@ func uploadFailedReplay(rootPath string) { ...@@ -54,61 +50,19 @@ func uploadFailedReplay(rootPath string) {
logger.Debug("upload Replay Done") logger.Debug("upload Replay Done")
} }
func KeepHeartbeat(interval int) { func keepHeartbeat(interval int) {
tick := time.Tick(time.Duration(interval) * time.Second) tick := time.Tick(time.Duration(interval) * time.Second)
for { for {
select { select {
case <-tick: case <-tick:
data := GetAliveSessions() data := proxy.GetAliveSessions()
tasks := service.TerminalHeartBeat(data) tasks := service.TerminalHeartBeat(data)
if len(tasks) != 0 { if len(tasks) != 0 {
for _, task := range tasks { for _, task := range tasks {
HandlerSessionTask(task) proxy.HandlerSessionTask(task)
}
} }
} }
} }
}
func HandlerSessionTask(task model.TerminalTask) {
switch task.Name {
case "kill_session":
KillSession(task.Args)
service.FinishTask(task.Id)
default:
}
}
func KillSession(sessionID string) {
lock.RLock()
defer lock.RUnlock()
if sw, ok := sessionMap[sessionID]; ok {
sw.Terminate()
} }
}
func GetAliveSessions() []string {
lock.RLock()
defer lock.RUnlock()
sids := make([]string, 0, len(sessionMap))
for sid := range sessionMap {
sids = append(sids, sid)
}
return sids
}
func RemoveSession(sw *SwitchSession) {
lock.Lock()
defer lock.Unlock()
delete(sessionMap, sw.Id)
}
func AddSession(sw *SwitchSession) {
lock.Lock()
defer lock.Unlock()
sessionMap[sw.Id] = sw
} }
...@@ -95,8 +95,8 @@ type ReplyRecorder struct { ...@@ -95,8 +95,8 @@ type ReplyRecorder struct {
sessionID string sessionID string
absFilePath string absFilePath string
absGzFilePath string AbsGzFilePath string
target string Target string
file *os.File file *os.File
timeStartNano int64 timeStartNano int64
...@@ -126,8 +126,8 @@ func (r *ReplyRecorder) prepare() { ...@@ -126,8 +126,8 @@ func (r *ReplyRecorder) prepare() {
replayDir := filepath.Join(rootPath, "data", "replays", today) replayDir := filepath.Join(rootPath, "data", "replays", today)
r.absFilePath = filepath.Join(replayDir, sessionId) r.absFilePath = filepath.Join(replayDir, sessionId)
r.absGzFilePath = filepath.Join(replayDir, gzFileName) r.AbsGzFilePath = filepath.Join(replayDir, gzFileName)
r.target = strings.Join([]string{today, gzFileName}, "/") r.Target = strings.Join([]string{today, gzFileName}, "/")
r.timeStartNano = time.Now().UnixNano() r.timeStartNano = time.Now().UnixNano()
err := common.EnsureDirExist(replayDir) err := common.EnsureDirExist(replayDir)
...@@ -160,21 +160,21 @@ func (r *ReplyRecorder) uploadReplay() { ...@@ -160,21 +160,21 @@ func (r *ReplyRecorder) uploadReplay() {
_ = os.Remove(r.absFilePath) _ = os.Remove(r.absFilePath)
return return
} }
if !common.FileExists(r.absGzFilePath) { if !common.FileExists(r.AbsGzFilePath) {
logger.Debug("Compress replay file: ", r.absFilePath) logger.Debug("Compress replay file: ", r.absFilePath)
_ = common.GzipCompressFile(r.absFilePath, r.absGzFilePath) _ = common.GzipCompressFile(r.absFilePath, r.AbsGzFilePath)
_ = os.Remove(r.absFilePath) _ = os.Remove(r.absFilePath)
} }
r.uploadGzipFile(3) r.UploadGzipFile(3)
} }
func (r *ReplyRecorder) uploadGzipFile(maxRetry int) { func (r *ReplyRecorder) UploadGzipFile(maxRetry int) {
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
logger.Debug("Upload replay file: ", r.absGzFilePath) logger.Debug("Upload replay file: ", r.AbsGzFilePath)
err := r.storage.Upload(r.absGzFilePath, r.target) err := r.storage.Upload(r.AbsGzFilePath, r.Target)
if err == nil { if err == nil {
_ = os.Remove(r.absGzFilePath) _ = os.Remove(r.AbsGzFilePath)
break break
} }
// 如果还是失败,使用备用storage再传一次 // 如果还是失败,使用备用storage再传一次
...@@ -184,7 +184,7 @@ func (r *ReplyRecorder) uploadGzipFile(maxRetry int) { ...@@ -184,7 +184,7 @@ func (r *ReplyRecorder) uploadGzipFile(maxRetry int) {
} }
logger.Errorf("Using back off storage retry upload") logger.Errorf("Using back off storage retry upload")
r.storage = r.backOffStorage r.storage = r.backOffStorage
r.uploadGzipFile(3) r.UploadGzipFile(3)
break break
} }
} }
......
package proxy
import (
"cocogo/pkg/model"
"cocogo/pkg/service"
"sync"
)
var sessionMap = make(map[string]*SwitchSession)
var lock = new(sync.RWMutex)
func HandlerSessionTask(task model.TerminalTask) {
switch task.Name {
case "kill_session":
KillSession(task.Args)
service.FinishTask(task.Id)
default:
}
}
func KillSession(sessionID string) {
lock.RLock()
defer lock.RUnlock()
if sw, ok := sessionMap[sessionID]; ok {
sw.Terminate()
}
}
func GetAliveSessions() []string {
lock.RLock()
defer lock.RUnlock()
sids := make([]string, 0, len(sessionMap))
for sid := range sessionMap {
sids = append(sids, sid)
}
return sids
}
func RemoveSession(sw *SwitchSession) {
lock.Lock()
defer lock.Unlock()
delete(sessionMap, sw.Id)
}
func AddSession(sw *SwitchSession) {
lock.Lock()
defer lock.Unlock()
sessionMap[sw.Id] = sw
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment