Commit cb4f6cd4 authored by Eric's avatar Eric

[Update] fix some elasticsearch bugs

parent 0e374565
...@@ -18,8 +18,8 @@ type Config struct { ...@@ -18,8 +18,8 @@ type Config struct {
HostKey string `json:"TERMINAL_HOST_KEY" yaml:"HOST_KEY"` HostKey string `json:"TERMINAL_HOST_KEY" yaml:"HOST_KEY"`
PasswordAuth bool `json:"TERMINAL_PASSWORD_AUTH" yaml:"PASSWORD_AUTH"` PasswordAuth bool `json:"TERMINAL_PASSWORD_AUTH" yaml:"PASSWORD_AUTH"`
PublicKeyAuth bool `json:"TERMINAL_PUBLIC_KEY_AUTH" yaml:"PUBLIC_KEY_AUTH"` PublicKeyAuth bool `json:"TERMINAL_PUBLIC_KEY_AUTH" yaml:"PUBLIC_KEY_AUTH"`
CommandStorage map[string]string `json:"TERMINAL_COMMAND_STORAGE"` CommandStorage map[string]interface{} `json:"TERMINAL_COMMAND_STORAGE"`
ReplayStorage map[string]string `json:"TERMINAL_REPLAY_STORAGE" yaml:"REPLAY_STORAGE"` ReplayStorage map[string]interface{} `json:"TERMINAL_REPLAY_STORAGE" yaml:"REPLAY_STORAGE"`
SessionKeepDuration int `json:"TERMINAL_SESSION_KEEP_DURATION"` SessionKeepDuration int `json:"TERMINAL_SESSION_KEEP_DURATION"`
TelnetRegex string `json:"TERMINAL_TELNET_REGEX"` TelnetRegex string `json:"TERMINAL_TELNET_REGEX"`
MaxIdleTime int `json:"SECURITY_MAX_IDLE_TIME"` MaxIdleTime int `json:"SECURITY_MAX_IDLE_TIME"`
...@@ -122,8 +122,8 @@ var Conf = &Config{ ...@@ -122,8 +122,8 @@ var Conf = &Config{
RootPath: rootPath, RootPath: rootPath,
Comment: "Coco", Comment: "Coco",
Language: "zh", Language: "zh",
ReplayStorage: map[string]string{"TYPE": "server"}, ReplayStorage: map[string]interface{}{"TYPE": "server"},
CommandStorage: map[string]string{"TYPE": "server"}, CommandStorage: map[string]interface{}{"TYPE": "server"},
UploadFailedReplay: true, UploadFailedReplay: true,
} }
......
...@@ -43,34 +43,34 @@ func NewReplayStorage() ReplayStorage { ...@@ -43,34 +43,34 @@ func NewReplayStorage() ReplayStorage {
} }
switch tp { switch tp {
case "azure": case "azure":
endpointSuffix := cf["ENDPOINT_SUFFIX"] endpointSuffix := cf["ENDPOINT_SUFFIX"].(string)
if endpointSuffix == "" { if endpointSuffix == "" {
endpointSuffix = "core.chinacloudapi.cn" endpointSuffix = "core.chinacloudapi.cn"
} }
return &AzureReplayStorage{ return &AzureReplayStorage{
accountName: cf["ACCOUNT_NAME"], accountName: cf["ACCOUNT_NAME"].(string),
accountKey: cf["ACCOUNT_KEY"], accountKey: cf["ACCOUNT_KEY"].(string),
containerName: cf["CONTAINER_NAME"], containerName: cf["CONTAINER_NAME"].(string),
endpointSuffix: endpointSuffix, endpointSuffix: endpointSuffix,
} }
case "oss": case "oss":
return &OSSReplayStorage{ return &OSSReplayStorage{
endpoint: cf["ENDPOINT"], endpoint: cf["ENDPOINT"].(string),
bucket: cf["BUCKET"], bucket: cf["BUCKET"].(string),
accessKey: cf["ACCESS_KEY"], accessKey: cf["ACCESS_KEY"].(string),
secretKey: cf["SECRET_KEY"], secretKey: cf["SECRET_KEY"].(string),
} }
case "s3": case "s3":
bucket := cf["BUCKET"] bucket := cf["BUCKET"].(string)
if bucket == "" { if bucket == "" {
bucket = "jumpserver" bucket = "jumpserver"
} }
return &S3ReplayStorage{ return &S3ReplayStorage{
bucket: bucket, bucket: bucket,
region: cf["REGION"], region: cf["REGION"].(string),
accessKey: cf["ACCESS_KEY"], accessKey: cf["ACCESS_KEY"].(string),
secretKey: cf["SECRET_KEY"], secretKey: cf["SECRET_KEY"].(string),
endpoint: cf["ENDPOINT"], endpoint: cf["ENDPOINT"].(string),
} }
default: default:
return defaultReplayStorage return defaultReplayStorage
...@@ -85,17 +85,19 @@ func NewCommandStorage() CommandStorage { ...@@ -85,17 +85,19 @@ func NewCommandStorage() CommandStorage {
} }
switch tp { switch tp {
case "es", "elasticsearch": case "es", "elasticsearch":
hosts := cf["HOSTS"] var hosts = make([]string, len(cf["HOSTS"].([]interface{})))
index := cf["INDEX"] for i, item := range cf["HOSTS"].([]interface{}) {
docType := cf["DOC_TYPE"] hosts[i] = item.(string)
hostsArray := strings.Split(strings.Trim(hosts, ","), ",") }
index := cf["INDEX"].(string)
docType := cf["DOC_TYPE"].(string)
if index == "" { if index == "" {
index = "jumpserver" index = "jumpserver"
} }
if docType == "" { if docType == "" {
docType = "command_store" docType = "command_store"
} }
return &ESCommandStorage{hosts: hostsArray, index: index, docType: docType} return &ESCommandStorage{hosts: hosts, index: index, docType: docType}
default: default:
return defaultCommandStorage return defaultCommandStorage
} }
...@@ -115,11 +117,7 @@ type ESCommandStorage struct { ...@@ -115,11 +117,7 @@ type ESCommandStorage struct {
} }
func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) { func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) {
data, err := json.Marshal(commands) var buf bytes.Buffer
if err != nil {
return
}
esClinet, err := elasticsearch.NewClient(elasticsearch.Config{ esClinet, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: es.hosts, Addresses: es.hosts,
}) })
...@@ -127,15 +125,25 @@ func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) { ...@@ -127,15 +125,25 @@ func (es *ESCommandStorage) BulkSave(commands []*model.Command) (err error) {
logger.Error(err.Error()) logger.Error(err.Error())
return return
} }
for _, item := range commands {
meta := []byte(fmt.Sprintf(`{ "index" : { } }%s`, "\n"))
data, err := json.Marshal(item)
if err != nil {
return err
}
data = append(data, "\n"...)
buf.Write(meta)
buf.Write(data)
}
_, err = esClinet.Bulk(bytes.NewBuffer(data), _, err = esClinet.Bulk(bytes.NewReader(buf.Bytes()),
esClinet.Bulk.WithIndex(es.index), esClinet.Bulk.WithDocumentType(es.docType)) esClinet.Bulk.WithIndex(es.index), esClinet.Bulk.WithDocumentType(es.docType))
if err == nil { if err != nil {
logger.Debug("Successfully uploaded total %d commands to Elasticsearch\n", len(commands)) logger.Error(err.Error())
} }
return return
} }
func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) { func NewFileCommandStorage(name string) (storage *FileCommandStorage, err error) {
file, err := os.Create(name) file, err := os.Create(name)
if err != nil { if err != nil {
...@@ -181,6 +189,7 @@ func (o *OSSReplayStorage) Upload(gZipFilePath, target string) (err error) { ...@@ -181,6 +189,7 @@ func (o *OSSReplayStorage) Upload(gZipFilePath, target string) (err error) {
} }
bucket, err := client.Bucket(o.bucket) bucket, err := client.Bucket(o.bucket)
if err != nil { if err != nil {
logger.Error(err.Error())
return return
} }
return bucket.PutObjectFromFile(target, gZipFilePath) return bucket.PutObjectFromFile(target, gZipFilePath)
...@@ -217,8 +226,8 @@ func (s *S3ReplayStorage) Upload(gZipFilePath, target string) (err error) { ...@@ -217,8 +226,8 @@ func (s *S3ReplayStorage) Upload(gZipFilePath, target string) (err error) {
Key: aws.String(target), Key: aws.String(target),
Body: file, Body: file,
}) })
if err == nil { if err != nil {
logger.Debug("Successfully uploaded %q to %q\n", file.Name(), s.bucket) logger.Error(err.Error())
} }
return return
...@@ -250,8 +259,8 @@ func (a *AzureReplayStorage) Upload(gZipFilePath, target string) (err error) { ...@@ -250,8 +259,8 @@ func (a *AzureReplayStorage) Upload(gZipFilePath, target string) (err error) {
_, err = azblob.UploadFileToBlockBlob(context.TODO(), file, blobURL, azblob.UploadToBlockBlobOptions{ _, err = azblob.UploadFileToBlockBlob(context.TODO(), file, blobURL, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024, BlockSize: 4 * 1024 * 1024,
Parallelism: 16}) Parallelism: 16})
if err == nil { if err != nil {
logger.Debug("Successfully uploaded %q to Azure\n", file.Name()) logger.Error(err.Error())
} }
return return
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment