Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
coco
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ops
coco
Commits
8a8c81b6
Commit
8a8c81b6
authored
Dec 20, 2017
by
i317280
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev' of github.com:jumpserver/coco into dev
parents
ff68b830
859cc410
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
188 additions
and
216 deletions
+188
-216
app.py
coco/app.py
+12
-102
httpd.py
coco/httpd.py
+1
-3
interactive.py
coco/interactive.py
+5
-5
logger.py
coco/logger.py
+6
-5
models.py
coco/models.py
+7
-12
proxy.py
coco/proxy.py
+10
-5
recorder.py
coco/recorder.py
+107
-63
session.py
coco/session.py
+40
-18
conf_example.py
conf_example.py
+0
-3
No files found.
coco/app.py
View file @
8a8c81b6
...
@@ -14,9 +14,8 @@ from .config import Config
...
@@ -14,9 +14,8 @@ from .config import Config
from
.sshd
import
SSHServer
from
.sshd
import
SSHServer
from
.httpd
import
HttpServer
from
.httpd
import
HttpServer
from
.logger
import
create_logger
from
.logger
import
create_logger
from
.alignment
import
get_queue
from
.record
import
get_recorder
,
START_SENTINEL
,
END_SENTINEL
from
.tasks
import
TaskHandler
from
.tasks
import
TaskHandler
from
.recorder
import
get_command_recorder_class
,
get_replay_recorder_class
__version__
=
'0.4.0'
__version__
=
'0.4.0'
...
@@ -38,7 +37,7 @@ class Coco:
...
@@ -38,7 +37,7 @@ class Coco:
'ACCESS_KEY_ENV'
:
'COCO_ACCESS_KEY'
,
'ACCESS_KEY_ENV'
:
'COCO_ACCESS_KEY'
,
'ACCESS_KEY_FILE'
:
os
.
path
.
join
(
BASE_DIR
,
'keys'
,
'.access_key'
),
'ACCESS_KEY_FILE'
:
os
.
path
.
join
(
BASE_DIR
,
'keys'
,
'.access_key'
),
'SECRET_KEY'
:
None
,
'SECRET_KEY'
:
None
,
'LOG_LEVEL'
:
'
INFO
'
,
'LOG_LEVEL'
:
'
DEBUG
'
,
'LOG_DIR'
:
os
.
path
.
join
(
BASE_DIR
,
'logs'
),
'LOG_DIR'
:
os
.
path
.
join
(
BASE_DIR
,
'logs'
),
'SESSION_DIR'
:
os
.
path
.
join
(
BASE_DIR
,
'sessions'
),
'SESSION_DIR'
:
os
.
path
.
join
(
BASE_DIR
,
'sessions'
),
'ASSET_LIST_SORT_BY'
:
'hostname'
,
# hostname, ip
'ASSET_LIST_SORT_BY'
:
'hostname'
,
# hostname, ip
...
@@ -49,11 +48,6 @@ class Coco:
...
@@ -49,11 +48,6 @@ class Coco:
'ADMINS'
:
''
,
'ADMINS'
:
''
,
'REPLAY_RECORD_ENGINE'
:
'server'
,
# local, server
'REPLAY_RECORD_ENGINE'
:
'server'
,
# local, server
'COMMAND_RECORD_ENGINE'
:
'server'
,
# local, server, elasticsearch(not yet)
'COMMAND_RECORD_ENGINE'
:
'server'
,
# local, server, elasticsearch(not yet)
'QUEUE_ENGINE'
:
'memory'
,
'QUEUE_MAX_SIZE'
:
0
,
'MAX_PUSH_THREADS'
:
5
,
'MAX_RECORD_INPUT_LENGTH'
:
128
,
'MAX_RECORD_OUTPUT_LENGTH'
:
1024
,
}
}
def
__init__
(
self
,
name
=
None
,
root_path
=
None
):
def
__init__
(
self
,
name
=
None
,
root_path
=
None
):
...
@@ -67,10 +61,8 @@ class Coco:
...
@@ -67,10 +61,8 @@ class Coco:
self
.
_service
=
None
self
.
_service
=
None
self
.
_sshd
=
None
self
.
_sshd
=
None
self
.
_httpd
=
None
self
.
_httpd
=
None
self
.
_replay_queue
=
None
self
.
replay_recorder_class
=
None
self
.
_command_queue
=
None
self
.
command_recorder_class
=
None
self
.
_replay_recorder
=
None
self
.
_command_recorder
=
None
self
.
_task_handler
=
None
self
.
_task_handler
=
None
@property
@property
...
@@ -104,20 +96,22 @@ class Coco:
...
@@ -104,20 +96,22 @@ class Coco:
def
load_extra_conf_from_server
(
self
):
def
load_extra_conf_from_server
(
self
):
pass
pass
def
initial_queue
(
self
):
self
.
_replay_queue
,
self
.
_command_queue
=
get_queue
(
self
.
config
)
def
initial_recorder
(
self
):
def
initial_recorder
(
self
):
self
.
_replay_recorder
,
self
.
_command_recorder
=
get_recorder
(
self
)
self
.
replay_recorder_class
=
get_replay_recorder_class
(
self
)
self
.
command_recorder_class
=
get_command_recorder_class
(
self
)
def
new_command_recorder
(
self
):
return
self
.
command_recorder_class
(
self
)
def
new_replay_recorder
(
self
):
return
self
.
replay_recorder_class
(
self
)
def
bootstrap
(
self
):
def
bootstrap
(
self
):
self
.
make_logger
()
self
.
make_logger
()
self
.
service
.
initial
()
self
.
service
.
initial
()
self
.
load_extra_conf_from_server
()
self
.
load_extra_conf_from_server
()
self
.
initial_queue
()
self
.
initial_recorder
()
self
.
initial_recorder
()
self
.
keep_heartbeat
()
self
.
keep_heartbeat
()
self
.
keep_push_record
()
self
.
monitor_sessions
()
self
.
monitor_sessions
()
def
heartbeat
(
self
):
def
heartbeat
(
self
):
...
@@ -143,29 +137,6 @@ class Coco:
...
@@ -143,29 +137,6 @@ class Coco:
thread
=
threading
.
Thread
(
target
=
func
)
thread
=
threading
.
Thread
(
target
=
func
)
thread
.
start
()
thread
.
start
()
def
keep_push_record
(
self
):
threads
=
[]
def
worker
(
q
,
callback
,
size
=
10
):
while
not
self
.
stop_evt
.
is_set
():
data_set
=
q
.
mget
(
size
)
if
data_set
and
not
callback
(
data_set
):
q
.
mput
(
data_set
)
for
i
in
range
(
self
.
config
[
'MAX_PUSH_THREADS'
]):
t
=
threading
.
Thread
(
target
=
worker
,
args
=
(
self
.
_command_queue
,
self
.
_command_recorder
.
record_command
,
))
threads
.
append
(
t
)
t
=
threading
.
Thread
(
target
=
worker
,
args
=
(
self
.
_replay_queue
,
self
.
_replay_recorder
.
record_replay
,
))
threads
.
append
(
t
)
for
t
in
threads
:
t
.
start
()
logger
.
info
(
"Start push record process: {}"
.
format
(
t
))
def
monitor_sessions
(
self
):
def
monitor_sessions
(
self
):
interval
=
self
.
config
[
"HEARTBEAT_INTERVAL"
]
interval
=
self
.
config
[
"HEARTBEAT_INTERVAL"
]
...
@@ -241,8 +212,6 @@ class Coco:
...
@@ -241,8 +212,6 @@ class Coco:
with
self
.
lock
:
with
self
.
lock
:
self
.
sessions
.
append
(
session
)
self
.
sessions
.
append
(
session
)
self
.
heartbeat
()
self
.
heartbeat
()
self
.
put_command_start_queue
(
session
)
self
.
put_replay_start_queue
(
session
)
def
remove_session
(
self
,
session
):
def
remove_session
(
self
,
session
):
with
self
.
lock
:
with
self
.
lock
:
...
@@ -250,66 +219,7 @@ class Coco:
...
@@ -250,66 +219,7 @@ class Coco:
for
i
in
range
(
10
):
for
i
in
range
(
10
):
if
self
.
heartbeat
():
if
self
.
heartbeat
():
self
.
sessions
.
remove
(
session
)
self
.
sessions
.
remove
(
session
)
self
.
put_command_done_queue
(
session
)
self
.
put_replay_done_queue
(
session
)
break
break
else
:
else
:
time
.
sleep
(
1
)
time
.
sleep
(
1
)
def
put_replay_queue
(
self
,
session
,
data
):
logger
.
info
(
"Put replay data: {} {}"
.
format
(
session
,
data
))
self
.
_replay_queue
.
put
({
"session"
:
session
.
id
,
"data"
:
data
,
"timestamp"
:
time
.
time
()
})
def
put_replay_start_queue
(
self
,
session
):
self
.
_replay_queue
.
put
({
"session"
:
session
.
id
,
"data"
:
START_SENTINEL
,
"timestamp"
:
time
.
time
()
})
def
put_replay_done_queue
(
self
,
session
):
self
.
_replay_queue
.
put
({
"session"
:
session
.
id
,
"data"
:
END_SENTINEL
,
"timestamp"
:
time
.
time
()
})
def
put_command_queue
(
self
,
session
,
_input
,
_output
):
logger
.
debug
(
"Put command data: {} {} {}"
.
format
(
session
,
_input
,
_output
))
if
not
_input
:
return
self
.
_command_queue
.
put
({
"session"
:
session
.
id
,
"input"
:
_input
[:
128
],
"output"
:
_output
[:
1024
],
"user"
:
session
.
client
.
user
.
username
,
"asset"
:
session
.
server
.
asset
.
hostname
,
"system_user"
:
session
.
server
.
system_user
.
username
,
"timestamp"
:
int
(
time
.
time
())
})
def
put_command_start_queue
(
self
,
session
):
self
.
_command_queue
.
put
({
"session"
:
session
.
id
,
"input"
:
START_SENTINEL
,
"output"
:
START_SENTINEL
,
"user"
:
session
.
client
.
user
.
username
,
"asset"
:
session
.
server
.
asset
.
hostname
,
"system_user"
:
session
.
server
.
system_user
.
username
,
"timestamp"
:
int
(
time
.
time
())
})
def
put_command_done_queue
(
self
,
session
):
self
.
_command_queue
.
put
({
"session"
:
session
.
id
,
"input"
:
END_SENTINEL
,
"output"
:
END_SENTINEL
,
"user"
:
session
.
client
.
user
.
username
,
"asset"
:
session
.
server
.
asset
.
hostname
,
"system_user"
:
session
.
server
.
system_user
.
username
,
"timestamp"
:
int
(
time
.
time
())
})
coco/httpd.py
View file @
8a8c81b6
#!/usr/bin/env python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
#
#
import
io
import
os
import
os
import
paramiko
import
logging
import
logging
import
socket
import
socket
from
flask_socketio
import
SocketIO
,
Namespace
,
emit
,
join_room
,
leave_room
from
flask_socketio
import
SocketIO
,
Namespace
,
emit
,
join_room
,
leave_room
...
@@ -13,7 +11,7 @@ import uuid
...
@@ -13,7 +11,7 @@ import uuid
# Todo: Remove for future
# Todo: Remove for future
from
jms.models
import
User
from
jms.models
import
User
from
.models
import
Request
,
Client
,
WSProxy
from
.models
import
Request
,
Client
,
WSProxy
from
.
forward
import
ProxyServer
from
.
proxy
import
ProxyServer
__version__
=
'0.4.0'
__version__
=
'0.4.0'
BASE_DIR
=
os
.
path
.
dirname
(
os
.
path
.
dirname
(
__file__
))
BASE_DIR
=
os
.
path
.
dirname
(
os
.
path
.
dirname
(
__file__
))
...
...
coco/interactive.py
View file @
8a8c81b6
...
@@ -15,7 +15,7 @@ from .utils import wrap_with_line_feed as wr, wrap_with_title as title, \
...
@@ -15,7 +15,7 @@ from .utils import wrap_with_line_feed as wr, wrap_with_title as title, \
wrap_with_primary
as
primary
,
wrap_with_warning
as
warning
,
\
wrap_with_primary
as
primary
,
wrap_with_warning
as
warning
,
\
is_obj_attr_has
,
is_obj_attr_eq
,
sort_assets
,
TtyIOParser
,
\
is_obj_attr_has
,
is_obj_attr_eq
,
sort_assets
,
TtyIOParser
,
\
ugettext
as
_
ugettext
as
_
from
.
forward
import
ProxyServer
from
.
proxy
import
ProxyServer
logger
=
logging
.
getLogger
(
__file__
)
logger
=
logging
.
getLogger
(
__file__
)
...
@@ -115,15 +115,15 @@ class InteractiveServer:
...
@@ -115,15 +115,15 @@ class InteractiveServer:
return
self
.
_sentinel
return
self
.
_sentinel
elif
opt
.
startswith
(
"/"
):
elif
opt
.
startswith
(
"/"
):
self
.
search_and_display
(
opt
.
lstrip
(
"/"
))
self
.
search_and_display
(
opt
.
lstrip
(
"/"
))
elif
opt
in
[
'p'
,
'P'
,
'3'
]:
elif
opt
in
[
'p'
,
'P'
]:
self
.
display_assets
()
self
.
display_assets
()
elif
opt
in
[
'g'
,
'G'
,
'4'
]:
elif
opt
in
[
'g'
,
'G'
]:
self
.
display_asset_groups
()
self
.
display_asset_groups
()
elif
opt
.
startswith
(
"g"
)
and
opt
.
lstrip
(
"g"
)
.
isdigit
():
elif
opt
.
startswith
(
"g"
)
and
opt
.
lstrip
(
"g"
)
.
isdigit
():
self
.
display_group_assets
(
int
(
opt
.
lstrip
(
"g"
)))
self
.
display_group_assets
(
int
(
opt
.
lstrip
(
"g"
)))
elif
opt
in
[
'q'
,
'Q'
,
'0'
]:
elif
opt
in
[
'q'
,
'Q'
]:
return
self
.
_sentinel
return
self
.
_sentinel
elif
opt
in
[
'h'
,
'H'
,
'9'
]:
elif
opt
in
[
'h'
,
'H'
]:
self
.
display_banner
()
self
.
display_banner
()
else
:
else
:
self
.
search_and_proxy
(
opt
)
self
.
search_and_proxy
(
opt
)
...
...
coco/logger.py
View file @
8a8c81b6
...
@@ -26,14 +26,14 @@ def create_logger(app):
...
@@ -26,14 +26,14 @@ def create_logger(app):
log_path
=
os
.
path
.
join
(
log_dir
,
'coco.log'
)
log_path
=
os
.
path
.
join
(
log_dir
,
'coco.log'
)
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(module)s %(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
main_formatter
=
logging
.
Formatter
(
main_formatter
=
logging
.
Formatter
(
fmt
=
'
%(asctime)
s [
%(levelname)
s]
%(message)
s'
,
fmt
=
'
%(asctime)
s [
%(
module)
s
%(
levelname)
s]
%(message)
s'
,
datefmt
=
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
datefmt
=
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)
)
# main_formatter = logging.Formatter(
# fmt='%(asctime)s [%(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
console_handler
=
StreamHandler
()
console_handler
=
StreamHandler
()
file_handler
=
TimedRotatingFileHandler
(
file_handler
=
TimedRotatingFileHandler
(
filename
=
log_path
,
when
=
'D'
,
backupCount
=
10
filename
=
log_path
,
when
=
'D'
,
backupCount
=
10
...
@@ -43,3 +43,4 @@ def create_logger(app):
...
@@ -43,3 +43,4 @@ def create_logger(app):
handler
.
setFormatter
(
main_formatter
)
handler
.
setFormatter
(
main_formatter
)
logger
.
addHandler
(
handler
)
logger
.
addHandler
(
handler
)
logger
.
setLevel
(
level
)
logger
.
setLevel
(
level
)
logging
.
getLogger
(
"requests"
)
.
setLevel
(
logging
.
WARNING
)
coco/models.py
View file @
8a8c81b6
...
@@ -8,7 +8,6 @@ import weakref
...
@@ -8,7 +8,6 @@ import weakref
from
.
import
char
from
.
import
char
from
.
import
utils
from
.
import
utils
from
.record
import
START_SENTINEL
,
END_SENTINEL
BUF_SIZE
=
4096
BUF_SIZE
=
4096
logger
=
logging
.
getLogger
(
__file__
)
logger
=
logging
.
getLogger
(
__file__
)
...
@@ -87,24 +86,23 @@ class Server:
...
@@ -87,24 +86,23 @@ class Server:
self
.
_input_initial
=
False
self
.
_input_initial
=
False
self
.
_in_vim_state
=
False
self
.
_in_vim_state
=
False
self
.
filters
=
[]
self
.
_input
=
""
self
.
_input
=
""
self
.
_output
=
""
self
.
_output
=
""
self
.
_session_ref
=
None
self
.
_session_ref
=
None
@property
def
session
(
self
):
return
self
.
_session_ref
()
if
self
.
_session_ref
is
not
None
else
None
def
add_filter
(
self
,
_filter
):
self
.
filters
.
append
(
_filter
)
def
fileno
(
self
):
def
fileno
(
self
):
return
self
.
chan
.
fileno
()
return
self
.
chan
.
fileno
()
def
set_session
(
self
,
session
):
def
set_session
(
self
,
session
):
self
.
_session_ref
=
weakref
.
ref
(
session
)
self
.
_session_ref
=
weakref
.
ref
(
session
)
@property
def
session
(
self
):
if
self
.
_session_ref
:
return
self
.
_session_ref
()
else
:
return
None
def
send
(
self
,
b
):
def
send
(
self
,
b
):
if
isinstance
(
b
,
str
):
if
isinstance
(
b
,
str
):
b
=
b
.
encode
(
"utf-8"
)
b
=
b
.
encode
(
"utf-8"
)
...
@@ -125,14 +123,11 @@ class Server:
...
@@ -125,14 +123,11 @@ class Server:
self
.
session
.
put_command
(
self
.
_input
,
self
.
_output
)
self
.
session
.
put_command
(
self
.
_input
,
self
.
_output
)
del
self
.
input_data
[:]
del
self
.
input_data
[:]
del
self
.
output_data
[:]
del
self
.
output_data
[:]
# self._input = ""
# self._output = ""
self
.
_in_input_state
=
True
self
.
_in_input_state
=
True
return
self
.
chan
.
send
(
b
)
return
self
.
chan
.
send
(
b
)
def
recv
(
self
,
size
):
def
recv
(
self
,
size
):
data
=
self
.
chan
.
recv
(
size
)
data
=
self
.
chan
.
recv
(
size
)
self
.
session
.
put_replay
(
data
)
if
self
.
_input_initial
:
if
self
.
_input_initial
:
if
self
.
_in_input_state
:
if
self
.
_in_input_state
:
self
.
input_data
.
append
(
data
)
self
.
input_data
.
append
(
data
)
...
...
coco/
forward
.py
→
coco/
proxy
.py
View file @
8a8c81b6
...
@@ -27,7 +27,6 @@ class ProxyServer:
...
@@ -27,7 +27,6 @@ class ProxyServer:
self
.
request
=
client
.
request
self
.
request
=
client
.
request
self
.
server
=
None
self
.
server
=
None
self
.
connecting
=
True
self
.
connecting
=
True
self
.
session
=
None
@property
@property
def
app
(
self
):
def
app
(
self
):
...
@@ -38,11 +37,17 @@ class ProxyServer:
...
@@ -38,11 +37,17 @@ class ProxyServer:
self
.
server
=
self
.
get_server_conn
(
asset
,
system_user
)
self
.
server
=
self
.
get_server_conn
(
asset
,
system_user
)
if
self
.
server
is
None
:
if
self
.
server
is
None
:
return
return
self
.
session
=
Session
(
self
.
app
,
self
.
client
,
self
.
server
)
command_recorder
=
self
.
app
.
new_command_recorder
()
self
.
app
.
add_session
(
self
.
session
)
replay_recorder
=
self
.
app
.
new_replay_recorder
()
session
=
Session
(
self
.
client
,
self
.
server
,
command_recorder
=
command_recorder
,
replay_recorder
=
replay_recorder
,
)
self
.
app
.
add_session
(
session
)
self
.
watch_win_size_change_async
()
self
.
watch_win_size_change_async
()
se
lf
.
se
ssion
.
bridge
()
session
.
bridge
()
self
.
app
.
remove_session
(
se
lf
.
se
ssion
)
self
.
app
.
remove_session
(
session
)
def
validate_permission
(
self
,
asset
,
system_user
):
def
validate_permission
(
self
,
asset
,
system_user
):
"""
"""
...
...
coco/record.py
→
coco/record
er
.py
View file @
8a8c81b6
...
@@ -4,80 +4,92 @@
...
@@ -4,80 +4,92 @@
import
abc
import
abc
import
logging
import
logging
import
threading
import
os
from
.alignment
import
MemoryQueue
logger
=
logging
.
getLogger
(
__file__
)
logger
=
logging
.
getLogger
(
__file__
)
BUF_SIZE
=
1024
BUF_SIZE
=
1024
START_SENTINEL
=
object
()
END_SENTINEL
=
object
()
class
Singleton
(
type
):
def
__init__
(
cls
,
*
args
,
**
kwargs
):
cls
.
__instance
=
None
super
()
.
__init__
(
*
args
,
**
kwargs
)
def
__call__
(
cls
,
*
args
,
**
kwargs
):
if
cls
.
__instance
is
None
:
cls
.
__instance
=
super
()
.
__call__
(
*
args
,
**
kwargs
)
return
cls
.
__instance
else
:
return
cls
.
__instance
class
ReplayRecorder
(
metaclass
=
abc
.
ABCMeta
):
class
ReplayRecorder
(
metaclass
=
abc
.
ABCMeta
):
def
__init__
(
self
,
app
):
def
__init__
(
self
,
app
,
session
=
None
):
self
.
app
=
app
self
.
app
=
app
self
.
session
=
session
@abc.abstractmethod
@abc.abstractmethod
def
record
_replay
(
self
,
data_set
):
def
record
(
self
,
data
):
"""
"""
记录replay数据
记录replay数据
:param data_set: 数据集 [{"session": "", "data": "", "timestamp": ""},]
:param data: 数据 {
"session": "",
"data": "",
"timestamp": ""
}
:return:
:return:
"""
"""
for
data
in
data_set
:
if
data
[
"data"
]
is
START_SENTINEL
:
data_set
.
remove
(
data
)
self
.
session_start
(
data
[
"session"
])
if
data
[
"data"
]
is
END_SENTINEL
:
data_set
.
remove
(
data
)
self
.
session_end
(
data
[
"session"
])
@abc.abstractmethod
@abc.abstractmethod
def
session_start
(
self
,
session_id
):
def
session_start
(
self
,
session_id
):
print
(
"Session start
"
)
print
(
"Session start
: {}"
.
format
(
session_id
)
)
pass
pass
@abc.abstractmethod
@abc.abstractmethod
def
session_end
(
self
,
session_id
):
def
session_end
(
self
,
session_id
):
print
(
"Session end: {}"
.
format
(
session_id
))
pass
pass
class
CommandRecorder
(
metaclass
=
abc
.
ABCMeta
)
:
class
CommandRecorder
:
def
__init__
(
self
,
app
):
def
__init__
(
self
,
app
,
session
=
None
):
self
.
app
=
app
self
.
app
=
app
self
.
session
=
session
@abc.abstractmethod
def
record
(
self
,
data
):
def
record_command
(
self
,
data_set
):
"""
"""
:param data_set: 数据集
:param data: 数据 {
[("session", "input", "output", "user",
"session":
"asset", "system_user", "timestamp"),]
"input":
"output":
"user":
"asset":
"system_user":
"timestamp":
}
:return:
:return:
"""
"""
for
data
in
data_set
:
if
data
[
"input"
]
is
START_SENTINEL
:
data_set
.
remove
(
data
)
self
.
session_start
(
data
[
"session"
])
if
data
[
"input"
]
is
END_SENTINEL
:
data_set
.
remove
(
data
)
self
.
session_end
(
data
[
"session"
])
@abc.abstractmethod
def
session_start
(
self
,
session_id
):
def
session_start
(
self
,
session_id
):
print
(
"Session start: {}"
.
format
(
session_id
))
pass
pass
@abc.abstractmethod
def
session_end
(
self
,
session_id
):
def
session_end
(
self
,
session_id
):
print
(
"Session end: {}"
.
format
(
session_id
))
pass
pass
class
ServerReplayRecorder
(
ReplayRecorder
):
class
ServerReplayRecorder
(
ReplayRecorder
):
filelist
=
dict
()
def
__init__
(
self
,
app
):
super
()
.
__init__
(
app
)
self
.
file
=
None
def
record
_replay
(
self
,
data_set
):
def
record
(
self
,
data
):
"""
"""
:param data
_set
:
:param data:
[{
[{
"session": session.id,
"session": session.id,
"data": data,
"data": data,
...
@@ -86,52 +98,84 @@ class ServerReplayRecorder(ReplayRecorder):
...
@@ -86,52 +98,84 @@ class ServerReplayRecorder(ReplayRecorder):
:return:
:return:
"""
"""
# Todo: <liuzheng712@gmail.com>
# Todo: <liuzheng712@gmail.com>
super
()
.
record_replay
(
data_set
)
self
.
file
.
write
(
data
)
for
data
in
data_set
:
try
:
ServerReplayRecorder
.
filelist
[
data
[
"session"
]]
.
write
(
str
(
data
)
+
'
\n
'
)
except
KeyError
:
logger
.
error
(
"session ({})file does not exist!"
.
format
(
data
[
"session"
]))
except
ValueError
:
logger
.
error
(
"session ({}) file cloesd!"
.
format
(
data
[
"session"
]))
return
True
def
session_start
(
self
,
session_id
):
def
session_start
(
self
,
session_id
):
ServerReplayRecorder
.
filelist
[
session_id
]
=
open
(
'logs/'
+
session_id
+
'.log'
,
'a'
)
self
.
file
=
open
(
os
.
path
.
join
(
print
(
"When session {} start exec"
.
format
(
session_id
))
self
.
app
.
config
[
'LOG_DIR'
],
session_id
+
'.replay'
),
'a'
)
def
session_end
(
self
,
session_id
):
def
session_end
(
self
,
session_id
):
ServerReplayRecorder
.
filelist
[
session_id
]
.
close
()
self
.
file
.
close
()
# Todo: upload the file
print
(
"When session {} end start"
.
format
(
session_id
))
def
push_to_server
(
self
):
pass
class
ServerCommandRecorder
(
CommandRecorder
):
def
__del__
(
self
):
def
record_command
(
self
,
data_set
):
print
(
"{} has been gc"
.
format
(
self
))
del
self
.
file
class
ServerCommandRecorder
(
CommandRecorder
,
metaclass
=
Singleton
):
batch_size
=
10
timeout
=
5
no
=
0
def
__init__
(
self
,
app
):
super
()
.
__init__
(
app
)
self
.
queue
=
MemoryQueue
()
self
.
stop_evt
=
threading
.
Event
()
self
.
push_to_server_async
()
self
.
__class__
.
no
+=
1
def
record
(
self
,
data
):
if
data
and
data
[
'input'
]:
data
[
'input'
]
=
data
[
'input'
][:
128
]
data
[
'output'
]
=
data
[
'output'
][:
1024
]
data
[
'timestamp'
]
=
int
(
data
[
'timestamp'
])
self
.
queue
.
put
(
data
)
def
push_to_server_async
(
self
):
def
func
():
while
not
self
.
stop_evt
.
is_set
():
data_set
=
self
.
queue
.
mget
(
self
.
batch_size
,
timeout
=
self
.
timeout
)
logger
.
debug
(
"<Session command recorder {}> queue size: {}"
.
format
(
self
.
no
,
self
.
queue
.
qsize
())
)
if
not
data_set
:
if
not
data_set
:
return
True
continue
super
()
.
record_command
(
data_set
)
logger
.
debug
(
"Send {} commands to server"
.
format
(
len
(
data_set
)))
return
self
.
app
.
service
.
push_session_command
(
data_set
)
ok
=
self
.
app
.
service
.
push_session_command
(
data_set
)
if
not
ok
:
self
.
queue
.
mput
(
data_set
)
thread
=
threading
.
Thread
(
target
=
func
)
thread
.
daemon
=
True
thread
.
start
()
def
session_start
(
self
,
session_id
):
def
session_start
(
self
,
session_id
):
print
(
"When session {} start exec"
.
format
(
session_id
))
print
(
"When session {} start exec"
.
format
(
session_id
))
def
session_end
(
self
,
session_id
):
def
session_end
(
self
,
session_id
):
self
.
stop_evt
.
set
()
print
(
"When session {} end start"
.
format
(
session_id
))
print
(
"When session {} end start"
.
format
(
session_id
))
def
__del__
(
self
):
print
(
"{} has been gc"
.
format
(
self
))
def
get_recorder
(
app
):
replay_engine
=
app
.
config
[
"REPLAY_RECORD_ENGINE"
]
command_engine
=
app
.
config
[
"COMMAND_RECORD_ENGINE"
]
if
replay_engine
==
"server"
:
def
get_command_recorder_class
(
app
):
replay_recorder
=
ServerReplayRecorder
(
app
)
command_engine
=
app
.
config
[
"COMMAND_RECORD_ENGINE"
]
else
:
replay_recorder
=
ServerReplayRecorder
(
app
)
if
command_engine
==
"server"
:
if
command_engine
==
"server"
:
command_recorder
=
ServerCommandRecorder
(
app
)
return
ServerCommandRecorder
else
:
else
:
command_recorder
=
ServerCommandRecorder
(
app
)
return
ServerCommandRecorder
return
replay_recorder
,
command_recorder
def
get_replay_recorder_class
(
app
):
replay_engine
=
app
.
config
[
"REPLAY_RECORD_ENGINE"
]
if
replay_engine
==
"server"
:
return
ServerReplayRecorder
else
:
return
ServerReplayRecorder
coco/session.py
View file @
8a8c81b6
#!/usr/bin/env python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
#
#
import
threading
import
threading
import
uuid
import
uuid
import
logging
import
logging
import
datetime
import
datetime
import
selectors
import
selectors
import
weakref
import
time
BUF_SIZE
=
1024
BUF_SIZE
=
1024
...
@@ -16,9 +15,8 @@ logger = logging.getLogger(__file__)
...
@@ -16,9 +15,8 @@ logger = logging.getLogger(__file__)
class
Session
:
class
Session
:
def
__init__
(
self
,
app
,
client
,
server
):
def
__init__
(
self
,
client
,
server
,
command_recorder
=
None
,
replay_recorder
=
None
):
self
.
id
=
str
(
uuid
.
uuid4
())
self
.
id
=
str
(
uuid
.
uuid4
())
self
.
_app
=
weakref
.
ref
(
app
)
self
.
client
=
client
# Master of the session, it's a client sock
self
.
client
=
client
# Master of the session, it's a client sock
self
.
server
=
server
# Server channel
self
.
server
=
server
# Server channel
self
.
_watchers
=
[]
# Only watch session
self
.
_watchers
=
[]
# Only watch session
...
@@ -28,15 +26,9 @@ class Session:
...
@@ -28,15 +26,9 @@ class Session:
self
.
date_finished
=
None
self
.
date_finished
=
None
self
.
stop_evt
=
threading
.
Event
()
self
.
stop_evt
=
threading
.
Event
()
self
.
sel
=
selectors
.
DefaultSelector
()
self
.
sel
=
selectors
.
DefaultSelector
()
self
.
_command_recorder
=
command_recorder
self
.
_replay_recorder
=
replay_recorder
self
.
server
.
set_session
(
self
)
self
.
server
.
set_session
(
self
)
self
.
_replay_queue
=
None
self
.
_command_queue
=
None
self
.
_replay_recorder
=
None
self
.
_command_recorder
=
None
@property
def
app
(
self
):
return
self
.
_app
()
def
add_watcher
(
self
,
watcher
,
silent
=
False
):
def
add_watcher
(
self
,
watcher
,
silent
=
False
):
"""
"""
...
@@ -79,6 +71,40 @@ class Session:
...
@@ -79,6 +71,40 @@ class Session:
self
.
sel
.
unregister
(
sharer
)
self
.
sel
.
unregister
(
sharer
)
self
.
_sharers
.
remove
(
sharer
)
self
.
_sharers
.
remove
(
sharer
)
def
set_command_recorder
(
self
,
recorder
):
self
.
_command_recorder
=
recorder
def
set_replay_recorder
(
self
,
recorder
):
self
.
_replay_recorder
=
recorder
def
put_command
(
self
,
_input
,
_output
):
if
not
_input
:
return
self
.
_command_recorder
.
record
({
"session"
:
self
.
id
,
"input"
:
_input
,
"output"
:
_output
,
"user"
:
self
.
client
.
user
.
username
,
"asset"
:
self
.
server
.
asset
.
hostname
,
"system_user"
:
self
.
server
.
system_user
.
username
,
"timestamp"
:
time
.
time
(),
})
def
put_replay
(
self
,
data
):
self
.
_replay_recorder
.
record
({
"session"
:
self
.
id
,
"data"
:
data
,
"timestamp"
:
time
.
time
(),
})
def
pre_bridge
(
self
):
self
.
_replay_recorder
.
session_start
(
self
.
id
)
self
.
_command_recorder
.
session_start
(
self
.
id
)
def
post_bridge
(
self
):
self
.
_replay_recorder
.
session_end
(
self
.
id
)
self
.
_command_recorder
.
session_end
(
self
.
id
)
def
terminate
(
self
):
def
terminate
(
self
):
msg
=
b
"Terminate by administrator
\r\n
"
msg
=
b
"Terminate by administrator
\r\n
"
self
.
client
.
send
(
msg
)
self
.
client
.
send
(
msg
)
...
@@ -90,6 +116,7 @@ class Session:
...
@@ -90,6 +116,7 @@ class Session:
:return:
:return:
"""
"""
logger
.
info
(
"Start bridge session: {}"
.
format
(
self
.
id
))
logger
.
info
(
"Start bridge session: {}"
.
format
(
self
.
id
))
self
.
pre_bridge
()
self
.
sel
.
register
(
self
.
client
,
selectors
.
EVENT_READ
)
self
.
sel
.
register
(
self
.
client
,
selectors
.
EVENT_READ
)
self
.
sel
.
register
(
self
.
server
,
selectors
.
EVENT_READ
)
self
.
sel
.
register
(
self
.
server
,
selectors
.
EVENT_READ
)
while
not
self
.
stop_evt
.
is_set
():
while
not
self
.
stop_evt
.
is_set
():
...
@@ -130,15 +157,10 @@ class Session:
...
@@ -130,15 +157,10 @@ class Session:
logger
.
debug
(
"Resize server chan size {}*{}"
.
format
(
width
,
height
))
logger
.
debug
(
"Resize server chan size {}*{}"
.
format
(
width
,
height
))
self
.
server
.
resize_pty
(
width
=
width
,
height
=
height
)
self
.
server
.
resize_pty
(
width
=
width
,
height
=
height
)
def
put_command
(
self
,
_input
,
_output
):
self
.
app
.
put_command_queue
(
self
,
_input
,
_output
)
def
put_replay
(
self
,
data
):
self
.
app
.
put_replay_queue
(
self
,
data
)
def
close
(
self
):
def
close
(
self
):
logger
.
info
(
"Close the session: {} "
.
format
(
self
.
id
))
logger
.
info
(
"Close the session: {} "
.
format
(
self
.
id
))
self
.
stop_evt
.
set
()
self
.
stop_evt
.
set
()
self
.
post_bridge
()
self
.
date_finished
=
datetime
.
datetime
.
now
()
self
.
date_finished
=
datetime
.
datetime
.
now
()
self
.
server
.
close
()
self
.
server
.
close
()
...
...
conf_example.py
View file @
8a8c81b6
...
@@ -21,9 +21,6 @@ APP_NAME = "coco"
...
@@ -21,9 +21,6 @@ APP_NAME = "coco"
# 监听的HTTP/WS端口号,默认5000
# 监听的HTTP/WS端口号,默认5000
# HTTPD_PORT = 5000
# HTTPD_PORT = 5000
# 是否开启DEBUG
# DEBUG = True
# 项目使用的ACCESS KEY, 默认会注册,并保存到 ACCESS_KEY_STORE中,
# 项目使用的ACCESS KEY, 默认会注册,并保存到 ACCESS_KEY_STORE中,
# 如果有需求, 可以写到配置文件中, 格式 access_key_id:access_key_secret
# 如果有需求, 可以写到配置文件中, 格式 access_key_id:access_key_secret
# ACCESS_KEY = None
# ACCESS_KEY = None
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment