Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
coco
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ops
coco
Commits
fdd31846
Commit
fdd31846
authored
Apr 19, 2018
by
BaiJiangjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[Feature] 支持OTP认证
parent
b2cd9d72
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
41 additions
and
14 deletions
+41
-14
interface.py
coco/interface.py
+41
-13
logo.txt
logo.txt
+0
-1
No files found.
coco/interface.py
View file @
fdd31846
...
...
@@ -24,6 +24,8 @@ class SSHInterface(paramiko.ServerInterface):
self
.
_request
=
weakref
.
ref
(
request
)
self
.
event
=
threading
.
Event
()
self
.
auth_valid
=
False
self
.
otp_auth
=
False
self
.
info
=
None
@property
def
app
(
self
):
...
...
@@ -35,58 +37,84 @@ class SSHInterface(paramiko.ServerInterface):
def
check_auth_interactive
(
self
,
username
,
submethods
):
logger
.
info
(
"Check auth interactive:
%
s
%
s"
%
(
username
,
submethods
))
return
paramiko
.
AUTH_FAILED
instructions
=
'Please enter 6 digits.'
interactive
=
paramiko
.
server
.
InteractiveQuery
(
instructions
=
instructions
)
interactive
.
add_prompt
(
prompt
=
'[OTP auth]: '
)
return
interactive
def
check_auth_interactive_response
(
self
,
responses
):
logger
.
info
(
"Check auth interactive response:
%
s "
%
responses
)
# TODO:MFA Auth
pass
otp_code
=
responses
[
0
]
if
not
otp_code
or
not
len
(
otp_code
)
==
6
or
not
otp_code
.
isdigit
():
return
paramiko
.
AUTH_FAILED
return
self
.
check_auth_otp
(
otp_code
)
def
check_auth_otp
(
self
,
otp_code
):
seed
=
self
.
info
.
get
(
'seed'
,
''
)
if
not
seed
:
return
paramiko
.
AUTH_FAILED
is_valid
=
self
.
app
.
service
.
authenticate_otp
(
seed
,
otp_code
)
if
is_valid
:
return
paramiko
.
AUTH_SUCCESSFUL
return
paramiko
.
AUTH_FAILED
def
enable_auth_gssapi
(
self
):
return
False
def
get_allowed_auths
(
self
,
username
):
supported
=
[]
if
self
.
otp_auth
:
return
'keyboard-interactive'
if
self
.
app
.
config
[
"PASSWORD_AUTH"
]:
supported
.
append
(
"password"
)
if
self
.
app
.
config
[
"PUBLIC_KEY_AUTH"
]:
supported
.
append
(
"publickey"
)
return
","
.
join
(
supported
)
def
check_auth_none
(
self
,
username
):
return
paramiko
.
AUTH_FAILED
def
check_auth_password
(
self
,
username
,
password
):
valid
=
self
.
validate_auth
(
username
,
password
=
password
)
if
not
valid
:
user
=
self
.
validate_auth
(
username
,
password
=
password
)
if
not
user
:
logger
.
warning
(
"Password and public key auth <
%
s> failed, reject it"
%
username
)
return
paramiko
.
AUTH_FAILED
else
:
logger
.
info
(
"Password auth <
%
s> success"
%
username
)
if
self
.
otp_auth
:
return
paramiko
.
AUTH_PARTIALLY_SUCCESSFUL
return
paramiko
.
AUTH_SUCCESSFUL
def
check_auth_publickey
(
self
,
username
,
key
):
key
=
key
.
get_base64
()
valid
=
self
.
validate_auth
(
username
,
public_key
=
key
)
if
not
valid
:
user
=
self
.
validate_auth
(
username
,
public_key
=
key
)
if
not
user
:
logger
.
debug
(
"Public key auth <
%
s> failed, try to password"
%
username
)
return
paramiko
.
AUTH_FAILED
else
:
logger
.
debug
(
"Public key auth <
%
s> success"
%
username
)
if
self
.
otp_auth
:
return
paramiko
.
AUTH_PARTIALLY_SUCCESSFUL
return
paramiko
.
AUTH_SUCCESSFUL
def
validate_auth
(
self
,
username
,
password
=
""
,
public_key
=
""
):
user
,
_
=
self
.
app
.
service
.
authenticate
(
info
=
self
.
app
.
service
.
authenticate
(
username
,
password
=
password
,
public_key
=
public_key
,
remote_addr
=
self
.
request
.
remote_ip
,
remote_addr
=
self
.
request
.
remote_ip
)
user
=
info
.
get
(
'user'
,
None
)
if
user
:
self
.
request
.
user
=
user
return
True
else
:
return
False
self
.
info
=
info
seed
=
info
.
get
(
'seed'
,
None
)
token
=
info
.
get
(
'token'
,
None
)
if
seed
and
not
token
:
self
.
otp_auth
=
True
return
user
def
check_channel_direct_tcpip_request
(
self
,
chanid
,
origin
,
destination
):
logger
.
debug
(
"Check channel direct tcpip request:
%
d
%
s
%
s"
%
...
...
logo.txt
View file @
fdd31846
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment