Commit 30713105 authored by ibuler's avatar ibuler

[Update] 修改录像策略

parent 0508afc4
......@@ -84,7 +84,8 @@ class Coco:
self.keep_load_extra_conf()
self.keep_heartbeat()
self.monitor_sessions()
self.monitor_sessions_replay()
if config.UPLOAD_FAILED_REPLAY_ON_START:
self.upload_failed_replay()
# @ignore_error
def heartbeat(self):
......@@ -132,24 +133,18 @@ class Coco:
thread = threading.Thread(target=func)
thread.start()
def monitor_sessions_replay(self):
interval = 60 * 60 * 5
@staticmethod
def upload_failed_replay():
replay_dir = os.path.join(config.REPLAY_DIR)
max_try = 5
upload_failed = defaultdict(int)
def retry_upload_replay(session_id, full_path, target):
def retry_upload_replay(session_id, file_gz_path, target):
recorder = get_replay_recorder()
recorder.file_path = full_path
recorder.file_gz_path = file_gz_path
recorder.session_id = session_id
recorder.target = target
ok, msg = recorder.upload_replay()
if ok:
upload_failed.pop(session_id, None)
else:
upload_failed[session_id] += 1
recorder.upload_replay()
def check_replay_need_upload(full_path):
def check_replay_is_need_upload(full_path):
filename = os.path.basename(full_path)
suffix = filename.split('.')[-1]
if suffix != 'gz':
......@@ -157,30 +152,21 @@ class Coco:
session_id = filename.split('.')[0]
if len(session_id) != 36:
return False
stat = os.stat(full_path)
if stat.st_mtime > time.time() - 24 * 60 * 60:
return False
return True
def func():
while not self.stop_evt.is_set():
for d in os.listdir(replay_dir):
date_path = os.path.join(replay_dir, d)
for filename in os.listdir(date_path):
full_path = os.path.join(date_path, filename)
session_id = filename.split('.')[0]
# 是否是一天前的,因为现在多个coco共享了日志目录,
# 不能单纯判断session是否关闭
if not check_replay_need_upload(full_path):
continue
# 失败次数过多
if session_id in upload_failed \
and upload_failed[session_id] >= max_try:
continue
target = os.path.join(d, filename)
retry_upload_replay(session_id, full_path, target)
time.sleep(1)
time.sleep(interval)
for d in os.listdir(replay_dir):
date_path = os.path.join(replay_dir, d)
for filename in os.listdir(date_path):
full_path = os.path.join(date_path, filename)
session_id = filename.split('.')[0]
# 检查是否需要上传
if not check_replay_is_need_upload(full_path):
continue
logger.debug("Retry upload retain replay: {}".format(filename))
target = os.path.join(d, filename)
retry_upload_replay(session_id, full_path, target)
time.sleep(1)
thread = threading.Thread(target=func)
thread.start()
......
......@@ -352,7 +352,8 @@ defaults = {
'SECURITY_MAX_IDLE_TIME': 60,
'ASSET_LIST_PAGE_SIZE': 'auto',
'SFTP_ROOT': '/tmp',
'SFTP_SHOW_HIDDEN_FILE': False
'SFTP_SHOW_HIDDEN_FILE': False,
'UPLOAD_FAILED_REPLAY_ON_START': True
}
......
......@@ -6,14 +6,13 @@ import threading
import datetime
import time
import os
import gzip
import json
from copy import deepcopy
import jms_storage
from .conf import config
from .utils import get_logger
from .utils import get_logger, gzip_file
from .struct import MemoryQueue
from .service import app_service
......@@ -29,6 +28,8 @@ class ReplayRecorder(object):
filename = None
file = None
file_path = None
filename_gz = None
file_gz_path = None
def __init__(self):
self.get_storage()
......@@ -56,19 +57,26 @@ class ReplayRecorder(object):
def session_start(self, session_id):
self.time_start = time.time()
self.session_id = session_id
self.filename = session_id + '.replay.gz'
self.filename = session_id
self.filename_gz = session_id + '.replay.gz'
date = datetime.datetime.utcnow().strftime('%Y-%m-%d')
self.target = date + '/' + self.filename
replay_dir = os.path.join(config.REPLAY_DIR, date)
if not os.path.isdir(replay_dir):
os.makedirs(replay_dir, exist_ok=True)
# 录像记录路径
self.file_path = os.path.join(replay_dir, self.filename)
self.file = gzip.open(self.file_path, 'at')
# 录像压缩到的路径
self.file_gz_path = os.path.join(replay_dir, self.filename_gz)
# 录像上传上去的路径
self.target = date + '/' + self.filename_gz
self.file = open(self.file_path, 'at')
self.file.write('{')
def session_end(self, session_id):
self.file.write('"0":""}')
self.file.close()
gzip_file(self.file_path, self.file_gz_path)
self.upload_replay_some_times()
def upload_replay_some_times(self, times=3):
......@@ -95,15 +103,15 @@ class ReplayRecorder(object):
def upload_replay(self):
# 如果文件为空就直接删除
if not os.path.isfile(self.file_path):
return False, 'Not found the file: {}'.format(self.file_path)
if os.path.getsize(self.file_path) == 0:
os.unlink(self.file_path)
if not os.path.isfile(self.file_gz_path):
return False, 'Not found the file: {}'.format(self.file_gz_path)
if os.path.getsize(self.file_gz_path) == 0:
os.unlink(self.file_gz_path)
return True, ''
ok, msg = self.storage.upload(self.file_path, self.target)
ok, msg = self.storage.upload(self.file_gz_path, self.target)
if ok:
self.finish_replay(3, self.session_id)
os.unlink(self.file_path)
os.unlink(self.file_gz_path)
return ok, msg
def finish_replay(self, times, session_id):
......
......@@ -8,6 +8,7 @@ import logging
import re
import os
import gettext
import gzip
from io import StringIO
from binascii import hexlify
from werkzeug.local import Local, LocalProxy
......@@ -464,4 +465,11 @@ def ignore_error(func):
return wrapper
def gzip_file(src_path, dst_path, unlink_ori=True):
with open(src_path, 'rt') as src, gzip.open(dst_path, 'at') as dst:
dst.writelines(src)
if unlink_ori:
os.unlink(src_path)
ugettext = LocalProxy(partial(_find, 'LANGUAGE_CODE'))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment