1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import errno
import select
import os
import tempfile
import sys
import pytest
from .._core.tests.tutil import gc_collect_harder, skip_if_fbsd_pipes_broken
from .. import _core, move_on_after
from ..testing import wait_all_tasks_blocked, check_one_way_stream
posix = os.name == "posix"
pytestmark = pytest.mark.skipif(not posix, reason="posix only")
if posix:
from .._unix_pipes import FdStream
else:
with pytest.raises(ImportError):
from .._unix_pipes import FdStream
# Have to use quoted types so import doesn't crash on windows
async def make_pipe() -> "Tuple[FdStream, FdStream]":
"""Makes a new pair of pipes."""
(r, w) = os.pipe()
return FdStream(w), FdStream(r)
async def make_clogged_pipe():
s, r = await make_pipe()
try:
while True:
# We want to totally fill up the pipe buffer.
# This requires working around a weird feature that POSIX pipes
# have.
# If you do a write of <= PIPE_BUF bytes, then it's guaranteed
# to either complete entirely, or not at all. So if we tried to
# write PIPE_BUF bytes, and the buffer's free space is only
# PIPE_BUF/2, then the write will raise BlockingIOError... even
# though a smaller write could still succeed! To avoid this,
# make sure to write >PIPE_BUF bytes each time, which disables
# the special behavior.
# For details, search for PIPE_BUF here:
# http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
# for the getattr:
# https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3
buf_size = getattr(select, "PIPE_BUF", 8192)
os.write(s.fileno(), b"x" * buf_size * 2)
except BlockingIOError:
pass
return s, r
async def test_send_pipe():
r, w = os.pipe()
async with FdStream(w) as send:
assert send.fileno() == w
await send.send_all(b"123")
assert (os.read(r, 8)) == b"123"
os.close(r)
async def test_receive_pipe():
r, w = os.pipe()
async with FdStream(r) as recv:
assert (recv.fileno()) == r
os.write(w, b"123")
assert (await recv.receive_some(8)) == b"123"
os.close(w)
async def test_pipes_combined():
write, read = await make_pipe()
count = 2 ** 20
async def sender():
big = bytearray(count)
await write.send_all(big)
async def reader():
await wait_all_tasks_blocked()
received = 0
while received < count:
received += len(await read.receive_some(4096))
assert received == count
async with _core.open_nursery() as n:
n.start_soon(sender)
n.start_soon(reader)
await read.aclose()
await write.aclose()
async def test_pipe_errors():
with pytest.raises(TypeError):
FdStream(None)
r, w = os.pipe()
os.close(w)
async with FdStream(r) as s:
with pytest.raises(ValueError):
await s.receive_some(0)
async def test_del():
w, r = await make_pipe()
f1, f2 = w.fileno(), r.fileno()
del w, r
gc_collect_harder()
with pytest.raises(OSError) as excinfo:
os.close(f1)
assert excinfo.value.errno == errno.EBADF
with pytest.raises(OSError) as excinfo:
os.close(f2)
assert excinfo.value.errno == errno.EBADF
async def test_async_with():
w, r = await make_pipe()
async with w, r:
pass
assert w.fileno() == -1
assert r.fileno() == -1
with pytest.raises(OSError) as excinfo:
os.close(w.fileno())
assert excinfo.value.errno == errno.EBADF
with pytest.raises(OSError) as excinfo:
os.close(r.fileno())
assert excinfo.value.errno == errno.EBADF
async def test_misdirected_aclose_regression():
# https://github.com/python-trio/trio/issues/661#issuecomment-456582356
w, r = await make_pipe()
old_r_fd = r.fileno()
# Close the original objects
await w.aclose()
await r.aclose()
# Do a little dance to get a new pipe whose receive handle matches the old
# receive handle.
r2_fd, w2_fd = os.pipe()
if r2_fd != old_r_fd: # pragma: no cover
os.dup2(r2_fd, old_r_fd)
os.close(r2_fd)
async with FdStream(old_r_fd) as r2:
assert r2.fileno() == old_r_fd
# And now set up a background task that's working on the new receive
# handle
async def expect_eof():
assert await r2.receive_some(10) == b""
async with _core.open_nursery() as nursery:
nursery.start_soon(expect_eof)
await wait_all_tasks_blocked()
# Here's the key test: does calling aclose() again on the *old*
# handle, cause the task blocked on the *new* handle to raise
# ClosedResourceError?
await r.aclose()
await wait_all_tasks_blocked()
# Guess we survived! Close the new write handle so that the task
# gets an EOF and can exit cleanly.
os.close(w2_fd)
async def test_close_at_bad_time_for_receive_some(monkeypatch):
# We used to have race conditions where if one task was using the pipe,
# and another closed it at *just* the wrong moment, it would give an
# unexpected error instead of ClosedResourceError:
# https://github.com/python-trio/trio/issues/661
#
# This tests what happens if the pipe gets closed in the moment *between*
# when receive_some wakes up, and when it tries to call os.read
async def expect_closedresourceerror():
with pytest.raises(_core.ClosedResourceError):
await r.receive_some(10)
orig_wait_readable = _core._run.TheIOManager.wait_readable
async def patched_wait_readable(*args, **kwargs):
await orig_wait_readable(*args, **kwargs)
await r.aclose()
monkeypatch.setattr(_core._run.TheIOManager, "wait_readable", patched_wait_readable)
s, r = await make_pipe()
async with s, r:
async with _core.open_nursery() as nursery:
nursery.start_soon(expect_closedresourceerror)
await wait_all_tasks_blocked()
# Trigger everything by waking up the receiver
await s.send_all(b"x")
async def test_close_at_bad_time_for_send_all(monkeypatch):
# We used to have race conditions where if one task was using the pipe,
# and another closed it at *just* the wrong moment, it would give an
# unexpected error instead of ClosedResourceError:
# https://github.com/python-trio/trio/issues/661
#
# This tests what happens if the pipe gets closed in the moment *between*
# when send_all wakes up, and when it tries to call os.write
async def expect_closedresourceerror():
with pytest.raises(_core.ClosedResourceError):
await s.send_all(b"x" * 100)
orig_wait_writable = _core._run.TheIOManager.wait_writable
async def patched_wait_writable(*args, **kwargs):
await orig_wait_writable(*args, **kwargs)
await s.aclose()
monkeypatch.setattr(_core._run.TheIOManager, "wait_writable", patched_wait_writable)
s, r = await make_clogged_pipe()
async with s, r:
async with _core.open_nursery() as nursery:
nursery.start_soon(expect_closedresourceerror)
await wait_all_tasks_blocked()
# Trigger everything by waking up the sender
await r.receive_some(10000)
# On FreeBSD, directories are readable, and we haven't found any other trick
# for making an unreadable fd, so there's no way to run this test. Fortunately
# the logic this is testing doesn't depend on the platform, so testing on
# other platforms is probably good enough.
@pytest.mark.skipif(
sys.platform.startswith("freebsd"),
reason="no way to make read() return a bizarro error on FreeBSD",
)
async def test_bizarro_OSError_from_receive():
# Make sure that if the read syscall returns some bizarro error, then we
# get a BrokenResourceError. This is incredibly unlikely; there's almost
# no way to trigger a failure here intentionally (except for EBADF, but we
# exploit that to detect file closure, so it takes a different path). So
# we set up a strange scenario where the pipe fd somehow transmutes into a
# directory fd, causing os.read to raise IsADirectoryError (yes, that's a
# real built-in exception type).
s, r = await make_pipe()
async with s, r:
dir_fd = os.open("/", os.O_DIRECTORY, 0)
try:
os.dup2(dir_fd, r.fileno())
with pytest.raises(_core.BrokenResourceError):
await r.receive_some(10)
finally:
os.close(dir_fd)
@skip_if_fbsd_pipes_broken
async def test_pipe_fully():
await check_one_way_stream(make_pipe, make_clogged_pipe)