Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
coco
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ops
coco
Commits
67ff6b5c
Commit
67ff6b5c
authored
Apr 04, 2018
by
ibuler
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[Update] 修改sftp
parent
515ece56
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
38 additions
and
67 deletions
+38
-67
sftp.py
coco/sftp.py
+38
-67
No files found.
coco/sftp.py
View file @
67ff6b5c
...
...
@@ -22,61 +22,34 @@ A stub SFTP server for loopback SFTP testing.
import
os
import
tempfile
from
paramiko
import
ServerInterface
,
SFTPServerInterface
,
SFTPServer
,
\
SFTPAttributes
,
SFTPHandle
,
SFTP_OK
,
AUTH_SUCCESSFUL
,
OPEN_SUCCEEDED
,
\
SFTPClient
import
time
import
socket
import
argparse
import
sys
import
textwrap
import
paramiko
sftp_client
=
None
# paramiko.util.log_to_file('/tmp/ftpserver.log')
paramiko
.
util
.
log_to_file
(
'/tmp/ftpserver.log'
,
'DEBUG'
)
class
StubServer
(
ServerInterface
):
class
StubServer
(
paramiko
.
ServerInterface
):
def
check_auth_password
(
self
,
username
,
password
):
# all are allowed
return
AUTH_SUCCESSFUL
return
paramiko
.
AUTH_SUCCESSFUL
def
check_auth_publickey
(
self
,
username
,
key
):
# all are allowed
return
AUTH_SUCCESSFUL
return
paramiko
.
AUTH_SUCCESSFUL
def
check_channel_request
(
self
,
kind
,
chanid
):
return
OPEN_SUCCEEDED
return
paramiko
.
OPEN_SUCCEEDED
def
get_allowed_auths
(
self
,
username
):
"""List availble auth mechanisms."""
return
"password,publickey"
class
StubSFTPHandle
(
SFTPHandle
):
def
stat
(
self
):
print
(
"Call handle stat"
)
try
:
return
SFTPAttributes
.
from_stat
(
os
.
fstat
(
self
.
readfile
.
fileno
()))
except
OSError
as
e
:
return
SFTPServer
.
convert_errno
(
e
.
errno
)
def
chattr
(
self
,
attr
):
# python doesn't have equivalents to fchown or fchmod, so we have to
# use the stored filename
try
:
SFTPServer
.
set_file_attr
(
self
.
filename
,
attr
)
return
SFTP_OK
except
OSError
as
e
:
return
SFTPServer
.
convert_errno
(
e
.
errno
)
class
StubSFTPServer
(
SFTPServerInterface
):
# assume current folder is a fine root
# (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess)
ROOT
=
os
.
getcwd
()
class
StubSFTPServer
(
paramiko
.
SFTPServerInterface
):
hosts
=
None
def
__init__
(
self
,
*
args
,
**
kwargs
):
...
...
@@ -105,25 +78,18 @@ class StubSFTPServer(SFTPServerInterface):
def
get_sftp_rpath
(
self
,
path
):
host
,
rpath
=
self
.
parse_path
(
path
)
sftp
=
None
if
host
:
sftp
=
self
.
get_host_sftp
(
host
)
sftp
=
self
.
get_host_sftp
(
host
)
if
host
else
None
return
sftp
,
rpath
@staticmethod
def
stat_host_dir
():
tmp
=
tempfile
.
TemporaryDirectory
()
d
=
tmp
.
name
attr
=
SFTPAttributes
.
from_stat
(
os
.
stat
(
d
)
)
attr
=
paramiko
.
SFTPAttributes
.
from_stat
(
os
.
stat
(
tmp
.
name
))
tmp
.
cleanup
()
return
attr
def
list_folder
(
self
,
path
):
print
(
"Call list folder: {}"
.
format
(
path
))
output
=
[]
if
path
==
"/"
:
for
filename
in
self
.
hosts
:
attr
=
self
.
stat_host_dir
()
...
...
@@ -140,7 +106,6 @@ class StubSFTPServer(SFTPServerInterface):
def
stat
(
self
,
path
):
host
,
rpath
=
self
.
parse_path
(
path
)
if
host
and
not
rpath
:
attr
=
self
.
stat_host_dir
()
attr
.
filename
=
host
...
...
@@ -150,7 +115,6 @@ class StubSFTPServer(SFTPServerInterface):
return
sftp
.
stat
(
rpath
)
def
lstat
(
self
,
path
):
print
(
"Call lstat: {}"
.
format
(
path
))
host
,
rpath
=
self
.
parse_path
(
path
)
if
host
==
''
:
...
...
@@ -163,15 +127,9 @@ class StubSFTPServer(SFTPServerInterface):
return
attr
def
open
(
self
,
path
,
flags
,
attr
):
print
(
"Call {}: {}**{}**{}"
.
format
(
"Open"
,
path
,
flags
,
attr
))
host
,
rpath
=
self
.
parse_path
(
path
)
binary_flag
=
getattr
(
os
,
'O_BINARY'
,
0
)
flags
|=
binary_flag
if
(
flags
&
os
.
O_CREAT
)
and
(
attr
is
not
None
):
attr
.
_flags
&=
~
attr
.
FLAG_PERMISSIONS
SFTPServer
.
set_file_attr
(
path
,
attr
)
if
flags
&
os
.
O_WRONLY
:
if
flags
&
os
.
O_APPEND
:
mode
=
'ab'
...
...
@@ -185,46 +143,59 @@ class StubSFTPServer(SFTPServerInterface):
else
:
mode
=
'rb'
if
host
!=
""
:
sftp
=
self
.
get_host_sftp
(
host
)
f
=
sftp
.
open
(
rpath
,
mode
,
bufsize
=
1024
)
obj
=
Stub
SFTPHandle
(
flags
)
obj
.
filename
=
path
sftp
,
rpath
=
self
.
get_sftp_rpath
(
path
)
if
sftp
is
not
None
:
f
=
sftp
.
open
(
rpath
,
mode
,
bufsize
=
4096
)
obj
=
paramiko
.
SFTPHandle
(
flags
)
obj
.
filename
=
r
path
obj
.
readfile
=
f
obj
.
writefile
=
f
return
obj
def
remove
(
self
,
path
):
print
(
"Call {}"
.
format
(
"Remove"
))
sftp
,
rpath
=
self
.
get_sftp_rpath
(
path
)
if
sftp
is
not
None
:
sftp
.
remove
(
rpath
)
return
SFTP_OK
try
:
sftp
.
remove
(
rpath
)
except
OSError
as
e
:
return
paramiko
.
SFTPServer
.
convert_errno
(
e
.
errno
)
return
paramiko
.
SFTP_OK
else
:
return
paramiko
.
SFTP_FAILURE
def
rename
(
self
,
src
,
dest
):
print
(
"Call {}"
.
format
(
"Rename"
))
host1
,
rsrc
=
self
.
parse_path
(
src
)
host2
,
rdest
=
self
.
parse_path
(
dest
)
if
host1
==
host2
and
host1
:
sftp
=
self
.
get_host_sftp
(
host2
)
sftp
.
rename
(
rsrc
,
rdest
)
return
SFTP_OK
try
:
sftp
.
rename
(
rsrc
,
rdest
)
except
OSError
as
e
:
return
paramiko
.
SFTPServer
.
convert_errno
(
e
.
errno
)
return
paramiko
.
SFTP_OK
return
paramiko
.
SFTP_FAILURE
def
mkdir
(
self
,
path
,
attr
):
print
(
"Call {}"
.
format
(
"Mkdir"
))
sftp
,
rpath
=
self
.
get_sftp_rpath
(
path
)
if
sftp
is
not
None
:
sftp
.
mkdir
(
rpath
)
return
SFTP_OK
try
:
sftp
.
mkdir
(
rpath
)
except
OSError
as
e
:
return
paramiko
.
SFTPServer
.
convert_errno
(
e
.
errno
)
return
paramiko
.
SFTP_OK
return
paramiko
.
SFTP_FAILURE
def
rmdir
(
self
,
path
):
print
(
"Call {}"
.
format
(
"Rmdir"
))
sftp
,
rpath
=
self
.
get_sftp_rpath
(
path
)
if
sftp
is
not
None
:
sftp
.
rmdir
(
rpath
)
return
SFTP_OK
try
:
sftp
.
rmdir
(
rpath
)
except
OSError
as
e
:
return
paramiko
.
SFTPServer
.
convert_errno
(
e
.
errno
)
return
paramiko
.
SFTP_OK
def
chattr
(
self
,
path
,
attr
):
print
(
"Call {}"
.
format
(
"Chattr"
))
...
...
@@ -238,7 +209,7 @@ class StubSFTPServer(SFTPServerInterface):
sftp
.
utime
(
rpath
,
(
attr
.
st_atime
,
attr
.
st_mtime
))
if
attr
.
_flags
&
attr
.
FLAG_SIZE
:
sftp
.
truncate
(
rpath
,
attr
.
st_size
)
return
SFTP_OK
return
paramiko
.
SFTP_OK
HOST
,
PORT
=
'localhost'
,
3373
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment