Commit ceb7cd0f authored by ibuler's avatar ibuler

[Update] 修改一些结构

parents 23e31e79 89842d0d
......@@ -6,19 +6,10 @@ import (
"cocogo/pkg/sshd"
)
var (
conf *config.Config
appService *auth.Service
)
func init() {
configFile := "config.yml"
conf = config.LoadFromYaml(configFile)
appService = auth.NewAuthService(conf)
appService.LoadAccessKey()
appService.EnsureValidAuth()
appService.LoadTerminalConfig()
sshd.Initial(conf, appService)
config.Initial()
auth.Initial()
sshd.Initial()
}
func main() {
......
......@@ -7,7 +7,9 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"os"
......@@ -458,3 +460,89 @@ func (s *Service) ValidateUserAssetPermission(userID, systemUserID, AssetID stri
}
return res.Msg
}
func (s *Service) PushSessionReplay(gZipFile, sessionID string) error {
fp, err := os.Open(gZipFile)
if err != nil {
return err
}
defer fp.Close()
fi, err := fp.Stat()
if err != nil {
return err
}
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile("file", fi.Name())
if err != nil {
return err
}
_, _ = io.Copy(part, fp)
err = writer.Close() // close writer before POST request
if err != nil {
return err
}
url := fmt.Sprintf("%s%s", s.Conf.CoreHost, fmt.Sprintf(SessionReplay, sessionID))
req, err := http.NewRequest("POST", url, body)
currentDate := HTTPGMTDate()
req.Header.Add("Content-Type", writer.FormDataContentType())
req.Header.Set("Date", currentDate)
req.Header.Set("Authorization", s.auth.Signature(currentDate))
resp, err := s.http.Do(req)
defer resp.Body.Close()
if err != nil {
log.Info("Send HTTP Request failed:", err)
return err
}
log.Info("PushSessionReplay:", err)
return err
}
func (s *Service) CreateSession(data []byte) bool {
url := fmt.Sprintf("%s%s", s.Conf.CoreHost, SessionList)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json")
currentDate := HTTPGMTDate()
req.Header.Set("Date", currentDate)
req.Header.Set("Authorization", s.auth.Signature(currentDate))
resp, err := s.http.Do(req)
defer resp.Body.Close()
if err != nil {
log.Error("create Session err: ", err)
return false
}
if resp.StatusCode == 201 {
log.Info("create Session 201")
return true
}
return false
}
func (s *Service) FinishSession(id string, jsonData []byte) bool {
url := fmt.Sprintf("%s%s", s.Conf.CoreHost, fmt.Sprintf(SessionDetail, id))
res, err := s.SendHTTPRequest("PATCH", url, jsonData)
fmt.Printf("%s", res)
if err != nil {
log.Error(err)
return false
}
return true
}
func (s *Service) FinishReply(id string) bool {
data := map[string]bool{"has_replay": true}
jsonData, _ := json.Marshal(data)
url := fmt.Sprintf("%s%s", s.Conf.CoreHost, fmt.Sprintf(SessionDetail, id))
_, err := s.SendHTTPRequest("PATCH", url, jsonData)
if err != nil {
log.Error(err)
return false
}
return true
}
......@@ -31,7 +31,7 @@ func MD5Encode(b []byte) string {
func MakeSureDirExit(filePath string) {
dirPath := filepath.Dir(filePath)
if _, err := os.Stat(dirPath); os.IsNotExist(err) {
err = os.Mkdir(dirPath, os.ModePerm)
err = os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
log.Info("could not create dir path:", dirPath)
os.Exit(1)
......
package auth
import (
"cocogo/pkg/config"
)
var appService *Service
func Initial() {
conf := config.GetGlobalConfig()
appService = NewAuthService(conf)
appService.LoadAccessKey()
appService.EnsureValidAuth()
appService.LoadTerminalConfig()
}
func GetGlobalService() *Service {
if appService == nil {
Initial()
}
return appService
}
......@@ -11,6 +11,10 @@ const (
SystemUserAuthUrl = "/api/assets/v1/system-user/%s/auth-info/" // 该系统用户的授权
ValidateUserAssetPermission = "/api/perms/v1/asset-permission/user/validate/" //0不使用缓存 1 使用缓存 2 刷新缓存
SessionList = "/api/terminal/v1/sessions/" //上传创建的资产会话session id
SessionDetail = "/api/terminal/v1/sessions/%s/" // finish session的时候发送
SessionReplay = "/api/terminal/v1/sessions/%s/replay/" //上传录像
)
/*
......
......@@ -5,6 +5,7 @@ import (
log "github.com/sirupsen/logrus"
"io/ioutil"
"os"
"sync"
"gopkg.in/yaml.v2"
)
......@@ -26,58 +27,64 @@ type Config struct {
}
var mux = new(sync.RWMutex)
var name, _ = os.Hostname()
var rootPath, _ = os.Getwd()
var conf = &Config{
Name: name,
CoreHost: "http://localhost:8080",
BootstrapToken: "",
BindHost: "0.0.0.0",
SshPort: 2222,
HTTPPort: 5000,
CustomerAccessKey: "",
AccessKeyFile: "data/keys/.access_key",
LogLevel: "DEBUG",
RootPath: rootPath,
Comment: "Coco",
TermConfig: &TerminalConfig{},
}
func LoadFromYaml(filepath string) *Config {
c := createDefaultConfig()
body, err := ioutil.ReadFile(filepath)
if err != nil {
log.Errorf("Not found file: %s", filepath)
os.Exit(1)
}
e := yaml.Unmarshal(body, &c)
e := yaml.Unmarshal(body, conf)
if e != nil {
fmt.Println("Load yaml err")
os.Exit(1)
}
return &c
return conf
}
func createDefaultConfig() Config {
name, _ := os.Hostname()
rootPath, _ := os.Getwd()
return Config{
Name: name,
CoreHost: "http://localhost:8080",
BootstrapToken: "",
BindHost: "0.0.0.0",
SshPort: 2222,
HTTPPort: 5000,
CustomerAccessKey: "",
AccessKeyFile: "data/keys/.access_key",
LogLevel: "DEBUG",
RootPath: rootPath,
Comment: "Coco",
TermConfig: &TerminalConfig{},
}
func GetGlobalConfig() *Config {
mux.RLock()
defer mux.RUnlock()
return conf
}
func SetGlobalConfig(c Config) {
mux.Lock()
conf = &c
mux.Unlock()
}
type TerminalConfig struct {
AssetListPageSize string `json:"TERMINAL_ASSET_LIST_PAGE_SIZE"`
AssetListSortBy string `json:"TERMINAL_ASSET_LIST_SORT_BY"`
CommandStorage Storage `json:"TERMINAL_COMMAND_STORAGE"`
HeaderTitle string `json:"TERMINAL_HEADER_TITLE"`
HeartBeatInterval int `json:"TERMINAL_HEARTBEAT_INTERVAL"`
HostKey string `json:"TERMINAL_HOST_KEY"`
PasswordAuth bool `json:"TERMINAL_PASSWORD_AUTH"`
PublicKeyAuth bool `json:"TERMINAL_PUBLIC_KEY_AUTH"`
RePlayStorage Storage `json:"TERMINAL_REPLAY_STORAGE"`
SessionKeepDuration int `json:"TERMINAL_SESSION_KEEP_DURATION"`
TelnetRegex string `json:"TERMINAL_TELNET_REGEX"`
SecurityMaxIdleTime int `json:"SECURITY_MAX_IDLE_TIME"`
}
type Storage struct {
Type string `json:"TYPE"`
type TerminalConfig struct {
AssetListPageSize string `json:"TERMINAL_ASSET_LIST_PAGE_SIZE"`
AssetListSortBy string `json:"TERMINAL_ASSET_LIST_SORT_BY"`
CommandStorage map[string]string `json:"TERMINAL_COMMAND_STORAGE"`
HeaderTitle string `json:"TERMINAL_HEADER_TITLE"`
HeartBeatInterval int `json:"TERMINAL_HEARTBEAT_INTERVAL"`
HostKey string `json:"TERMINAL_HOST_KEY"`
PasswordAuth bool `json:"TERMINAL_PASSWORD_AUTH"`
PublicKeyAuth bool `json:"TERMINAL_PUBLIC_KEY_AUTH"`
RePlayStorage map[string]string `json:"TERMINAL_REPLAY_STORAGE"`
SessionKeepDuration int `json:"TERMINAL_SESSION_KEEP_DURATION"`
TelnetRegex string `json:"TERMINAL_TELNET_REGEX"`
SecurityMaxIdleTime int `json:"SECURITY_MAX_IDLE_TIME"`
}
package config
func Initial() {
configFile := "config.yml"
conf = LoadFromYaml(configFile)
}
package record
import (
"cocogo/pkg/auth"
"cocogo/pkg/config"
"cocogo/pkg/storage"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
)
func NewReplyRecord(sessionID string) *Reply {
rootPath := config.GetGlobalConfig().RootPath
currentData := time.Now().UTC().Format("2006-01-02")
gzFileName := sessionID + ".replay.gz"
absFilePath := filepath.Join(rootPath, "data", "replays", currentData, sessionID)
absGzFilePath := filepath.Join(rootPath, "data", "replays", currentData, gzFileName)
target := strings.Join([]string{currentData, gzFileName}, "/")
return &Reply{
SessionID: sessionID,
FileName: sessionID,
absFilePath: absFilePath,
gzFileName: gzFileName,
absGzFilePath: absGzFilePath,
StartTime: time.Now().UTC(),
target: target,
}
}
type Reply struct {
SessionID string
FileName string
gzFileName string
absFilePath string
absGzFilePath string
target string
WriteF *os.File
StartTime time.Time
}
func (r *Reply) Record(b []byte) {
interval := time.Now().UTC().Sub(r.StartTime).Seconds()
data, _ := json.Marshal(string(b))
_, _ = r.WriteF.WriteString(fmt.Sprintf("\"%0.6f\":%s,", interval, data))
}
func (r *Reply) StartRecord() {
auth.MakeSureDirExit(r.absFilePath)
r.WriteF, _ = os.Create(r.absFilePath)
_, _ = r.WriteF.Write([]byte("{"))
}
func (r *Reply) EndRecord(ctx context.Context) {
select {
case <-ctx.Done():
_, _ = r.WriteF.WriteString(`"0":""}`)
_ = r.WriteF.Close()
}
r.uploadReplay()
}
func (r *Reply) uploadReplay() {
_ = GzipCompressFile(r.absFilePath, r.absGzFilePath)
if sto := storage.NewStorageServer(); sto != nil {
sto.Upload(r.absGzFilePath, r.target)
}
_ = os.Remove(r.absGzFilePath)
}
func GzipCompressFile(srcPath, dstPath string) error {
srcf, err := os.Open(srcPath)
if err != nil {
return err
}
dstf, err := os.Create(dstPath)
if err != nil {
return err
}
zw := gzip.NewWriter(dstf)
zw.Name = dstPath
zw.ModTime = time.Now().UTC()
_, err = io.Copy(zw, srcf)
if err != nil {
return err
}
if err := zw.Close(); err != nil {
return err
}
return nil
}
......@@ -6,11 +6,15 @@ import (
"cocogo/pkg/transport"
"cocogo/pkg/userhome"
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
uuid "github.com/satori/go.uuid"
"github.com/olekukonko/tablewriter"
......@@ -38,7 +42,7 @@ func (d HelpInfo) displayHelpInfo(sess ssh.Session) {
type sshInteractive struct {
sess ssh.Session
term *terminal.Terminal
assetData sync.Map
assetData *sync.Map
user model.User
helpInfo HelpInfo
currentSearchAssets []model.Asset
......@@ -235,11 +239,11 @@ func (s *sshInteractive) StartDispatch() {
s.onceLoad.Do(func() {
if _, ok := Cached.Load(s.user.Id); !ok {
log.Info("first load this user asset data ")
s.loadUserAssets()
s.loadUserAssetNodes()
} else {
log.Info("first load this user asset data ")
go func() {
s.loadUserAssets()
s.loadUserAssetNodes()
......@@ -253,7 +257,7 @@ func (s *sshInteractive) StartDispatch() {
if assets, ok := s.assetData.Load(AssetsMapKey); ok {
s.displayAssets(assets.([]model.Asset))
s.currentSearchAssets = assets.([]model.Asset)
} else if assets, _ := Cached.Load(s.user.Id); ok {
} else if assets, ok := Cached.Load(s.user.Id); ok {
s.displayAssets(assets.([]model.Asset))
s.currentSearchAssets = assets.([]model.Asset)
}
......@@ -361,6 +365,7 @@ func (s *sshInteractive) Proxy(asset model.Asset, systemUser model.SystemUserAut
ptyReq, winChan, _ := s.sess.Pty()
sshConn := userhome.NewSSHConn(s.sess)
serverAuth := transport.ServerAuth{
SessionID: uuid.NewV4().String(),
IP: asset.Ip,
Port: asset.Port,
UserName: systemUser.UserName,
......@@ -372,7 +377,41 @@ func (s *sshInteractive) Proxy(asset model.Asset, systemUser model.SystemUserAut
log.Error(err)
return err
}
defer nodeConn.Close()
defer func() {
nodeConn.Close()
data := map[string]interface{}{
"id": nodeConn.SessionID,
"user": s.user.UserName,
"asset": asset.Hostname,
"org_id": asset.OrgID,
"system_user": systemUser.UserName,
"login_from": "ST",
"remote_addr": s.sess.RemoteAddr().String(),
"is_finished": true,
"date_start": nodeConn.StartTime.Format("2006-01-02 15:04:05 +0000"),
"date_end": time.Now().UTC().Format("2006-01-02 15:04:05 +0000"),
}
postData, _ := json.Marshal(data)
appService.FinishSession(nodeConn.SessionID, postData)
appService.FinishReply(nodeConn.SessionID)
}()
data := map[string]interface{}{
"id": nodeConn.SessionID,
"user": s.user.UserName,
"asset": asset.Hostname,
"org_id": asset.OrgID,
"system_user": systemUser.UserName,
"login_from": "ST",
"remote_addr": s.sess.RemoteAddr().String(),
"is_finished": false,
"date_start": nodeConn.StartTime.Format("2006-01-02 15:04:05 +0000"),
"date_end": nil,
}
postData, err := json.Marshal(data)
if !appService.CreateSession(postData) {
return err
}
memChan := transport.NewMemoryAgent(nodeConn)
......@@ -384,7 +423,6 @@ func (s *sshInteractive) Proxy(asset model.Asset, systemUser model.SystemUserAut
log.Error(err)
}
return err
}
func isSubstring(sArray []string, substr string) bool {
......
......@@ -5,7 +5,6 @@ import (
"cocogo/pkg/config"
"cocogo/pkg/model"
"io"
"runtime"
"strconv"
"sync"
"text/template"
......@@ -23,18 +22,19 @@ var (
displayTemplate *template.Template
log *logrus.Logger
Cached sync.Map
Cached *sync.Map
)
func Initial(config *config.Config, service *auth.Service) {
func Initial() {
displayTemplate = template.Must(template.New("display").Parse(welcomeTemplate))
conf = config
appService = service
serverSig = parsePrivateKey(config.TermConfig.HostKey)
Cached = new(sync.Map)
conf = config.GetGlobalConfig()
appService = auth.GetGlobalService()
serverSig = parsePrivateKey(conf.TermConfig.HostKey)
log = logrus.New()
if level, err := logrus.ParseLevel(config.LogLevel); err != nil {
if level, err := logrus.ParseLevel(conf.LogLevel); err != nil {
log.SetLevel(logrus.InfoLevel)
} else {
log.SetLevel(level)
......@@ -64,9 +64,10 @@ func connectHandler(sess ssh.Session) {
}
userInteractive := &sshInteractive{
sess: sess,
term: terminal.NewTerminal(sess, "Opt>"),
user: user,
sess: sess,
term: terminal.NewTerminal(sess, "Opt>"),
user: user,
assetData: new(sync.Map),
helpInfo: HelpInfo{UserName: sess.User(),
ColorCode: GreenColorCode,
ColorEnd: ColorEnd,
......@@ -76,8 +77,6 @@ func connectHandler(sess ssh.Session) {
log.Info("accept one session")
userInteractive.displayHelpInfo()
userInteractive.StartDispatch()
log.Info("finish one session")
runtime.GC()
} else {
_, err := io.WriteString(sess, "No PTY requested.\n")
......
package storage
import "cocogo/pkg/config"
type Storage interface {
Upload(gZipFile, target string)
}
func NewStorageServer() Storage {
conf := config.GetGlobalConfig()
switch conf.TermConfig.RePlayStorage["TYPE"] {
case "server":
return NewJmsStorage()
}
return nil
}
package storage
import (
"cocogo/pkg/auth"
"path/filepath"
"strings"
)
func NewJmsStorage() Storage {
appService := auth.GetGlobalService()
return &Server{
StorageType: "jms",
service: appService,
}
}
type Server struct {
StorageType string
service *auth.Service
}
func (s *Server) Upload(gZipFilePath, target string) {
sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0]
_ = s.service.PushSessionReplay(gZipFilePath, sessionID)
}
......@@ -2,15 +2,16 @@ package transport
import (
"cocogo/pkg/parser"
"cocogo/pkg/record"
"context"
"fmt"
"io"
"time"
"github.com/sirupsen/logrus"
"github.com/gliderlabs/ssh"
uuid "github.com/satori/go.uuid"
gossh "golang.org/x/crypto/ssh"
)
......@@ -19,6 +20,7 @@ var log = logrus.New()
const maxBufferSize = 1024 * 4
type ServerAuth struct {
SessionID string
IP string
Port int
UserName string
......@@ -72,17 +74,22 @@ func NewNodeConn(ctx context.Context, authInfo ServerAuth, ptyReq ssh.Pty, winCh
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(ctx)
subCtx, cancelFunc := context.WithCancel(ctx)
replyRecord := record.NewReplyRecord(authInfo.SessionID)
replyRecord.StartRecord()
go replyRecord.EndRecord(subCtx)
nConn := &NodeConn{
uuid: uuid.NewV4(),
SessionID: authInfo.SessionID,
client: c,
conn: s,
ctx: ctx,
ctx: subCtx,
ctxCancelFunc: cancelFunc,
stdin: nodeStdin,
stdout: nodeStdout,
tParser: parser.NewTerminalParser(),
replyRecord: replyRecord,
StartTime: time.Now().UTC(),
}
go nConn.windowChangeHandler(winCh)
......@@ -91,7 +98,7 @@ func NewNodeConn(ctx context.Context, authInfo ServerAuth, ptyReq ssh.Pty, winCh
// coco连接远程Node的连接
type NodeConn struct {
uuid uuid.UUID
SessionID string
client *gossh.Client
conn *gossh.Session
stdin io.Writer
......@@ -104,10 +111,9 @@ type NodeConn struct {
inSpecialStatus bool
ctx context.Context
ctxCancelFunc context.CancelFunc
}
func (n *NodeConn) UUID() uuid.UUID {
return n.uuid
replyRecord *record.Reply
cmdRecord *record.Command
StartTime time.Time
}
func (n *NodeConn) Wait() error {
......@@ -206,6 +212,7 @@ func (n *NodeConn) SendResponse(ctx context.Context, outChan chan<- []byte) {
copyBuf := make([]byte, len(buf[:nr]))
copy(copyBuf, buf[:nr])
outChan <- copyBuf
n.replyRecord.Record(buf[:nr])
}
}
......@@ -238,6 +245,7 @@ func (n *NodeConn) ReceiveRequest(ctx context.Context, inChan <-chan []byte, out
if n.FilterWhiteBlackRule(n.currentCommandInput) {
msg := fmt.Sprintf("\r\n cmd '%s' is forbidden \r\n", n.currentCommandInput)
outChan <- []byte(msg)
n.replyRecord.Record([]byte(msg))
ctrU := []byte{21, 13} // 清除行并换行
_, err := n.stdin.Write(ctrU)
if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment