Commit 0e9aa67c authored by ibuler's avatar ibuler

[Feature] 支持es存储命令

parent f7e1cbcd
......@@ -11,6 +11,8 @@ import gzip
import json
import shutil
from jms_es_storage import ESStore
from .alignment import MemoryQueue
logger = logging.getLogger(__file__)
......@@ -183,11 +185,63 @@ class ServerCommandRecorder(CommandRecorder, metaclass=Singleton):
print("{} has been gc".format(self))
class ESCommandRecorder(CommandRecorder, metaclass=Singleton):
batch_size = 10
timeout = 5
no = 0
def __init__(self, app):
super().__init__(app)
self.queue = MemoryQueue()
self.stop_evt = threading.Event()
self.push_to_es_async()
self.__class__.no += 1
self.store = ESStore(**app.config["COMMAND_RECORD_OPTIONS"])
if not self.store.ping():
raise AssertionError("ESCommand storage init error")
def record(self, data):
if data and data['input']:
data['input'] = data['input'][:128]
data['output'] = data['output'][:1024]
data['timestamp'] = int(data['timestamp'])
self.queue.put(data)
def push_to_es_async(self):
def func():
while not self.stop_evt.is_set():
data_set = self.queue.mget(self.batch_size,
timeout=self.timeout)
logger.debug(
"<Session command recorder {}> queue size: {}".format(
self.no, self.queue.qsize())
)
if not data_set:
continue
logger.debug("Send {} commands to server".format(len(data_set)))
ok = self.store.bulk_save(data_set)
if not ok:
self.queue.mput(data_set)
thread = threading.Thread(target=func)
thread.daemon = True
thread.start()
def session_start(self, session_id):
pass
def session_end(self, session_id):
pass
def __del__(self):
print("{} has been gc".format(self))
def get_command_recorder_class(app):
command_engine = app.config["COMMAND_RECORD_ENGINE"]
if command_engine == "server":
return ServerCommandRecorder
if command_engine == "elasticsearch":
return ESCommandRecorder
else:
return ServerCommandRecorder
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment