From c5f32bf4313b090fdde27a3770484c452417d723 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 16 Aug 2019 10:21:51 +0800 Subject: [PATCH] PR feedback for `MplexStream.read` --- libp2p/stream_muxer/mplex/mplex.py | 4 +++- libp2p/stream_muxer/mplex/mplex_stream.py | 9 +++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 00df44c..f154a16 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -75,7 +75,8 @@ class Mplex(IMuxedConn): async def read_buffer(self, stream_id: int) -> bytes: """ - Read a message from stream_id's buffer, check raw connection for new messages + Read a message from stream_id's buffer, check raw connection for new messages. + `StreamNotFound` is raised when stream `stream_id` is not found in `Mplex`. :param stream_id: stream id of stream to read from :return: message read """ @@ -86,6 +87,7 @@ class Mplex(IMuxedConn): async def read_buffer_nonblocking(self, stream_id: int) -> Optional[bytes]: """ Read a message from `stream_id`'s buffer, non-blockingly. + `StreamNotFound` is raised when stream `stream_id` is not found in `Mplex`. """ if stream_id not in self.buffers: raise StreamNotFound(f"stream {stream_id} is not found") diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 1d1d7a6..2ec23f1 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -46,14 +46,15 @@ class MplexStream(IMuxedStream): :param n: number of bytes to read :return: bytes actually read """ + # TODO: Handle `StreamNotFound` raised in `self.mplex_conn.read_buffer`. + # TODO: Add exceptions and handle/raise them in this class. if n < 0 and n != -1: - raise ValueError(f"the number of bytes to read ``n`` must be positive or -1 to indicate read until EOF") + raise ValueError( + f"the number of bytes to read `n` must be positive or -1 to indicate read until EOF" + ) # If the buffer is empty at first, blocking wait for data. if len(self._buf) == 0: self._buf.extend(await self.mplex_conn.read_buffer(self.stream_id)) - # Sanity check: `self._buf` should never be empty here. - if self._buf is None or len(self._buf) == 0: - raise Exception("`self._buf` should never be empty here") # FIXME: If `n == -1`, we should blocking read until EOF, instead of returning when # no message is available.