recorder.go 4.15 KB
package proxy

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"cocogo/pkg/common"
	"cocogo/pkg/config"
	"cocogo/pkg/logger"
	"cocogo/pkg/model"
)

func NewCommandRecorder(sid string) (recorder *CommandRecorder) {
	recorder = &CommandRecorder{sessionID: sid}
	recorder.initial()
	return recorder
}

func NewReplyRecord(sid string) (recorder *ReplyRecorder) {
	recorder = &ReplyRecorder{sessionID: sid}
	recorder.initial()
	return recorder
}

type CommandRecorder struct {
	sessionID string
	storage   CommandStorage

	queue  chan *model.Command
	closed chan struct{}
}

func (c *CommandRecorder) initial() {
	c.queue = make(chan *model.Command, 10)
	c.storage = NewCommandStorage()
	c.closed = make(chan struct{})
	//c.storage, _ = NewFileCommandStorage("/tmp/abc.log")
	go c.record()
}

func (c *CommandRecorder) Record(command *model.Command) {
	c.queue <- command
}

func (c *CommandRecorder) End() {
	select {
	case <-c.closed:
		return
	default:
	}
	close(c.closed)
}

func (c *CommandRecorder) record() {
	cmdList := make([]*model.Command, 0)
	maxRetry := 0
	for {
		select {
		case <-c.closed:
			if len(cmdList) == 0 {
				return
			}
		case p, ok := <-c.queue:
			if !ok {
				logger.Debug("Session command recorder close: ", c.sessionID)
				return
			}
			cmdList = append(cmdList, p)
			if len(cmdList) < 5 {
				continue
			}
		case <-time.After(time.Second * 5):
			if len(cmdList) == 0 {
				continue
			}
		}
		err := c.storage.BulkSave(cmdList)
		if err == nil {
			cmdList = cmdList[:0]
			continue
		}

		if maxRetry > 5 {
			cmdList = cmdList[1:]
		}
		maxRetry++
	}
}

type ReplyRecorder struct {
	sessionID string

	absFilePath   string
	AbsGzFilePath string
	Target        string
	file          *os.File
	timeStartNano int64

	storage        ReplayStorage
	backOffStorage ReplayStorage
}

func (r *ReplyRecorder) initial() {
	r.storage = NewReplayStorage()
	r.backOffStorage = defaultReplayStorage
	r.prepare()
}

func (r *ReplyRecorder) Record(b []byte) {
	if len(b) > 0 {
		delta := float64(time.Now().UnixNano()-r.timeStartNano) / 1000 / 1000 / 1000
		data, _ := json.Marshal(string(b))
		_, _ = r.file.WriteString(fmt.Sprintf(`"%.3f":%s,`, delta, data))
	}
}

func (r *ReplyRecorder) prepare() {
	sessionId := r.sessionID
	rootPath := config.GetConf().RootPath
	today := time.Now().UTC().Format("2006-01-02")
	gzFileName := sessionId + ".replay.gz"
	replayDir := filepath.Join(rootPath, "data", "replays", today)

	r.absFilePath = filepath.Join(replayDir, sessionId)
	r.AbsGzFilePath = filepath.Join(replayDir, gzFileName)
	r.Target = strings.Join([]string{today, gzFileName}, "/")
	r.timeStartNano = time.Now().UnixNano()

	err := common.EnsureDirExist(replayDir)
	if err != nil {
		logger.Errorf("Create dir %s error: %s\n", replayDir, err)
		return
	}

	logger.Debug("Replay file path: ", r.absFilePath)
	r.file, err = os.Create(r.absFilePath)
	if err != nil {
		logger.Errorf("Create file %s error: %s\n", r.absFilePath, err)
	}
	_, _ = r.file.Write([]byte("{"))
}

func (r *ReplyRecorder) End() {
	_, _ = r.file.WriteString(fmt.Sprintf(`"%.3f":%s}`, 0.0, ""))
	_ = r.file.Close()
	go r.uploadReplay()
}

func (r *ReplyRecorder) uploadReplay() {
	if !common.FileExists(r.absFilePath) {
		logger.Debug("Replay file not found, passed: ", r.absFilePath)
		return
	}
	if stat, err := os.Stat(r.absFilePath); err == nil && stat.Size() == 0 {
		logger.Debug("Replay file is empty, removed: ", r.absFilePath)
		_ = os.Remove(r.absFilePath)
		return
	}
	if !common.FileExists(r.AbsGzFilePath) {
		logger.Debug("Compress replay file: ", r.absFilePath)
		_ = common.GzipCompressFile(r.absFilePath, r.AbsGzFilePath)
		_ = os.Remove(r.absFilePath)
	}
	r.UploadGzipFile(3)

}

func (r *ReplyRecorder) UploadGzipFile(maxRetry int) {
	for i := 0; i <= maxRetry; i++ {
		logger.Debug("Upload replay file: ", r.AbsGzFilePath)
		err := r.storage.Upload(r.AbsGzFilePath, r.Target)
		if err == nil {
			_ = os.Remove(r.AbsGzFilePath)
			break
		}
		// 如果还是失败,使用备用storage再传一次
		if i == maxRetry {
			if r.storage == r.backOffStorage {
				break
			}
			logger.Errorf("Using back off storage retry upload")
			r.storage = r.backOffStorage
			r.UploadGzipFile(3)
			break
		}
	}
}