diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 59252c5..d946850 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,6 +1,9 @@ +from libp2p.exceptions import ParseError from libp2p.io.abc import ReadWriteCloser +from libp2p.io.exceptions import IOException from libp2p.utils import encode_delim, read_delim +from .exceptions import MultiselectCommunicatorError from .multiselect_communicator_interface import IMultiselectCommunicator @@ -15,5 +18,11 @@ class MultiselectCommunicator(IMultiselectCommunicator): await self.read_writer.write(msg_bytes) async def read(self) -> str: - data = await read_delim(self.read_writer) + try: + data = await read_delim(self.read_writer) + # `IOException` includes `IncompleteReadError` and `StreamError` + except (ParseError, IOException, ValueError): + raise MultiselectCommunicatorError( + "fail to read from multiselect communicator" + ) return data.decode() diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index b162b89..ef069d8 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -16,12 +16,14 @@ from typing import ( import base58 from lru import LRU -from libp2p.exceptions import ValidationError +from libp2p.exceptions import ParseError, ValidationError from libp2p.host.host_interface import IHost +from libp2p.io.exceptions import IncompleteReadError from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes +from libp2p.network.stream.exceptions import StreamEOF, StreamReset from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -154,7 +156,13 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - incoming: bytes = await read_varint_prefixed_bytes(stream) + try: + incoming: bytes = await read_varint_prefixed_bytes(stream) + except (ParseError, IncompleteReadError) as error: + logger.debug( + "read corrupted data from peer %s, error=%s", peer_id, error + ) + continue rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: @@ -228,7 +236,13 @@ class Pubsub: on one of the supported pubsub protocols. :param stream: newly created stream """ - await self.continuously_read_stream(stream) + try: + await self.continuously_read_stream(stream) + except (StreamEOF, StreamReset) as error: + logger.debug("fail to read from stream, error=%s", error) + stream.reset() + # TODO: what to do when the stream is terminated? + # disconnect the peer? async def _handle_new_peer(self, peer_id: ID) -> None: stream: INetStream = await self.host.new_stream(peer_id, self.protocols) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index c75000d..5d09b75 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -2,6 +2,8 @@ import asyncio from typing import Any # noqa: F401 from typing import Dict, List, Optional, Tuple +from libp2p.exceptions import ParseError +from libp2p.io.exceptions import IncompleteReadError from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn @@ -248,13 +250,15 @@ class Mplex(IMuxedConn): # FIXME: No timeout is used in Go implementation. # Timeout is set to a relatively small value to alleviate wait time to exit # loop in handle_incoming - header = await decode_uvarint_from_stream(self.secured_conn) - # TODO: Handle the case of EOF and other exceptions? + try: + header = await decode_uvarint_from_stream(self.secured_conn) + except ParseError: + return None, None, None try: message = await asyncio.wait_for( read_varint_prefixed_bytes(self.secured_conn), timeout=5 ) - except asyncio.TimeoutError: + except (ParseError, IncompleteReadError, asyncio.TimeoutError): # TODO: Investigate what we should do if time is out. return None, None, None diff --git a/libp2p/utils.py b/libp2p/utils.py index c69f61b..8362a5a 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -65,10 +65,6 @@ def encode_varint_prefixed(msg_bytes: bytes) -> bytes: async def read_varint_prefixed_bytes(reader: Reader) -> bytes: len_msg = await decode_uvarint_from_stream(reader) data = await read_exactly(reader, len_msg) - if len(data) != len_msg: - raise ValueError( - f"failed to read enough bytes: len_msg={len_msg}, data={data!r}" - ) return data