Commit 021f9193 authored by ibuler's avatar ibuler

[Feture] 添加task处理,如kill session

parent fe7bcb81
...@@ -12,6 +12,7 @@ from .httpd import HttpServer ...@@ -12,6 +12,7 @@ from .httpd import HttpServer
from .logging import create_logger from .logging import create_logger
from .queue import get_queue from .queue import get_queue
from .record import get_recorder, START_SENTINEL, END_SENTINEL from .record import get_recorder, START_SENTINEL, END_SENTINEL
from .tasks import TaskHandler
__version__ = '0.4.0' __version__ = '0.4.0'
...@@ -66,6 +67,7 @@ class Coco: ...@@ -66,6 +67,7 @@ class Coco:
self._command_queue = None self._command_queue = None
self._replay_recorder = None self._replay_recorder = None
self._command_recorder = None self._command_recorder = None
self._task_handler = None
@property @property
def service(self): def service(self):
...@@ -85,6 +87,12 @@ class Coco: ...@@ -85,6 +87,12 @@ class Coco:
self._httpd = HttpServer(self) self._httpd = HttpServer(self)
return self._httpd return self._httpd
@property
def task_handler(self):
if self._task_handler is None:
self._task_handler = TaskHandler(self)
return self._task_handler
def make_logger(self): def make_logger(self):
create_logger(self) create_logger(self)
...@@ -118,6 +126,10 @@ class Coco: ...@@ -118,6 +126,10 @@ class Coco:
else: else:
return True return True
def handle_task(self, tasks):
for task in tasks:
self.task_handler.handle(task)
def keep_heartbeat(self): def keep_heartbeat(self):
def func(): def func():
while not self.stop_evt.is_set(): while not self.stop_evt.is_set():
...@@ -175,9 +187,6 @@ class Coco: ...@@ -175,9 +187,6 @@ class Coco:
thread = threading.Thread(target=func) thread = threading.Thread(target=func)
thread.start() thread.start()
def handle_task(self, tasks):
pass
def run_forever(self): def run_forever(self):
self.bootstrap() self.bootstrap()
print(time.ctime()) print(time.ctime())
...@@ -245,6 +254,7 @@ class Coco: ...@@ -245,6 +254,7 @@ class Coco:
self.sessions.remove(session) self.sessions.remove(session)
self.put_command_done_queue(session) self.put_command_done_queue(session)
self.put_replay_done_queue(session) self.put_replay_done_queue(session)
break
else: else:
time.sleep(1) time.sleep(1)
......
...@@ -137,7 +137,7 @@ class Session: ...@@ -137,7 +137,7 @@ class Session:
def to_json(self): def to_json(self):
return { return {
"id": self.id, "uuid": self.id,
"user": self.client.user.username, "user": self.client.user.username,
"asset": self.server.asset.hostname, "asset": self.server.asset.hostname,
"system_user": self.server.system_user.username, "system_user": self.server.system_user.username,
......
# coding: utf-8
import weakref
import logging
logger = logging.getLogger(__file__)
class TaskHandler:
def __init__(self, app):
self._app = weakref.ref(app)
@property
def app(self):
return self._app()
def handle_kill_session(self, task):
logger.info("Handle kill session task: {}".format(task.args))
session_id = task.args
session = None
for s in self.app.sessions:
if s.id == session_id:
session = s
break
if session:
session.close()
self.app.service.finish_task(task.id)
def handle(self, task):
if task.name == "kill_session":
self.handle_kill_session(task)
else:
logger.error("No handler for this task: {}".format(task.name))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment