Commit f7cf431a authored by ibuler's avatar ibuler

[Update] 修改storage结构

parent e7f9565f
......@@ -22,6 +22,7 @@ func Initial() {
go keepHeartbeat(conf.HeartbeatDuration)
}
// uploadRemainReplay 上传遗留的录像
func uploadRemainReplay(rootPath string) {
replayDir := filepath.Join(rootPath, "data", "replays")
err := common.EnsureDirExist(replayDir)
......@@ -31,7 +32,6 @@ func uploadRemainReplay(rootPath string) {
}
_ = filepath.Walk(replayDir, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return nil
}
......@@ -50,6 +50,7 @@ func uploadRemainReplay(rootPath string) {
logger.Debug("Upload remain replay done")
}
// keepHeartbeat 保持心跳
func keepHeartbeat(interval time.Duration) {
tick := time.Tick(interval * time.Second)
for {
......
......@@ -65,6 +65,16 @@ func (w *WebConn) AddClient(clientID string, conn *Client) {
w.Clients[clientID] = conn
}
func (w *WebConn) GetAllClients() (clients []string) {
clients = make([]string, 0)
w.mu.RLock()
defer w.mu.RUnlock()
for k := range w.Clients {
clients = append(clients, k)
}
return clients
}
func (w *WebConn) SetWinSize(winSize ssh.Window) {
w.mu.RLock()
defer w.mu.RUnlock()
......@@ -72,3 +82,18 @@ func (w *WebConn) SetWinSize(winSize ssh.Window) {
client.WinChan <- winSize
}
}
func (w *WebConn) Close() {
w.mu.Lock()
defer w.mu.Unlock()
clientsCopy := make(map[string]*Client)
for k, v := range w.Clients {
clientsCopy[k] = v
}
for k, client := range clientsCopy {
_ = client.Close()
delete(w.Clients, k)
}
}
......@@ -36,6 +36,7 @@ func AuthDecorator(handler http.HandlerFunc) http.HandlerFunc {
}
}
// OnConnectHandler 当websocket连接后触发
func OnConnectHandler(s socketio.Conn) error {
// 首次连接 1.获取当前用户的信息
logger.Debug("On connect trigger")
......@@ -70,11 +71,13 @@ func OnConnectHandler(s socketio.Conn) error {
return nil
}
// OnErrorHandler 当出现错误时触发
func OnErrorHandler(e error) {
logger.Debug("OnError trigger")
logger.Debug(e)
}
// OnHostHandler 当用户连接Host时触发
func OnHostHandler(s socketio.Conn, message HostMsg) {
// secret uuid string
logger.Debug("OnHost trigger")
......@@ -92,6 +95,7 @@ func OnHostHandler(s socketio.Conn, message HostMsg) {
clientID := uuid.NewV4().String()
emitMsg := EmitRoomMsg{clientID, secret}
s.Emit("room", emitMsg)
logger.Debug("Asset id: ", assetID)
asset := service.GetAsset(assetID)
systemUser := service.GetSystemUser(systemUserId)
......@@ -102,37 +106,39 @@ func OnHostHandler(s socketio.Conn, message HostMsg) {
ctx := s.Context().(WebContext)
userR, userW := io.Pipe()
conn := conns.GetWebConn(s.ID())
clientConn := &Client{
client := &Client{
Uuid: clientID, Cid: conn.Cid, user: conn.User,
WinChan: make(chan ssh.Window, 100), Conn: s,
UserRead: userR, UserWrite: userW,
pty: ssh.Pty{Term: "xterm", Window: win},
}
clientConn.WinChan <- win
conn.AddClient(clientID, clientConn)
proxySrv := proxy.ProxyServer{UserConn: clientConn, User: ctx.User, Asset: &asset, SystemUser: &systemUser}
client.WinChan <- win
conn.AddClient(clientID, client)
proxySrv := proxy.ProxyServer{
UserConn: client, User: ctx.User,
Asset: &asset, SystemUser: &systemUser,
}
go proxySrv.Proxy()
}
// OnTokenHandler 当使用token连接时触发
func OnTokenHandler(s socketio.Conn, message TokenMsg) {
logger.Debug("OnToken trigger")
winSiz := ssh.Window{Height: 24, Width: 80}
win := ssh.Window{Height: 24, Width: 80}
token := message.Token
secret := message.Secret
width, height := message.Size[0], message.Size[1]
if width != 0 {
winSiz.Width = width
win.Width = width
}
if height != 0 {
winSiz.Height = height
win.Height = height
}
clientID := uuid.NewV4().String()
emitMs := EmitRoomMsg{clientID, secret}
s.Emit("room", emitMs)
// check token
if token == "" || secret == "" {
msg := fmt.Sprintf("Token or secret is None: %s %s", token, secret)
dataMsg := EmitDataMsg{Data: msg, Room: clientID}
......@@ -140,7 +146,6 @@ func OnTokenHandler(s socketio.Conn, message TokenMsg) {
s.Emit("disconnect")
}
tokenUser := service.GetTokenAsset(token)
logger.Debug(tokenUser)
if tokenUser.UserId == "" {
msg := "Token info is none, maybe token expired"
dataMsg := EmitDataMsg{Data: msg, Room: clientID}
......@@ -148,10 +153,7 @@ func OnTokenHandler(s socketio.Conn, message TokenMsg) {
s.Emit("disconnect")
}
currentUser := service.GetUserProfile(tokenUser.UserId)
con := conns.GetWebConn(s.ID())
con.User = currentUser
currentUser := service.GetUserDetail(tokenUser.UserId)
asset := service.GetAsset(tokenUser.AssetId)
systemUser := service.GetSystemUser(tokenUser.SystemUserId)
......@@ -161,28 +163,35 @@ func OnTokenHandler(s socketio.Conn, message TokenMsg) {
userR, userW := io.Pipe()
conn := conns.GetWebConn(s.ID())
clientConn := Client{
conn.User = currentUser
client := Client{
Uuid: clientID, Cid: conn.Cid, user: conn.User,
WinChan: make(chan ssh.Window, 100), Conn: s,
UserRead: userR, UserWrite: userW, Closed: false,
UserRead: userR, UserWrite: userW,
pty: ssh.Pty{Term: "xterm", Window: win},
}
clientConn.WinChan <- winSiz
conn.AddClient(clientID, &clientConn)
client.WinChan <- win
conn.AddClient(clientID, &client)
// Todo: 构建proxy server 启动goroutine
proxySrv := proxy.ProxyServer{
UserConn: &client, User: currentUser,
Asset: &asset, SystemUser: &systemUser,
}
go proxySrv.Proxy()
}
// OnDataHandler 收发数据时触发
func OnDataHandler(s socketio.Conn, message DataMsg) {
logger.Debug("OnData trigger")
cid := message.Room
webconn := conns.GetWebConn(s.ID())
client := webconn.GetClient(cid)
conn := conns.GetWebConn(s.ID())
client := conn.GetClient(cid)
if client == nil {
return
}
_, _ = client.UserWrite.Write([]byte(message.Data))
}
// OnResizeHandler 用户窗口改变时触发
func OnResizeHandler(s socketio.Conn, message ResizeMsg) {
winSize := ssh.Window{Height: message.Height, Width: message.Width}
logger.Debugf("On resize event trigger: %d*%d", message.Width, message.Height)
......@@ -190,14 +199,15 @@ func OnResizeHandler(s socketio.Conn, message ResizeMsg) {
conn.SetWinSize(winSize)
}
// OnLogoutHandler 用户登出一个会话时触发
func OnLogoutHandler(s socketio.Conn, message string) {
logger.Debug("OnLogout trigger")
webConn := conns.GetWebConn(s.ID())
if webConn == nil {
conn := conns.GetWebConn(s.ID())
if conn == nil {
logger.Error("No conn found")
return
}
client := webConn.GetClient(message)
client := conn.GetClient(message)
if client == nil {
logger.Error("No client found")
return
......@@ -205,6 +215,9 @@ func OnLogoutHandler(s socketio.Conn, message string) {
_ = client.Close()
}
// OnDisconnect websocket断开后触发
func OnDisconnect(s socketio.Conn, msg string) {
logger.Debug("OnDisconnect trigger")
conn := conns.GetWebConn(s.ID())
conn.Close()
}
......@@ -80,6 +80,7 @@ func (p *Parser) initial() {
p.cmdRecordChan = make(chan [2]string, 1024)
}
// ParseStream 解析数据流
func (p *Parser) ParseStream() {
defer func() {
close(p.userOutputChan)
......@@ -136,6 +137,7 @@ func (p *Parser) parseInputState(b []byte) []byte {
return b
}
// parseCmdInput 解析命令的输入
func (p *Parser) parseCmdInput() {
data := p.cmdBuf.Bytes()
p.command = p.cmdInputParser.Parse(data)
......@@ -143,28 +145,24 @@ func (p *Parser) parseCmdInput() {
p.inputBuf.Reset()
}
// parseCmdOutput 解析命令输出
func (p *Parser) parseCmdOutput() {
data := p.outputBuf.Bytes()
p.output = p.cmdOutputParser.Parse(data)
p.outputBuf.Reset()
}
func (p *Parser) replaceInputNewLine(b []byte) []byte {
//b = bytes.Replace(b, []byte{'\r', '\r', '\n'}, []byte{'\r'}, -1)
//b = bytes.Replace(b, []byte{'\r', '\n'}, []byte{'\r'}, -1)
//b = bytes.Replace(b, []byte{'\n'}, []byte{'\r'}, -1)
return b
}
// ParseUserInput 解析用户的输入
func (p *Parser) ParseUserInput(b []byte) []byte {
p.once.Do(func() {
p.inputInitial = true
})
nb := p.replaceInputNewLine(b)
nb = p.parseInputState(nb)
nb := p.parseInputState(b)
return nb
}
// parseZmodemState 解析数据,查看是不是处于zmodem状态
// 处于zmodem状态不会再解析命令
func (p *Parser) parseZmodemState(b []byte) {
if len(b) < 25 {
return
......@@ -190,6 +188,7 @@ func (p *Parser) parseZmodemState(b []byte) {
}
}
// parseVimState 解析vim的状态,处于vim状态中,里面输入的命令不再记录
func (p *Parser) parseVimState(b []byte) {
if p.zmodemState == "" && !p.inVimState && bytes.Contains(b, vimEnterMark) {
p.inVimState = true
......@@ -219,15 +218,18 @@ func (p *Parser) splitCmdStream(b []byte) {
}
}
// ParseServerOutput 解析服务器输出
func (p *Parser) ParseServerOutput(b []byte) []byte {
p.splitCmdStream(b)
return b
}
// SetCMDFilterRules 设置命令过滤规则
func (p *Parser) SetCMDFilterRules(rules []model.SystemUserFilterRule) {
p.cmdFilterRules = rules
}
// IsCommandForbidden 判断命令是不是在过滤规则中
func (p *Parser) IsCommandForbidden() (string, bool) {
for _, rule := range p.cmdFilterRules {
allowed, cmd := rule.Match(p.command)
......@@ -243,12 +245,13 @@ func (p *Parser) IsCommandForbidden() (string, bool) {
return "", true
}
func (p *Parser) IsRecvState() bool {
func (p *Parser) IsInZmodemRecvState() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.zmodemState == zmodemStateRecv
}
// Close 关闭parser
func (p *Parser) Close() {
select {
case <-p.closed:
......
......@@ -35,6 +35,7 @@ func (cp *CmdParser) parsePS1(s string) string {
return ps1Pattern.ReplaceAllString(s, "")
}
// Parse 解析命令或输出
func (cp *CmdParser) Parse(b []byte) string {
cp.buf.Write(b)
cp.buf.WriteString("\r")
......
......@@ -24,8 +24,19 @@ type ProxyServer struct {
// getSystemUserAuthOrManualSet 获取系统用户的认证信息或手动设置
func (p *ProxyServer) getSystemUserAuthOrManualSet() {
if p.SystemUser.LoginMode == model.LoginModeManual ||
(p.SystemUser.Password == "" && p.SystemUser.PrivateKey == "") {
info := service.GetSystemUserAssetAuthInfo(p.SystemUser.Id, p.Asset.Id)
p.SystemUser.Password = info.Password
p.SystemUser.PrivateKey = info.PrivateKey
needManualSet := false
if p.SystemUser.LoginMode == model.LoginModeManual {
needManualSet = true
logger.Debugf("System user %s login mode is: %s", p.SystemUser.Name, model.LoginModeManual)
}
if p.SystemUser.Password == "" && p.SystemUser.PrivateKey == "" {
needManualSet = true
logger.Debugf("System user %s neither has password nor private key", p.SystemUser.Name)
}
if needManualSet {
term := utils.NewTerminal(p.UserConn, "password: ")
line, err := term.ReadPassword(fmt.Sprintf("%s's password: ", p.SystemUser.Username))
if err != nil {
......@@ -33,10 +44,6 @@ func (p *ProxyServer) getSystemUserAuthOrManualSet() {
}
p.SystemUser.Password = line
logger.Debug("Get password from user input: ", line)
} else {
info := service.GetSystemUserAssetAuthInfo(p.SystemUser.Id, p.Asset.Id)
p.SystemUser.Password = info.Password
p.SystemUser.PrivateKey = info.PrivateKey
}
}
......@@ -180,6 +187,22 @@ func (p *ProxyServer) preCheckRequisite() (ok bool) {
return true
}
// sendConnectErrorMsg 发送连接错误消息
func (p *ProxyServer) sendConnectErrorMsg(err error) {
msg := fmt.Sprintf("Connect asset %s error: %s", p.Asset.Hostname, err)
utils.IgnoreErrWriteString(p.UserConn, msg)
logger.Error(msg)
password := p.SystemUser.Password
if password != "" {
passwordLen := len(p.SystemUser.Password)
showLen := passwordLen / 2
hiddenLen := passwordLen - showLen
msg2 := fmt.Sprintf("Try password: %s", password[:showLen]+strings.Repeat("*", hiddenLen))
logger.Errorf(msg2)
}
return
}
// Proxy 代理
func (p *ProxyServer) Proxy() {
if !p.preCheckRequisite() {
......@@ -191,53 +214,19 @@ func (p *ProxyServer) Proxy() {
srvConn, err = p.getServerConn()
}
// 连接后端服务器失败
if err != nil {
msg := fmt.Sprintf("Connect asset %s error: %s\n\r", p.Asset.Hostname, err)
utils.IgnoreErrWriteString(p.UserConn, msg)
logger.Errorf(msg)
p.sendConnectErrorMsg(err)
return
}
sw := NewSwitchSession(p)
ok := p.createSession(sw)
if !ok {
msg := i18n.T("Connect with api server failed")
msg = utils.WrapperWarn(msg)
utils.IgnoreErrWriteString(p.UserConn, msg)
// 创建Session
sw, err := CreateSession(p)
if err != nil {
return
}
cmdRules := p.GetFilterRules()
sw.SetFilterRules(cmdRules)
AddSession(sw)
_ = sw.Bridge(p.UserConn, srvConn)
defer func() {
_ = srvConn.Close()
p.finishSession(sw)
RemoveSession(sw)
}()
}
func (p *ProxyServer) createSession(s *SwitchSession) bool {
data := s.MapData()
for i := 0; i < 5; i++ {
if service.CreateSession(data) {
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}
func (p *ProxyServer) finishSession(s *SwitchSession) {
data := s.MapData()
service.FinishSession(data)
service.FinishReply(s.Id)
logger.Debugf("Finish session: %s", s.Id)
}
func (p *ProxyServer) GetFilterRules() []model.SystemUserFilterRule {
cmdRules, err := service.GetSystemUserFilterRules(p.SystemUser.Id)
if err != nil {
logger.Error("Get system user filter rule error: ", err)
}
return cmdRules
}
package proxy
import (
"bytes"
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/elastic/go-elasticsearch"
"cocogo/pkg/config"
"cocogo/pkg/logger"
"cocogo/pkg/model"
"cocogo/pkg/service"
"encoding/json"
)
type ReplayStorage interface {
Upload(gZipFile, target string) error
}
type CommandStorage interface {
BulkSave(commands []*model.Command) error
}
var defaultCommandStorage = &ServerCommandStorage{}
var defaultReplayStorage = &ServerReplayStorage{StorageType: "server"}
func NewReplayStorage() ReplayStorage {
cf := config.GetConf().ReplayStorage
tp, ok := cf["TYPE"]
if !ok {
tp = "server"
}
switch tp {
case "azure":
endpointSuffix := cf["ENDPOINT_SUFFIX"].(string)
if endpointSuffix == "" {
endpointSuffix = "core.chinacloudapi.cn"
}
return &AzureReplayStorage{
accountName: cf["ACCOUNT_NAME"].(string),
accountKey: cf["ACCOUNT_KEY"].(string),
containerName: cf["CONTAINER_NAME"].(string),
endpointSuffix: endpointSuffix,
}
case "oss":
return &OSSReplayStorage{
endpoint: cf["ENDPOINT"].(string),
bucket: cf["BUCKET"].(string),
accessKey: cf["ACCESS_KEY"].(string),
secretKey: cf["SECRET_KEY"].(string),
}
case "s3":
var region string
var endpoint string
bucket := cf["BUCKET"].(string)
endpoint = cf["ENDPOINT"].(string)
if bucket == "" {
bucket = "jumpserver"
}
if cf["REGION"] != nil {
region = cf["REGION"].(string)
} else {
region = strings.Split(endpoint, ".")[1]
}
return &S3ReplayStorage{
bucket: bucket,
region: region,
accessKey: cf["ACCESS_KEY"].(string),
secretKey: cf["SECRET_KEY"].(string),
endpoint: endpoint,
}
default:
return defaultReplayStorage
}
}
func NewCommandStorage() CommandStorage {
cf := config.GetConf().CommandStorage
tp, ok := cf["TYPE"]
if !ok {
tp = "server"
}
switch tp {
case "es", "elasticsearch":
var hosts = make([]string, len(cf["HOSTS"].([]interface{})))
for i, item := range cf["HOSTS"].([]interface{}) {
hosts[i] = item.(string)
}
index := cf["INDEX"].(string)
docType := cf["DOC_TYPE"].(string)
if index == "" {
index = "jumpserver"
}
if docType == "" {
docType = "command_store"
}
return &ESCommandStorage{hosts: hosts, index: index, docType: docType}
default:
return defaultCommandStorage
}
}
type ServerCommandStorage struct {
}
func (s *ServerCommandStorage) BulkSave(commands []*model.Command) (err error) {
return service.PushSessionCommand(commands)
}
type ESCommandStorage struct {
hosts []string
index string
docType string
}
func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) {
var buf bytes.Buffer
esClinet, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: es.hosts,
})
if err != nil {
logger.Error(err.Error())
return
}
for _, item := range commands {
meta := []byte(fmt.Sprintf(`{ "index" : { } }%s`, "\n"))
data, err := json.Marshal(item)
if err != nil {
return err
}
data = append(data, "\n"...)
buf.Write(meta)
buf.Write(data)
}
_, err = esClinet.Bulk(bytes.NewReader(buf.Bytes()),
esClinet.Bulk.WithIndex(es.index), esClinet.Bulk.WithDocumentType(es.docType))
if err != nil {
logger.Error(err.Error())
}
return
}
func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) {
file, err := os.Create(name)
if err != nil {
return
}
storage = &FileCommandStorage{file: file}
return
}
type FileCommandStorage struct {
file *os.File
}
func (f *FileCommandStorage) BulkSave(commands []*model.Command) (err error) {
for _, cmd := range commands {
f.file.WriteString(fmt.Sprintf("命令: %s\n", cmd.Input))
f.file.WriteString(fmt.Sprintf("结果: %s\n", cmd.Output))
f.file.WriteString("---\n")
}
return
}
type ServerReplayStorage struct {
StorageType string
}
func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) {
sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0]
return service.PushSessionReplay(sessionID, gZipFilePath)
}
type OSSReplayStorage struct {
endpoint string
bucket string
accessKey string
secretKey string
}
func (o *OSSReplayStorage) Upload(gZipFilePath, target string) (err error) {
client, err := oss.New(o.endpoint, o.accessKey, o.secretKey)
if err != nil {
return
}
bucket, err := client.Bucket(o.bucket)
if err != nil {
logger.Error(err.Error())
return
}
return bucket.PutObjectFromFile(target, gZipFilePath)
}
type S3ReplayStorage struct {
bucket string
region string
accessKey string
secretKey string
endpoint string
}
func (s *S3ReplayStorage) Upload(gZipFilePath, target string) (err error) {
file, err := os.Open(gZipFilePath)
if err != nil {
logger.Debug("Failed to open file", err)
return
}
defer file.Close()
s3Config := &aws.Config{
Credentials: credentials.NewStaticCredentials(s.accessKey, s.secretKey, ""),
Endpoint: aws.String(s.endpoint),
Region: aws.String(s.region),
}
sess := session.Must(session.NewSession(s3Config))
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = 64 * 1024 * 1024 // 64MB per part
})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(target),
Body: file,
})
if err != nil {
logger.Error(err.Error())
}
return
}
type AzureReplayStorage struct {
accountName string
accountKey string
containerName string
endpointSuffix string
}
func (a *AzureReplayStorage) Upload(gZipFilePath, target string) (err error) {
file, err := os.Open(gZipFilePath)
if err != nil {
return
}
credential, err := azblob.NewSharedKeyCredential(a.accountName, a.accountKey)
if err != nil {
logger.Error("Invalid credentials with error: " + err.Error())
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
URL, _ := url.Parse(
fmt.Sprintf("https://%s.blob.%s/%s", a.accountName, a.endpointSuffix, a.containerName))
containerURL := azblob.NewContainerURL(*URL, p)
blobURL := containerURL.NewBlockBlobURL(target)
_, err = azblob.UploadFileToBlockBlob(context.TODO(), file, blobURL, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})
if err != nil {
logger.Error(err.Error())
}
return
}
package recorderstorage
import (
"context"
"fmt"
"net/url"
"os"
"github.com/Azure/azure-storage-blob-go/azblob"
"cocogo/pkg/logger"
)
type AzureReplayStorage struct {
AccountName string
AccountKey string
ContainerName string
EndpointSuffix string
}
func (a *AzureReplayStorage) Upload(gZipFilePath, target string) (err error) {
file, err := os.Open(gZipFilePath)
if err != nil {
return
}
credential, err := azblob.NewSharedKeyCredential(a.AccountName, a.AccountKey)
if err != nil {
logger.Error("Invalid credentials with error: " + err.Error())
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
endpoint := fmt.Sprintf("https://%s.blob.%s/%s", a.AccountName, a.EndpointSuffix, a.ContainerName)
URL, _ := url.Parse(endpoint)
containerURL := azblob.NewContainerURL(*URL, p)
blobURL := containerURL.NewBlockBlobURL(target)
_, err = azblob.UploadFileToBlockBlob(context.TODO(), file, blobURL, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})
if err != nil {
logger.Error(err.Error())
}
return
}
package recorderstorage
import (
"bytes"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch"
"cocogo/pkg/logger"
"cocogo/pkg/model"
)
type ESCommandStorage struct {
Hosts []string
Index string
DocType string
}
func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) {
var buf bytes.Buffer
esClinet, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: es.Hosts,
})
if err != nil {
logger.Error(err.Error())
return
}
for _, item := range commands {
meta := []byte(fmt.Sprintf(`{ "index" : { } }%s`, "\n"))
data, err := json.Marshal(item)
if err != nil {
return err
}
data = append(data, "\n"...)
buf.Write(meta)
buf.Write(data)
}
_, err = esClinet.Bulk(bytes.NewReader(buf.Bytes()),
esClinet.Bulk.WithIndex(es.Index), esClinet.Bulk.WithDocumentType(es.DocType))
if err != nil {
logger.Error(err.Error())
}
return
}
package recorderstorage
import (
"fmt"
"os"
"cocogo/pkg/model"
)
func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) {
file, err := os.Create(name)
if err != nil {
return
}
storage = &FileCommandStorage{File: file}
return
}
type FileCommandStorage struct {
File *os.File
}
func (f *FileCommandStorage) BulkSave(commands []*model.Command) (err error) {
for _, cmd := range commands {
f.File.WriteString(fmt.Sprintf("命令: %s\n", cmd.Input))
f.File.WriteString(fmt.Sprintf("结果: %s\n", cmd.Output))
f.File.WriteString("---\n")
}
return
}
package recorderstorage
import (
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"cocogo/pkg/logger"
)
type OSSReplayStorage struct {
Endpoint string
Bucket string
AccessKey string
SecretKey string
}
func (o *OSSReplayStorage) Upload(gZipFilePath, target string) (err error) {
client, err := oss.New(o.Endpoint, o.AccessKey, o.SecretKey)
if err != nil {
return
}
bucket, err := client.Bucket(o.Bucket)
if err != nil {
logger.Error(err.Error())
return
}
return bucket.PutObjectFromFile(target, gZipFilePath)
}
package recorderstorage
import (
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"cocogo/pkg/logger"
)
type S3ReplayStorage struct {
Bucket string
Region string
AccessKey string
SecretKey string
Endpoint string
}
func (s *S3ReplayStorage) Upload(gZipFilePath, target string) (err error) {
file, err := os.Open(gZipFilePath)
if err != nil {
logger.Debug("Failed to open file", err)
return
}
defer file.Close()
s3Config := &aws.Config{
Credentials: credentials.NewStaticCredentials(s.AccessKey, s.SecretKey, ""),
Endpoint: aws.String(s.Endpoint),
Region: aws.String(s.Region),
}
sess := session.Must(session.NewSession(s3Config))
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = 64 * 1024 * 1024 // 64MB per part
})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(target),
Body: file,
})
if err != nil {
logger.Error(err.Error())
}
return
}
package recorderstorage
import (
"path/filepath"
"strings"
"cocogo/pkg/model"
"cocogo/pkg/service"
)
type ServerCommandStorage struct {
}
func (s *ServerCommandStorage) BulkSave(commands []*model.Command) (err error) {
return service.PushSessionCommand(commands)
}
type ServerReplayStorage struct {
StorageType string
}
func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) {
sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0]
return service.PushSessionReplay(sessionID, gZipFilePath)
}
package proxy
import (
"cocogo/pkg/i18n"
"cocogo/pkg/logger"
"cocogo/pkg/model"
"cocogo/pkg/service"
"cocogo/pkg/utils"
"sync"
"time"
)
var sessionMap = make(map[string]*SwitchSession)
......@@ -25,7 +29,6 @@ func KillSession(sessionID string) {
if sw, ok := sessionMap[sessionID]; ok {
sw.Terminate()
}
}
func GetAliveSessions() []string {
......@@ -42,6 +45,7 @@ func RemoveSession(sw *SwitchSession) {
lock.Lock()
defer lock.Unlock()
delete(sessionMap, sw.Id)
finishSession(sw)
}
func AddSession(sw *SwitchSession) {
......@@ -49,3 +53,46 @@ func AddSession(sw *SwitchSession) {
defer lock.Unlock()
sessionMap[sw.Id] = sw
}
func CreateSession(p *ProxyServer) (sw *SwitchSession, err error) {
// 创建Session
sw = NewSwitchSession(p)
// Post到Api端
ok := postSession(sw)
if !ok {
msg := i18n.T("Connect with api server failed")
msg = utils.WrapperWarn(msg)
utils.IgnoreErrWriteString(p.UserConn, msg)
logger.Error(msg)
return
}
// 获取系统用户的过滤规则,并设置
cmdRules, err := service.GetSystemUserFilterRules(p.SystemUser.Id)
if err != nil {
msg := i18n.T("Connect with api server failed")
msg = utils.WrapperWarn(msg)
utils.IgnoreErrWriteString(p.UserConn, msg)
logger.Error(msg + err.Error())
}
sw.SetFilterRules(cmdRules)
AddSession(sw)
return
}
func postSession(s *SwitchSession) bool {
data := s.MapData()
for i := 0; i < 5; i++ {
if service.CreateSession(data) {
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}
func finishSession(s *SwitchSession) {
data := s.MapData()
service.FinishSession(data)
service.FinishReply(s.Id)
logger.Debugf("Finish session: %s", s.Id)
}
......@@ -165,7 +165,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
return
}
nw, _ := s.userTran.Write(p)
if !s.parser.IsRecvState() {
if !s.parser.IsInZmodemRecvState() {
s.replayRecorder.Record(p[:nw])
}
// User发来的数据流流入parser
......
package proxy
import (
"strings"
"cocogo/pkg/config"
"cocogo/pkg/model"
storage "cocogo/pkg/proxy/recorderstorage"
)
type ReplayStorage interface {
Upload(gZipFile, target string) error
}
type CommandStorage interface {
BulkSave(commands []*model.Command) error
}
var defaultCommandStorage = &storage.ServerCommandStorage{}
var defaultReplayStorage = &storage.ServerReplayStorage{StorageType: "server"}
func NewReplayStorage() ReplayStorage {
cf := config.GetConf().ReplayStorage
tp, ok := cf["TYPE"]
if !ok {
tp = "server"
}
switch tp {
case "azure":
endpointSuffix := cf["ENDPOINT_SUFFIX"].(string)
if endpointSuffix == "" {
endpointSuffix = "core.chinacloudapi.cn"
}
return &storage.AzureReplayStorage{
AccountName: cf["ACCOUNT_NAME"].(string),
AccountKey: cf["ACCOUNT_KEY"].(string),
ContainerName: cf["CONTAINER_NAME"].(string),
EndpointSuffix: endpointSuffix,
}
case "oss":
return &storage.OSSReplayStorage{
Endpoint: cf["ENDPOINT"].(string),
Bucket: cf["BUCKET"].(string),
AccessKey: cf["ACCESS_KEY"].(string),
SecretKey: cf["SECRET_KEY"].(string),
}
case "s3":
var region string
var endpoint string
bucket := cf["BUCKET"].(string)
endpoint = cf["ENDPOINT"].(string)
if bucket == "" {
bucket = "jumpserver"
}
if cf["REGION"] != nil {
region = cf["REGION"].(string)
} else {
region = strings.Split(endpoint, ".")[1]
}
return &storage.S3ReplayStorage{
Bucket: bucket,
Region: region,
AccessKey: cf["ACCESS_KEY"].(string),
SecretKey: cf["SECRET_KEY"].(string),
Endpoint: endpoint,
}
default:
return defaultReplayStorage
}
}
func NewCommandStorage() CommandStorage {
cf := config.GetConf().CommandStorage
tp, ok := cf["TYPE"]
if !ok {
tp = "server"
}
switch tp {
case "es", "elasticsearch":
var hosts = make([]string, len(cf["HOSTS"].([]interface{})))
for i, item := range cf["HOSTS"].([]interface{}) {
hosts[i] = item.(string)
}
index := cf["INDEX"].(string)
docType := cf["DOC_TYPE"].(string)
if index == "" {
index = "jumpserver"
}
if docType == "" {
docType = "command_store"
}
return &storage.ESCommandStorage{Hosts: hosts, Index: index, DocType: docType}
default:
return defaultCommandStorage
}
}
......@@ -97,7 +97,7 @@ func GetTokenAsset(token string) (tokenUser model.TokenUser) {
Url := fmt.Sprintf(TokenAssetUrl, token)
err := authClient.Get(Url, &tokenUser)
if err != nil {
logger.Error("Get Token Asset info failed")
logger.Error("Get Token Asset info failed: ", err)
}
return
}
package service
const (
UserAuthURL = "/api/users/v1/auth/" // post 验证用户登陆
UserProfileURL = "/api/users/v1/profile/" // 获取当前用户的基本信息
UserListUrl = "/api/users/v1/users/" // 用户列表地址
UserDetailURL = "/api/users/v1/users/%s/" // 获取用户信息
UserAuthOTPURL = "/api/authentication/v1/otp/auth/" // 验证OTP
UserAuthURL = "/api/users/v1/auth/" // post 验证用户登陆
UserProfileURL = "/api/users/v1/profile/" // 获取当前用户的基本信息
UserListUrl = "/api/users/v1/users/" // 用户列表地址
UserDetailURL = "/api/users/v1/users/%s/" // 获取用户信息
UserAuthOTPURL = "/api/authentication/v1/otp/auth/" // 验证OTP
TokenAssetUrl = "/api/authentication/v1/connection-token/?token=%s" // Token name
SystemUserAssetAuthURL = "/api/assets/v1/system-user/%s/asset/%s/auth-info/" // 该系统用户对某资产的授权
SystemUserCmdFilterRules = "/api/assets/v1/system-user/%s/cmd-filter-rules/" // 过滤规则url
SystemUserDetailURL = "/api/assets/v1/system-user/%s/" // 某个系统用户的信息
AssetDetailURL = "/api/assets/v1/assets/%s/" // 某一个资产信息
DomainDetailURL = "/api/assets/v1/domain/%s/"
TokenAssetUrl = "/api/users/v1/connection-token/?token=%s" // Token name
TerminalRegisterURL = "/api/terminal/v2/terminal-registrations/" // 注册当前coco
TerminalConfigURL = "/api/terminal/v1/terminal/config/" // 从jumpserver获取coco的配置
......
......@@ -27,9 +27,9 @@ func Authenticate(username, password, publicKey, remoteAddr, loginType string) (
return
}
func GetUserProfile(userID string) (user *model.User) {
func GetUserDetail(userID string) (user *model.User) {
Url := fmt.Sprintf(UserDetailURL, userID)
err := authClient.Get(Url, user)
err := authClient.Get(Url, &user)
if err != nil {
logger.Error(err)
}
......
......@@ -140,15 +140,6 @@ func newClient(asset *model.Asset, systemUser *model.SystemUser, timeout time.Du
Timeout: timeout,
Proxy: proxyConfigs,
}
sshConfig = SSHClientConfig{
Host: "127.0.0.1",
Port: "22",
User: "root",
Password: "redhat",
Proxy: []*SSHClientConfig{
{Host: "192.168.244.185", Port: "22", User: "root", Password: "redhat"},
},
}
client, err = sshConfig.Dial()
return
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment