Commit 0e374565 authored by Eric's avatar Eric

[Update] add S3 OSS Azure and elasticsearch storage

parent 88f5215a
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
digest = "1:279540310125d2b219920588d7e2edb2a85b3317b528839166e896ce6b6f211c"
name = "github.com/Azure/azure-pipeline-go"
packages = ["pipeline"]
pruneopts = "UT"
revision = "55fedc85a614dcd0e942a66f302ae3efb83d563c"
version = "v0.1.9"
[[projects]]
digest = "1:b15d5bdadce5d98f1e06353508a4029fccfeb68761957b3a2a8cb38ebb8caca4"
name = "github.com/Azure/azure-storage-blob-go"
packages = ["azblob"]
pruneopts = "UT"
revision = "678206e7e6e55abf0265a6440135e92005176ebf"
version = "v0.6.0"
[[projects]]
digest = "1:c5c73365822fb6a7bbe288316868057ae58447ddd95ecef466b6dbe93c37670e"
name = "github.com/aliyun/aliyun-oss-go-sdk"
packages = ["oss"]
pruneopts = "UT"
revision = "08079eb9f6aaa13ee00b8fade6ceeffa138bf877"
version = "v1.9.6"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:1a200e7e73293b75eb8e5c93d023b5472663432da0b663e1532624fcfede9ca8" digest = "1:1a200e7e73293b75eb8e5c93d023b5472663432da0b663e1532624fcfede9ca8"
...@@ -9,6 +33,63 @@ ...@@ -9,6 +33,63 @@
pruneopts = "UT" pruneopts = "UT"
revision = "648efa622239a2f6ff949fed78ee37b48d499ba4" revision = "648efa622239a2f6ff949fed78ee37b48d499ba4"
[[projects]]
digest = "1:495d57b9444476dbdcad8420c73a84b5b72e7e058b011b17e218e759c16f66fd"
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
"aws/awserr",
"aws/awsutil",
"aws/client",
"aws/client/metadata",
"aws/corehandlers",
"aws/credentials",
"aws/credentials/ec2rolecreds",
"aws/credentials/endpointcreds",
"aws/credentials/processcreds",
"aws/credentials/stscreds",
"aws/csm",
"aws/defaults",
"aws/ec2metadata",
"aws/endpoints",
"aws/request",
"aws/session",
"aws/signer/v4",
"internal/ini",
"internal/s3err",
"internal/sdkio",
"internal/sdkrand",
"internal/sdkuri",
"internal/shareddefaults",
"private/protocol",
"private/protocol/eventstream",
"private/protocol/eventstream/eventstreamapi",
"private/protocol/query",
"private/protocol/query/queryutil",
"private/protocol/rest",
"private/protocol/restxml",
"private/protocol/xml/xmlutil",
"service/s3",
"service/s3/s3iface",
"service/s3/s3manager",
"service/sts",
]
pruneopts = "UT"
revision = "0db84dcbcc56669065730700b054eb6d1438a0f7"
version = "v1.19.33"
[[projects]]
digest = "1:2af3a6e1f12e54cef95c6051cd1cb1e154629a4b82c692ac8a92f00259f570eb"
name = "github.com/elastic/go-elasticsearch"
packages = [
".",
"esapi",
"estransport",
]
pruneopts = "UT"
revision = "3aa68f5e96e138fa4dafc9403b10674b4310a632"
version = "v0.0.0"
[[projects]] [[projects]]
branch = "dev" branch = "dev"
digest = "1:b728156c8c642481f6b30c46bfd5f6117f7e3c8495b1f0c47795e2db958c6729" digest = "1:b728156c8c642481f6b30c46bfd5f6117f7e3c8495b1f0c47795e2db958c6729"
...@@ -61,6 +142,13 @@ ...@@ -61,6 +142,13 @@
revision = "ac2099de8d3789d30b99b740d1a9d242097462df" revision = "ac2099de8d3789d30b99b740d1a9d242097462df"
version = "v1.0.4" version = "v1.0.4"
[[projects]]
digest = "1:bb81097a5b62634f3e9fec1014657855610c82d19b9a40c17612e32651e35dca"
name = "github.com/jmespath/go-jmespath"
packages = ["."]
pruneopts = "UT"
revision = "c2b33e84"
[[projects]] [[projects]]
digest = "1:31e761d97c76151dde79e9d28964a812c46efc5baee4085b86f68f0c654450de" digest = "1:31e761d97c76151dde79e9d28964a812c46efc5baee4085b86f68f0c654450de"
name = "github.com/konsorten/go-windows-terminal-sequences" name = "github.com/konsorten/go-windows-terminal-sequences"
...@@ -180,6 +268,14 @@ ...@@ -180,6 +268,14 @@
pruneopts = "UT" pruneopts = "UT"
revision = "3a4b5fb9f71f5874b2374ae059bc0e0bcb52e145" revision = "3a4b5fb9f71f5874b2374ae059bc0e0bcb52e145"
[[projects]]
branch = "master"
digest = "1:9fdc2b55e8e0fafe4b41884091e51e77344f7dc511c5acedcfd98200003bff90"
name = "golang.org/x/time"
packages = ["rate"]
pruneopts = "UT"
revision = "9d24e82272b4f38b78bc8cff74fa936d31ccd8ef"
[[projects]] [[projects]]
digest = "1:c805e517269b0ba4c21ded5836019ed7d16953d4026cb7d00041d039c7906be9" digest = "1:c805e517269b0ba4c21ded5836019ed7d16953d4026cb7d00041d039c7906be9"
name = "gopkg.in/natefinch/lumberjack.v2" name = "gopkg.in/natefinch/lumberjack.v2"
...@@ -200,6 +296,13 @@ ...@@ -200,6 +296,13 @@
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
input-imports = [ input-imports = [
"github.com/Azure/azure-storage-blob-go/azblob",
"github.com/aliyun/aliyun-oss-go-sdk/oss",
"github.com/aws/aws-sdk-go/aws",
"github.com/aws/aws-sdk-go/aws/credentials",
"github.com/aws/aws-sdk-go/aws/session",
"github.com/aws/aws-sdk-go/service/s3/s3manager",
"github.com/elastic/go-elasticsearch",
"github.com/gliderlabs/ssh", "github.com/gliderlabs/ssh",
"github.com/googollee/go-socket.io", "github.com/googollee/go-socket.io",
"github.com/jarcoal/httpmock", "github.com/jarcoal/httpmock",
......
...@@ -76,5 +76,17 @@ ...@@ -76,5 +76,17 @@
version = "2.1.0" version = "2.1.0"
[[constraint]] [[constraint]]
name = "github.com/mattn/go-sqlite3" name = "github.com/aliyun/aliyun-oss-go-sdk"
version = "1.10.0" version = "1.9.6"
[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.19.33"
[[constraint]]
name = "github.com/Azure/azure-storage-blob-go"
version = "0.6.0"
[[constraint]]
name = "github.com/elastic/go-elasticsearch"
version = "0.0.0"
package proxy package proxy
import ( import (
"cocogo/pkg/config" "bytes"
"cocogo/pkg/model" "context"
"cocogo/pkg/service"
"fmt" "fmt"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/elastic/go-elasticsearch"
"cocogo/pkg/config"
"cocogo/pkg/logger"
"cocogo/pkg/model"
"cocogo/pkg/service"
"encoding/json"
) )
type ReplayStorage interface { type ReplayStorage interface {
...@@ -28,6 +42,36 @@ func NewReplayStorage() ReplayStorage { ...@@ -28,6 +42,36 @@ func NewReplayStorage() ReplayStorage {
tp = "server" tp = "server"
} }
switch tp { switch tp {
case "azure":
endpointSuffix := cf["ENDPOINT_SUFFIX"]
if endpointSuffix == "" {
endpointSuffix = "core.chinacloudapi.cn"
}
return &AzureReplayStorage{
accountName: cf["ACCOUNT_NAME"],
accountKey: cf["ACCOUNT_KEY"],
containerName: cf["CONTAINER_NAME"],
endpointSuffix: endpointSuffix,
}
case "oss":
return &OSSReplayStorage{
endpoint: cf["ENDPOINT"],
bucket: cf["BUCKET"],
accessKey: cf["ACCESS_KEY"],
secretKey: cf["SECRET_KEY"],
}
case "s3":
bucket := cf["BUCKET"]
if bucket == "" {
bucket = "jumpserver"
}
return &S3ReplayStorage{
bucket: bucket,
region: cf["REGION"],
accessKey: cf["ACCESS_KEY"],
secretKey: cf["SECRET_KEY"],
endpoint: cf["ENDPOINT"],
}
default: default:
return defaultReplayStorage return defaultReplayStorage
} }
...@@ -40,6 +84,18 @@ func NewCommandStorage() CommandStorage { ...@@ -40,6 +84,18 @@ func NewCommandStorage() CommandStorage {
tp = "server" tp = "server"
} }
switch tp { switch tp {
case "es", "elasticsearch":
hosts := cf["HOSTS"]
index := cf["INDEX"]
docType := cf["DOC_TYPE"]
hostsArray := strings.Split(strings.Trim(hosts, ","), ",")
if index == "" {
index = "jumpserver"
}
if docType == "" {
docType = "command_store"
}
return &ESCommandStorage{hosts: hostsArray, index: index, docType: docType}
default: default:
return defaultCommandStorage return defaultCommandStorage
} }
...@@ -52,6 +108,34 @@ func (s *ServerCommandStorage) BulkSave(commands []*model.Command) (err error) { ...@@ -52,6 +108,34 @@ func (s *ServerCommandStorage) BulkSave(commands []*model.Command) (err error) {
return service.PushSessionCommand(commands) return service.PushSessionCommand(commands)
} }
type ESCommandStorage struct {
hosts []string
index string
docType string
}
func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) {
data, err := json.Marshal(commands)
if err != nil {
return
}
esClinet, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: es.hosts,
})
if err != nil {
logger.Error(err.Error())
return
}
_, err = esClinet.Bulk(bytes.NewBuffer(data),
esClinet.Bulk.WithIndex(es.index), esClinet.Bulk.WithDocumentType(es.docType))
if err == nil {
logger.Debug("Successfully uploaded total %d commands to Elasticsearch\n", len(commands))
}
return
}
func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) { func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) {
file, err := os.Create(name) file, err := os.Create(name)
if err != nil { if err != nil {
...@@ -82,3 +166,92 @@ func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) { ...@@ -82,3 +166,92 @@ func (s *ServerReplayStorage) Upload(gZipFilePath, target string) (err error) {
sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0] sessionID := strings.Split(filepath.Base(gZipFilePath), ".")[0]
return service.PushSessionReplay(sessionID, gZipFilePath) return service.PushSessionReplay(sessionID, gZipFilePath)
} }
type OSSReplayStorage struct {
endpoint string
bucket string
accessKey string
secretKey string
}
func (o *OSSReplayStorage) Upload(gZipFilePath, target string) (err error) {
client, err := oss.New(o.endpoint, o.accessKey, o.secretKey)
if err != nil {
return
}
bucket, err := client.Bucket(o.bucket)
if err != nil {
return
}
return bucket.PutObjectFromFile(target, gZipFilePath)
}
type S3ReplayStorage struct {
bucket string
region string
accessKey string
secretKey string
endpoint string
}
func (s *S3ReplayStorage) Upload(gZipFilePath, target string) (err error) {
file, err := os.Open(gZipFilePath)
if err != nil {
logger.Debug("Failed to open file", err)
return
}
defer file.Close()
s3Config := &aws.Config{
Credentials: credentials.NewStaticCredentials(s.accessKey, s.secretKey, ""),
Endpoint: aws.String(s.endpoint),
Region: aws.String(s.region),
}
sess := session.Must(session.NewSession(s3Config))
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = 64 * 1024 * 1024 // 64MB per part
})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(target),
Body: file,
})
if err == nil {
logger.Debug("Successfully uploaded %q to %q\n", file.Name(), s.bucket)
}
return
}
type AzureReplayStorage struct {
accountName string
accountKey string
containerName string
endpointSuffix string
}
func (a *AzureReplayStorage) Upload(gZipFilePath, target string) (err error) {
file, err := os.Open(gZipFilePath)
if err != nil {
return
}
credential, err := azblob.NewSharedKeyCredential(a.accountName, a.accountKey)
if err != nil {
logger.Error("Invalid credentials with error: " + err.Error())
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
URL, _ := url.Parse(
fmt.Sprintf("https://%s.blob.%s/%s", a.accountName, a.endpointSuffix, a.containerName))
containerURL := azblob.NewContainerURL(*URL, p)
blobURL := containerURL.NewBlockBlobURL(target)
_, err = azblob.UploadFileToBlockBlob(context.TODO(), file, blobURL, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})
if err == nil {
logger.Debug("Successfully uploaded %q to Azure\n", file.Name())
}
return
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment