diff --git a/libp2p/io/abc.py b/libp2p/io/abc.py index eea7b72..8f2c758 100644 --- a/libp2p/io/abc.py +++ b/libp2p/io/abc.py @@ -8,7 +8,7 @@ class Closer(ABC): class Reader(ABC): @abstractmethod - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: ... diff --git a/libp2p/io/msgio.py b/libp2p/io/msgio.py index f60b0ff..32c0d09 100644 --- a/libp2p/io/msgio.py +++ b/libp2p/io/msgio.py @@ -54,7 +54,7 @@ class MsgIOReader(ReadCloser): self.read_closer = read_closer self.next_length = None - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: return await self.read_msg() async def read_msg(self) -> bytes: diff --git a/libp2p/io/trio.py b/libp2p/io/trio.py index 840c3bc..b8571c8 100644 --- a/libp2p/io/trio.py +++ b/libp2p/io/trio.py @@ -26,22 +26,13 @@ class TrioTCPStream(ReadWriteCloser): await self.stream.send_all(data) except (trio.ClosedResourceError, trio.BrokenResourceError) as error: raise IOException from error - except trio.BusyResourceError as error: - # This should never happen, since we already access streams with read/write locks. - raise Exception( - "this should never happen " - "since we already access streams with read/write locks." - ) from error - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: async with self.read_lock: - if n == 0: - # Checkpoint - await trio.hazmat.checkpoint() + if n is not None and n == 0: return b"" - max_bytes = n if n != -1 else None try: - return await self.stream.receive_some(max_bytes) + return await self.stream.receive_some(n) except (trio.ClosedResourceError, trio.BrokenResourceError) as error: raise IOException from error except trio.BusyResourceError as error: diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 25b1049..69ef56a 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -20,7 +20,7 @@ class RawConnection(IRawConnection): except IOException as error: raise RawConnError(error) - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: """ Read up to ``n`` bytes from the underlying stream. This call is delegated directly to the underlying ``self.reader``. diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 7ab609d..b2bac06 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -37,7 +37,7 @@ class NetStream(INetStream): """ self.protocol_id = protocol_id - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: """ reads from stream. diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 4199c61..abce868 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -39,7 +39,7 @@ class InsecureSession(BaseSession): await self.conn.write(data) return len(data) - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: return await self.conn.read(n) async def close(self) -> None: diff --git a/libp2p/security/secio/transport.py b/libp2p/security/secio/transport.py index 08ab0e2..2335921 100644 --- a/libp2p/security/secio/transport.py +++ b/libp2p/security/secio/transport.py @@ -94,7 +94,7 @@ class SecureSession(BaseSession): data = self.buf.getbuffer()[self.low_watermark : self.high_watermark] - if n < 0: + if n is None: n = len(data) result = data[:n].tobytes() self.low_watermark += len(result) @@ -111,7 +111,7 @@ class SecureSession(BaseSession): self.low_watermark = 0 self.high_watermark = len(msg) - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: if n == 0: return bytes() diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index ae6f7ea..933de20 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -81,22 +81,23 @@ class MplexStream(IMuxedStream): break return buf - async def read(self, n: int = -1) -> bytes: + async def read(self, n: int = None) -> bytes: """ Read up to n bytes. Read possibly returns fewer than `n` bytes, if - there are not enough bytes in the Mplex buffer. If `n == -1`, read + there are not enough bytes in the Mplex buffer. If `n is None`, read until EOF. :param n: number of bytes to read :return: bytes actually read """ - if n < 0 and n != -1: + if n is not None and n < 0: raise ValueError( - f"the number of bytes to read `n` must be positive or -1 to indicate read until EOF" + f"the number of bytes to read `n` must be non-negative or " + "`None` to indicate read until EOF" ) if self.event_reset.is_set(): raise MplexStreamReset - if n == -1: + if n is None: return await self._read_until_eof() if len(self._buf) == 0: data: bytes diff --git a/tests_interop/conftest.py b/tests_interop/conftest.py index bad0054..067db83 100644 --- a/tests_interop/conftest.py +++ b/tests_interop/conftest.py @@ -84,11 +84,8 @@ class DaemonStream(ReadWriteCloser): async def close(self) -> None: await self.stream.close() - async def read(self, n: int = -1) -> bytes: - if n == -1: - return await self.stream.receive_some() - else: - return await self.stream.receive_some(n) + async def read(self, n: int = None) -> bytes: + return await self.stream.receive_some(n) async def write(self, data: bytes) -> None: return await self.stream.send_all(data)