Mplex: catch exceptions from channel.send

This commit is contained in:
mhchia 2020-02-05 17:05:30 +08:00
parent 12cb0d9ac4
commit 996b5cf15d
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
3 changed files with 8 additions and 9 deletions

View File

@ -111,11 +111,11 @@ class Mplex(IMuxedConn):
async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream:
# Use an unbounded buffer, to avoid `handle_incoming` being blocked when doing
# `send_channel.send`.
channels = trio.open_memory_channel[bytes](math.inf)
stream = MplexStream(name, stream_id, self, channels[1])
send_channel, receive_channel = trio.open_memory_channel[bytes](math.inf)
stream = MplexStream(name, stream_id, self, receive_channel)
async with self.streams_lock:
self.streams[stream_id] = stream
self.streams_msg_channels[stream_id] = channels[0]
self.streams_msg_channels[stream_id] = send_channel
return stream
async def open_stream(self) -> IMuxedStream:
@ -269,7 +269,10 @@ class Mplex(IMuxedConn):
if stream.event_remote_closed.is_set():
# TODO: Warn "Received data from remote after stream was closed by them. (len = %d)" # noqa: E501
return
await send_channel.send(message)
try:
await send_channel.send(message)
except (trio.BrokenResourceError, trio.ClosedResourceError):
raise MplexUnavailable
async def _handle_close(self, stream_id: StreamID) -> None:
async with self.streams_lock:
@ -325,7 +328,7 @@ class Mplex(IMuxedConn):
stream.event_local_closed.set()
send_channel = self.streams_msg_channels[stream_id]
await send_channel.aclose()
self.streams = None
self.event_closed.set()
# FIXME: It's enough to just close one side.
await self.new_stream_send_channel.aclose()
await self.new_stream_receive_channel.aclose()

View File

@ -31,12 +31,10 @@ async def test_mplex_conn(mplex_conn_pair):
assert stream_0.event_remote_closed.is_set()
assert stream_0.event_reset.is_set()
assert stream_0.event_local_closed.is_set()
assert conn_0.streams is None
# Test: All streams on the other side are also closed.
assert stream_1.event_remote_closed.is_set()
assert stream_1.event_reset.is_set()
assert stream_1.event_local_closed.is_set()
assert conn_1.streams is None
# Test: No effect to close more than once between two side.
await conn_0.close()

View File

@ -69,9 +69,7 @@ async def test_mplex_stream_read_after_remote_closed(mplex_stream_pair):
await stream_0.close()
assert stream_0.event_local_closed.is_set()
await trio.sleep(0.01)
# await trio.sleep(100000)
await wait_all_tasks_blocked()
# await trio.sleep_forever()
assert stream_1.event_remote_closed.is_set()
assert (await stream_1.read(MAX_READ_LEN)) == DATA
with pytest.raises(MplexStreamEOF):