Commit 56d23da6 authored by ibuler's avatar ibuler

[Update] 修改上传逻辑

parent 4f0c0719
......@@ -4,8 +4,7 @@ env/
*.pyo
.access_key
*.log
logs/*
data/*
host_rsa_key
sessions/*
coco.pid
config.yml
......@@ -11,8 +11,6 @@ import signal
import copy
from collections import defaultdict
import psutil
from .conf import config
from .sshd import SSHServer
from .httpd import HttpServer
......@@ -133,41 +131,43 @@ class Coco:
thread.start()
def monitor_sessions_replay(self):
interval = 10
log_dir = os.path.join(config['LOG_DIR'])
interval = 60 * 60 * 5
replay_dir = os.path.join(config.REPLAY_DIR)
max_try = 5
upload_failed = defaultdict(int)
def func():
while not self.stop_evt.is_set():
for filename in os.listdir(log_dir):
suffix = filename.split('.')[-1]
if suffix != 'gz':
continue
session_id = filename.split('.')[0]
if len(session_id) != 36:
continue
full_path = os.path.join(log_dir, filename)
stat = os.stat(full_path)
# 是否是一天前的,因为现在多个coco共享了日志目录,
# 不能单纯判断session是否关闭
if stat.st_mtime > time.time() - 24*60*60:
continue
# 失败次数过多
if session_id in upload_failed \
and upload_failed[session_id] >= max_try:
continue
recorder = get_replay_recorder()
recorder.file_path = full_path
ok = recorder.upload_replay(session_id, 1)
if ok:
upload_failed.pop(session_id, None)
elif not ok and os.path.getsize(full_path) == 0:
os.unlink(full_path)
else:
upload_failed[session_id] += 1
time.sleep(1)
for d in os.listdir(replay_dir):
date_path = os.path.join(replay_dir, d)
for filename in os.listdir(date_path):
suffix = filename.split('.')[-1]
if suffix != 'gz':
continue
session_id = filename.split('.')[0]
if len(session_id) != 36:
continue
full_path = os.path.join(date_path, filename)
stat = os.stat(full_path)
# 是否是一天前的,因为现在多个coco共享了日志目录,
# 不能单纯判断session是否关闭
if stat.st_mtime > time.time() - 24*60*60:
continue
# 失败次数过多
if session_id in upload_failed \
and upload_failed[session_id] >= max_try:
continue
recorder = get_replay_recorder()
recorder.file_path = full_path
recorder.session_id = session_id
recorder.target = os.path.join(d, filename)
ok, msg = recorder.upload_replay()
if ok:
upload_failed.pop(session_id, None)
else:
upload_failed[session_id] += 1
time.sleep(1)
time.sleep(interval)
thread = threading.Thread(target=func)
thread.start()
......
......@@ -332,6 +332,7 @@ defaults = {
'SECRET_KEY': 'SDK29K03%MM0ksf&#2',
'LOG_LEVEL': 'INFO',
'LOG_DIR': os.path.join(root_path, 'data', 'logs'),
'REPLAY_DIR': os.path.join(root_path, 'data', 'replays'),
'ASSET_LIST_SORT_BY': 'hostname', # hostname, ip
'PASSWORD_AUTH': True,
'PUBLIC_KEY_AUTH': True,
......
......@@ -3,6 +3,7 @@
#
import threading
import datetime
import time
import os
import gzip
......@@ -22,14 +23,21 @@ BUF_SIZE = 1024
class ReplayRecorder(object):
time_start = None
target = None
storage = None
session_id = None
filename = None
file = None
file_path = None
def __init__(self):
super(ReplayRecorder, self).__init__()
self.file = None
self.file_path = None
self.get_storage()
def get_storage(self):
conf = deepcopy(config["REPLAY_STORAGE"])
conf["SERVICE"] = app_service
self.storage = jms_storage.get_object_storage(conf)
def record(self, data):
"""
:param data:
......@@ -47,49 +55,56 @@ class ReplayRecorder(object):
def session_start(self, session_id):
self.time_start = time.time()
filename = session_id + '.replay.gz'
self.file_path = os.path.join(config['LOG_DIR'], filename)
self.session_id = session_id
self.filename = session_id + '.replay.gz'
date = datetime.datetime.utcnow().strftime('%Y-%m-%d')
self.target = date + '/' + self.filename
replay_dir = os.path.join(config.REPLAY_DIR, date)
if not os.path.isdir(replay_dir):
os.makedirs(replay_dir, exist_ok=True)
self.file_path = os.path.join(replay_dir, self.filename)
self.file = gzip.open(self.file_path, 'at')
self.file.write('{')
def session_end(self, session_id):
self.file.write('"0":""}')
self.file.close()
self.upload_replay(session_id)
def get_storage(self):
conf = deepcopy(config["REPLAY_STORAGE"])
conf["SERVICE"] = app_service
self.storage = jms_storage.get_object_storage(conf)
self.upload_replay_some_times()
def upload_replay(self, session_id, times=3):
def upload_replay_some_times(self, times=3):
# 如果上传OSS、S3失败则尝试上传到服务器
if times < 1:
if self.storage.type == 'jms':
return False
else:
self.storage = jms_storage.JMSReplayStorage(
{"SERVICE": app_service}
)
self.upload_replay(session_id, times=3)
self.storage = jms_storage.JMSReplayStorage(
{"SERVICE": app_service}
)
self.upload_replay_some_times(times=3)
ok, msg = self.push_to_storage(session_id)
ok, msg = self.upload_replay()
if not ok:
msg = 'Failed push replay file {}: {}, try again {}'.format(
session_id, msg, times
self.filename, msg, times
)
logger.warn(msg)
self.upload_replay(session_id, times-1)
self.upload_replay_some_times(times - 1)
else:
msg = 'Success push replay file: {}'.format(session_id)
msg = 'Success push replay file: {}'.format(self.session_id)
logger.debug(msg)
self.finish_replay(3, session_id)
os.unlink(self.file_path)
return True
def push_to_storage(self, session_id):
dt = time.strftime('%Y-%m-%d', time.localtime(self.time_start))
target = dt + '/' + session_id + '.replay.gz'
return self.storage.upload(self.file_path, target)
def upload_replay(self):
# 如果文件为空就直接删除
if not os.path.isfile(self.file_path):
return False, 'Not found the file: {}'.format(self.file_path)
if os.path.getsize(self.file_path) == 0:
os.unlink(self.file_path)
return True, ''
ok, msg = self.storage.upload(self.file_path, self.target)
if ok:
self.finish_replay(3, self.session_id)
os.unlink(self.file_path)
return ok, msg
def finish_replay(self, times, session_id):
if times < 1:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment