Unverified Commit f1637871 authored by 老广's avatar 老广 Committed by GitHub

Merge pull request #181 from jumpserver/dev

Dev
parents 09a898c4 35b5361b
......@@ -4,8 +4,7 @@ env/
*.pyo
.access_key
*.log
logs/*
data/*
host_rsa_key
sessions/*
coco.pid
config.yml
......@@ -6,7 +6,7 @@ WORKDIR /opt/coco
RUN yum -y install epel-release
RUN cd requirements && yum -y install $(cat rpm_requirements.txt)
RUN cd requirements && pip install $(egrep "jumpserver|jms" requirements.txt | tr '\n' ' ') && pip install -r requirements.txt -i https://mirrors.ustc.edu.cn/pypi/web/simple
RUN cd requirements && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/ || pip install -r requirements.txt
ENV LANG=zh_CN.UTF-8
ENV LC_ALL=zh_CN.UTF-8
......
......@@ -11,8 +11,6 @@ import signal
import copy
from collections import defaultdict
import psutil
from .conf import config
from .sshd import SSHServer
from .httpd import HttpServer
......@@ -66,7 +64,7 @@ class Coco:
config.update(configs)
tmp = copy.deepcopy(configs)
tmp['HOST_KEY'] = tmp['HOST_KEY'][32:50] + '...'
tmp['HOST_KEY'] = tmp.get('HOST_KEY', '')[32:50] + '...'
logger.debug("Loading config from server: {}".format(
json.dumps(tmp)
))
......@@ -133,41 +131,43 @@ class Coco:
thread.start()
def monitor_sessions_replay(self):
interval = 10
log_dir = os.path.join(config['LOG_DIR'])
interval = 60 * 60 * 5
replay_dir = os.path.join(config.REPLAY_DIR)
max_try = 5
upload_failed = defaultdict(int)
def func():
while not self.stop_evt.is_set():
for filename in os.listdir(log_dir):
suffix = filename.split('.')[-1]
if suffix != 'gz':
continue
session_id = filename.split('.')[0]
if len(session_id) != 36:
continue
full_path = os.path.join(log_dir, filename)
stat = os.stat(full_path)
# 是否是一天前的,因为现在多个coco共享了日志目录,
# 不能单纯判断session是否关闭
if stat.st_mtime > time.time() - 24*60*60:
continue
# 失败次数过多
if session_id in upload_failed \
and upload_failed[session_id] >= max_try:
continue
recorder = get_replay_recorder()
recorder.file_path = full_path
ok = recorder.upload_replay(session_id, 1)
if ok:
upload_failed.pop(session_id, None)
elif not ok and os.path.getsize(full_path) == 0:
os.unlink(full_path)
else:
upload_failed[session_id] += 1
time.sleep(1)
for d in os.listdir(replay_dir):
date_path = os.path.join(replay_dir, d)
for filename in os.listdir(date_path):
suffix = filename.split('.')[-1]
if suffix != 'gz':
continue
session_id = filename.split('.')[0]
if len(session_id) != 36:
continue
full_path = os.path.join(date_path, filename)
stat = os.stat(full_path)
# 是否是一天前的,因为现在多个coco共享了日志目录,
# 不能单纯判断session是否关闭
if stat.st_mtime > time.time() - 24*60*60:
continue
# 失败次数过多
if session_id in upload_failed \
and upload_failed[session_id] >= max_try:
continue
recorder = get_replay_recorder()
recorder.file_path = full_path
recorder.session_id = session_id
recorder.target = os.path.join(d, filename)
ok, msg = recorder.upload_replay()
if ok:
upload_failed.pop(session_id, None)
else:
upload_failed[session_id] += 1
time.sleep(1)
time.sleep(interval)
thread = threading.Thread(target=func)
thread.start()
......
......@@ -332,6 +332,7 @@ defaults = {
'SECRET_KEY': 'SDK29K03%MM0ksf&#2',
'LOG_LEVEL': 'INFO',
'LOG_DIR': os.path.join(root_path, 'data', 'logs'),
'REPLAY_DIR': os.path.join(root_path, 'data', 'replays'),
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
'PASSWORD_AUTH': True,
'PUBLIC_KEY_AUTH': True,
......
......@@ -45,12 +45,11 @@ def create_logger():
},
'file': {
'level': 'DEBUG',
'class': 'logging.handlers.TimedRotatingFileHandler',
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'main',
'filename': log_path,
'when': "D",
'interval': 1,
"backupCount": 7
'maxBytes': 1024*1024*100,
'backupCount': 7,
},
},
loggers={
......
......@@ -3,6 +3,7 @@
#
import threading
import datetime
import time
import os
import gzip
......@@ -22,14 +23,21 @@ BUF_SIZE = 1024
class ReplayRecorder(object):
time_start = None
target = None
storage = None
session_id = None
filename = None
file = None
file_path = None
def __init__(self):
super(ReplayRecorder, self).__init__()
self.file = None
self.file_path = None
self.get_storage()
def get_storage(self):
conf = deepcopy(config["REPLAY_STORAGE"])
conf["SERVICE"] = app_service
self.storage = jms_storage.get_object_storage(conf)
def record(self, data):
"""
:param data:
......@@ -47,49 +55,56 @@ class ReplayRecorder(object):
def session_start(self, session_id):
self.time_start = time.time()
filename = session_id + '.replay.gz'
self.file_path = os.path.join(config['LOG_DIR'], filename)
self.session_id = session_id
self.filename = session_id + '.replay.gz'
date = datetime.datetime.utcnow().strftime('%Y-%m-%d')
self.target = date + '/' + self.filename
replay_dir = os.path.join(config.REPLAY_DIR, date)
if not os.path.isdir(replay_dir):
os.makedirs(replay_dir, exist_ok=True)
self.file_path = os.path.join(replay_dir, self.filename)
self.file = gzip.open(self.file_path, 'at')
self.file.write('{')
def session_end(self, session_id):
self.file.write('"0":""}')
self.file.close()
self.upload_replay(session_id)
def get_storage(self):
conf = deepcopy(config["REPLAY_STORAGE"])
conf["SERVICE"] = app_service
self.storage = jms_storage.get_object_storage(conf)
self.upload_replay_some_times()
def upload_replay(self, session_id, times=3):
def upload_replay_some_times(self, times=3):
# 如果上传OSS、S3失败则尝试上传到服务器
if times < 1:
if self.storage.type == 'jms':
return False
else:
self.storage = jms_storage.JMSReplayStorage(
{"SERVICE": app_service}
)
self.upload_replay(session_id, times=3)
self.storage = jms_storage.JMSReplayStorage(
{"SERVICE": app_service}
)
self.upload_replay_some_times(times=3)
ok, msg = self.push_to_storage(session_id)
ok, msg = self.upload_replay()
if not ok:
msg = 'Failed push replay file {}: {}, try again {}'.format(
session_id, msg, times
self.filename, msg, times
)
logger.warn(msg)
self.upload_replay(session_id, times-1)
self.upload_replay_some_times(times - 1)
else:
msg = 'Success push replay file: {}'.format(session_id)
msg = 'Success push replay file: {}'.format(self.session_id)
logger.debug(msg)
self.finish_replay(3, session_id)
os.unlink(self.file_path)
return True
def push_to_storage(self, session_id):
dt = time.strftime('%Y-%m-%d', time.localtime(self.time_start))
target = dt + '/' + session_id + '.replay.gz'
return self.storage.upload(self.file_path, target)
def upload_replay(self):
# 如果文件为空就直接删除
if not os.path.isfile(self.file_path):
return False, 'Not found the file: {}'.format(self.file_path)
if os.path.getsize(self.file_path) == 0:
os.unlink(self.file_path)
return True, ''
ok, msg = self.storage.upload(self.file_path, self.target)
if ok:
self.finish_replay(3, self.session_id)
os.unlink(self.file_path)
return ok, msg
def finish_replay(self, times, session_id):
if times < 1:
......
......@@ -27,14 +27,14 @@ BOOTSTRAP_TOKEN: <PleasgeChangeSameWithJumpserver>
# 加密密钥
# SECRET_KEY: null
# 设置日志级别 ['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL', 'CRITICAL']
# 设置日志级别 [DEBUG, INFO, WARN, ERROR, FATAL, CRITICAL]
# LOG_LEVEL: INFO
# 日志存放的目录
# LOG_DIR: logs
# SSH白名单
# ALLOW_SSH_USER: 'all'
# ALLOW_SSH_USER: all
# SSH黑名单, 如果用户同时在白名单和黑名单,黑名单优先生效
# BLOCK_SSH_USER:
......@@ -49,5 +49,11 @@ BOOTSTRAP_TOKEN: <PleasgeChangeSameWithJumpserver>
# SSH连接超时时间 (default 15 seconds)
# SSH_TIMEOUT: 15
# 语言 = en
# 语言 [en,zh]
# LANGUAGE_CODE: zh
# SFTP的根目录, 可选 /tmp, Home其他自定义目录
# SFTP_ROOT: /tmp
# SFTP是否显示隐藏文件
# SFTP_SHOW_HIDDEN_FILE: false
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment