From 68573e94d32b4b28bd6678e3865831f4ce4bb2e1 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 15 Sep 2019 16:34:16 +0800 Subject: [PATCH 01/20] Have `StreamError` inherit from `IOException` --- libp2p/network/stream/exceptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/network/stream/exceptions.py b/libp2p/network/stream/exceptions.py index 58f3ddf..7af28ec 100644 --- a/libp2p/network/stream/exceptions.py +++ b/libp2p/network/stream/exceptions.py @@ -1,7 +1,7 @@ -from libp2p.exceptions import BaseLibp2pError +from libp2p.io.exceptions import IOException -class StreamError(BaseLibp2pError): +class StreamError(IOException): pass From 905a473ac37fa5a73eaaedb60aa4aefbcffc7ea4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 15 Sep 2019 16:37:37 +0800 Subject: [PATCH 02/20] Add `MultiselectCommunicatorError` --- libp2p/protocol_muxer/exceptions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libp2p/protocol_muxer/exceptions.py b/libp2p/protocol_muxer/exceptions.py index cf47aca..a34e318 100644 --- a/libp2p/protocol_muxer/exceptions.py +++ b/libp2p/protocol_muxer/exceptions.py @@ -1,6 +1,10 @@ from libp2p.exceptions import BaseLibp2pError +class MultiselectCommunicatorError(BaseLibp2pError): + """Raised when an error occurs during read/write via communicator""" + + class MultiselectError(BaseLibp2pError): """Raised when an error occurs in multiselect process""" From 879f193aa144ed870053b7f4b8aae41a2c0cb707 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 15 Sep 2019 16:58:08 +0800 Subject: [PATCH 03/20] Handle errors from - `read_delim` - `read_varint_prefixed_bytes` - `decode_uvarint_from_stream` --- .../multiselect_communicator.py | 11 +++++++++- libp2p/pubsub/pubsub.py | 20 ++++++++++++++++--- libp2p/stream_muxer/mplex/mplex.py | 10 +++++++--- libp2p/utils.py | 4 ---- 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 59252c5..d946850 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,6 +1,9 @@ +from libp2p.exceptions import ParseError from libp2p.io.abc import ReadWriteCloser +from libp2p.io.exceptions import IOException from libp2p.utils import encode_delim, read_delim +from .exceptions import MultiselectCommunicatorError from .multiselect_communicator_interface import IMultiselectCommunicator @@ -15,5 +18,11 @@ class MultiselectCommunicator(IMultiselectCommunicator): await self.read_writer.write(msg_bytes) async def read(self) -> str: - data = await read_delim(self.read_writer) + try: + data = await read_delim(self.read_writer) + # `IOException` includes `IncompleteReadError` and `StreamError` + except (ParseError, IOException, ValueError): + raise MultiselectCommunicatorError( + "fail to read from multiselect communicator" + ) return data.decode() diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index b162b89..ef069d8 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -16,12 +16,14 @@ from typing import ( import base58 from lru import LRU -from libp2p.exceptions import ValidationError +from libp2p.exceptions import ParseError, ValidationError from libp2p.host.host_interface import IHost +from libp2p.io.exceptions import IncompleteReadError from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes +from libp2p.network.stream.exceptions import StreamEOF, StreamReset from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -154,7 +156,13 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - incoming: bytes = await read_varint_prefixed_bytes(stream) + try: + incoming: bytes = await read_varint_prefixed_bytes(stream) + except (ParseError, IncompleteReadError) as error: + logger.debug( + "read corrupted data from peer %s, error=%s", peer_id, error + ) + continue rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: @@ -228,7 +236,13 @@ class Pubsub: on one of the supported pubsub protocols. :param stream: newly created stream """ - await self.continuously_read_stream(stream) + try: + await self.continuously_read_stream(stream) + except (StreamEOF, StreamReset) as error: + logger.debug("fail to read from stream, error=%s", error) + stream.reset() + # TODO: what to do when the stream is terminated? + # disconnect the peer? async def _handle_new_peer(self, peer_id: ID) -> None: stream: INetStream = await self.host.new_stream(peer_id, self.protocols) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index c75000d..5d09b75 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -2,6 +2,8 @@ import asyncio from typing import Any # noqa: F401 from typing import Dict, List, Optional, Tuple +from libp2p.exceptions import ParseError +from libp2p.io.exceptions import IncompleteReadError from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn @@ -248,13 +250,15 @@ class Mplex(IMuxedConn): # FIXME: No timeout is used in Go implementation. # Timeout is set to a relatively small value to alleviate wait time to exit # loop in handle_incoming - header = await decode_uvarint_from_stream(self.secured_conn) - # TODO: Handle the case of EOF and other exceptions? + try: + header = await decode_uvarint_from_stream(self.secured_conn) + except ParseError: + return None, None, None try: message = await asyncio.wait_for( read_varint_prefixed_bytes(self.secured_conn), timeout=5 ) - except asyncio.TimeoutError: + except (ParseError, IncompleteReadError, asyncio.TimeoutError): # TODO: Investigate what we should do if time is out. return None, None, None diff --git a/libp2p/utils.py b/libp2p/utils.py index c69f61b..8362a5a 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -65,10 +65,6 @@ def encode_varint_prefixed(msg_bytes: bytes) -> bytes: async def read_varint_prefixed_bytes(reader: Reader) -> bytes: len_msg = await decode_uvarint_from_stream(reader) data = await read_exactly(reader, len_msg) - if len(data) != len_msg: - raise ValueError( - f"failed to read enough bytes: len_msg={len_msg}, data={data!r}" - ) return data From eaa74c4e26ac4c666928e2882a587ba1bcd229c1 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 15 Sep 2019 16:58:22 +0800 Subject: [PATCH 04/20] Handle `MultiselectCommunicatorError` --- libp2p/protocol_muxer/multiselect.py | 12 +++++++++--- libp2p/protocol_muxer/multiselect_client.py | 12 +++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 0c3dc72..c76e07f 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -2,7 +2,7 @@ from typing import Dict, Tuple from libp2p.typing import StreamHandlerFn, TProtocol -from .exceptions import MultiselectError +from .exceptions import MultiselectCommunicatorError, MultiselectError from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_muxer_interface import IMultiselectMuxer @@ -46,7 +46,10 @@ class Multiselect(IMultiselectMuxer): # Read and respond to commands until a valid protocol ID is sent while True: # Read message - command = await communicator.read() + try: + command = await communicator.read() + except MultiselectCommunicatorError as error: + raise MultiselectError(str(error)) # Command is ls or a protocol if command == "ls": @@ -76,7 +79,10 @@ class Multiselect(IMultiselectMuxer): await communicator.write(MULTISELECT_PROTOCOL_ID) # Read in the protocol ID from other party - handshake_contents = await communicator.read() + try: + handshake_contents = await communicator.read() + except MultiselectCommunicatorError as error: + raise MultiselectError(str(error)) # Confirm that the protocols are the same if not validate_handshake(handshake_contents): diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index fcd55d0..51af025 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -2,7 +2,7 @@ from typing import Sequence from libp2p.typing import TProtocol -from .exceptions import MultiselectClientError +from .exceptions import MultiselectClientError, MultiselectCommunicatorError from .multiselect_client_interface import IMultiselectClient from .multiselect_communicator_interface import IMultiselectCommunicator @@ -30,7 +30,10 @@ class MultiselectClient(IMultiselectClient): await communicator.write(MULTISELECT_PROTOCOL_ID) # Read in the protocol ID from other party - handshake_contents = await communicator.read() + try: + handshake_contents = await communicator.read() + except MultiselectCommunicatorError as error: + raise MultiselectClientError(str(error)) # Confirm that the protocols are the same if not validate_handshake(handshake_contents): @@ -79,7 +82,10 @@ class MultiselectClient(IMultiselectClient): await communicator.write(protocol) # Get what counterparty says in response - response = await communicator.read() + try: + response = await communicator.read() + except MultiselectCommunicatorError as error: + raise MultiselectClientError(str(error)) # Return protocol if response is equal to protocol or raise error if response == protocol: From 76af835af862a31b309b140d83e1c8705c3960b0 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 15 Sep 2019 17:35:01 +0800 Subject: [PATCH 05/20] Handle `MultiselectError` in `stream_muxer.accept_stream` --- libp2p/stream_muxer/mplex/mplex.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 5d09b75..d85a7c2 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -6,6 +6,7 @@ from libp2p.exceptions import ParseError from libp2p.io.exceptions import IncompleteReadError from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID +from libp2p.protocol_muxer.exceptions import MultiselectError from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.typing import TProtocol @@ -102,12 +103,6 @@ class Mplex(IMuxedConn): self.next_channel_id += 1 return next_id - async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream: - async with self.streams_lock: - stream = MplexStream(name, stream_id, self) - self.streams[stream_id] = stream - return stream - async def open_stream(self) -> IMuxedStream: """ creates a new muxed_stream @@ -117,17 +112,28 @@ class Mplex(IMuxedConn): stream_id = StreamID(channel_id=channel_id, is_initiator=True) # Default stream name is the `channel_id` name = str(channel_id) - stream = await self._initialize_stream(stream_id, name) + async with self.streams_lock: + stream = MplexStream(name, stream_id, self) await self.send_message(HeaderTags.NewStream, name.encode(), stream_id) + # TODO: is there a way to know if the peer accepted the stream? + # then we can safely register the stream + self.streams[stream_id] = stream return stream async def accept_stream(self, stream_id: StreamID, name: str) -> None: """ accepts a muxed stream opened by the other end """ - stream = await self._initialize_stream(stream_id, name) + async with self.streams_lock: + stream = MplexStream(name, stream_id, self) # Perform protocol negotiation for the stream. - self._tasks.append(asyncio.ensure_future(self.generic_protocol_handler(stream))) + try: + await self.generic_protocol_handler(stream) + except MultiselectError: + # TODO: what to do when stream protocol negotiation fail? + return + + self.streams[stream_id] = stream async def send_message( self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID @@ -180,7 +186,11 @@ class Mplex(IMuxedConn): # `NewStream` for the same id is received twice... # TODO: Shutdown pass - await self.accept_stream(stream_id, message.decode()) + self._tasks.append( + asyncio.ensure_future( + self.accept_stream(stream_id, message.decode()) + ) + ) elif flag in ( HeaderTags.MessageInitiator.value, HeaderTags.MessageReceiver.value, From cb632fa509fd4ff14d344b650495e5f3e85ed6d2 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 18:35:48 +0800 Subject: [PATCH 06/20] Add `RawConnError` --- libp2p/network/connection/exceptions.py | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 libp2p/network/connection/exceptions.py diff --git a/libp2p/network/connection/exceptions.py b/libp2p/network/connection/exceptions.py new file mode 100644 index 0000000..ecbf3fa --- /dev/null +++ b/libp2p/network/connection/exceptions.py @@ -0,0 +1,5 @@ +from libp2p.io.exceptions import IOException + + +class RawConnError(IOException): + pass From 4cd5b77f1057c7ee5e0ecd8bceec73f539cc74ba Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 18:37:00 +0800 Subject: [PATCH 07/20] Raise `RawConnError` in `RawConnection` --- libp2p/network/connection/raw_connection.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index fe09c6f..2bdb12a 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -1,5 +1,6 @@ import asyncio +from .exceptions import RawConnError from .raw_connection_interface import IRawConnection @@ -23,19 +24,28 @@ class RawConnection(IRawConnection): self._drain_lock = asyncio.Lock() async def write(self, data: bytes) -> None: - self.writer.write(data) + try: + self.writer.write(data) + except ConnectionResetError: + raise RawConnError() # Reference: https://github.com/ethereum/lahja/blob/93610b2eb46969ff1797e0748c7ac2595e130aef/lahja/asyncio/endpoint.py#L99-L102 # noqa: E501 # Use a lock to serialize drain() calls. Circumvents this bug: # https://bugs.python.org/issue29930 async with self._drain_lock: - await self.writer.drain() + try: + await self.writer.drain() + except ConnectionResetError: + raise RawConnError() async def read(self, n: int = -1) -> bytes: """ Read up to ``n`` bytes from the underlying stream. This call is delegated directly to the underlying ``self.reader``. """ - return await self.reader.read(n) + try: + return await self.reader.read(n) + except ConnectionResetError: + raise RawConnError() async def close(self) -> None: self.writer.close() From d6dda91482e74e7e638739cf97b6b66367ae13a4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 19:09:09 +0800 Subject: [PATCH 08/20] Move `HandshakeFailure` to libp2p.security --- libp2p/security/exceptions.py | 5 +++++ libp2p/transport/exceptions.py | 4 ---- libp2p/transport/upgrader.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 libp2p/security/exceptions.py diff --git a/libp2p/security/exceptions.py b/libp2p/security/exceptions.py new file mode 100644 index 0000000..269b2cb --- /dev/null +++ b/libp2p/security/exceptions.py @@ -0,0 +1,5 @@ +from libp2p.exceptions import BaseLibp2pError + + +class HandshakeFailure(BaseLibp2pError): + pass diff --git a/libp2p/transport/exceptions.py b/libp2p/transport/exceptions.py index b10cfc9..00f7d38 100644 --- a/libp2p/transport/exceptions.py +++ b/libp2p/transport/exceptions.py @@ -12,7 +12,3 @@ class SecurityUpgradeFailure(UpgradeFailure): class MuxerUpgradeFailure(UpgradeFailure): pass - - -class HandshakeFailure(BaseLibp2pError): - pass diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 762a811..46656aa 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -4,13 +4,13 @@ from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.protocol_muxer.exceptions import MultiselectClientError, MultiselectError +from libp2p.security.exceptions import HandshakeFailure from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.security_multistream import SecurityMultistream from libp2p.stream_muxer.abc import IMuxedConn from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream from libp2p.transport.exceptions import ( - HandshakeFailure, MuxerUpgradeFailure, SecurityUpgradeFailure, ) From 359bcf45ff6bd94b2633bbdd72621221e3397b98 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 19:11:06 +0800 Subject: [PATCH 09/20] `SecioException` inherit from `HandshakeFailure` --- libp2p/security/secio/exceptions.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libp2p/security/secio/exceptions.py b/libp2p/security/secio/exceptions.py index f9ea8cf..892f63d 100644 --- a/libp2p/security/secio/exceptions.py +++ b/libp2p/security/secio/exceptions.py @@ -1,4 +1,7 @@ -class SecioException(Exception): +from libp2p.security.exceptions import HandshakeFailure + + +class SecioException(HandshakeFailure): pass @@ -19,9 +22,5 @@ class InvalidSignatureOnExchange(SecioException): pass -class HandshakeFailed(SecioException): - pass - - class IncompatibleChoices(SecioException): pass From c7593bff976a11da618722d8141d514ddf03bb7b Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 19:11:46 +0800 Subject: [PATCH 10/20] Add `InconsistentNonce` in secio --- libp2p/security/secio/exceptions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libp2p/security/secio/exceptions.py b/libp2p/security/secio/exceptions.py index 892f63d..c03fda4 100644 --- a/libp2p/security/secio/exceptions.py +++ b/libp2p/security/secio/exceptions.py @@ -24,3 +24,7 @@ class InvalidSignatureOnExchange(SecioException): class IncompatibleChoices(SecioException): pass + + +class InconsistentNonce(SecioException): + pass From 09bfa0ab0940324333ef73e482b61fed7fa1962d Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 19:15:40 +0800 Subject: [PATCH 11/20] Handle `IOException` in `create_secure_session` --- libp2p/security/secio/transport.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/libp2p/security/secio/transport.py b/libp2p/security/secio/transport.py index e223a94..d99dc15 100644 --- a/libp2p/security/secio/transport.py +++ b/libp2p/security/secio/transport.py @@ -16,6 +16,7 @@ from libp2p.crypto.ecc import ECCPublicKey from libp2p.crypto.key_exchange import create_ephemeral_key_pair from libp2p.crypto.keys import PrivateKey, PublicKey from libp2p.crypto.serialization import deserialize_public_key +from libp2p.io.exceptions import IOException from libp2p.io.msgio import MsgIOReadWriter from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID as PeerID @@ -24,8 +25,8 @@ from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn from .exceptions import ( - HandshakeFailed, IncompatibleChoices, + InconsistentNonce, InvalidSignatureOnExchange, PeerMismatchException, SecioException, @@ -408,14 +409,21 @@ async def create_secure_session( except SecioException as e: await conn.close() raise e + # `IOException` includes errors raised while read from/write to raw connection + except IOException: + raise SecioException("connection closed") initiator = remote_peer is not None session = _mk_session_from(local_private_key, session_parameters, msg_io, initiator) - received_nonce = await _finish_handshake(session, remote_nonce) + try: + received_nonce = await _finish_handshake(session, remote_nonce) + # `IOException` includes errors raised while read from/write to raw connection + except IOException: + raise SecioException("connection closed") if received_nonce != local_nonce: await conn.close() - raise HandshakeFailed() + raise InconsistentNonce() return session From 0080466d867e36151a7b99a0f03c8943922ea2b5 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 16 Sep 2019 19:16:41 +0800 Subject: [PATCH 12/20] Handle `RawConnError` in `InsecureSession.run_handshake` --- libp2p/security/insecure/transport.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 27efc86..5d552ac 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -4,12 +4,13 @@ from libp2p.crypto.keys import PrivateKey, PublicKey from libp2p.crypto.pb import crypto_pb2 from libp2p.crypto.utils import pubkey_from_protobuf from libp2p.io.abc import ReadWriteCloser +from libp2p.network.connection.exceptions import RawConnError from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport +from libp2p.security.exceptions import HandshakeFailure from libp2p.security.secure_conn_interface import ISecureConn -from libp2p.transport.exceptions import HandshakeFailure from libp2p.typing import TProtocol from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed @@ -47,9 +48,15 @@ class InsecureSession(BaseSession): msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) - await self.write(encoded_msg_bytes) + try: + await self.write(encoded_msg_bytes) + except RawConnError: + raise HandshakeFailure("connection closed") - remote_msg_bytes = await read_fixedint_prefixed(self.conn) + try: + remote_msg_bytes = await read_fixedint_prefixed(self.conn) + except RawConnError: + raise HandshakeFailure("connection closed") remote_msg = plaintext_pb2.Exchange() remote_msg.ParseFromString(remote_msg_bytes) received_peer_id = ID(remote_msg.id) From 559f419b4e01dec7f5748afb9d727b8d65851a77 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Sep 2019 15:42:18 +0800 Subject: [PATCH 13/20] Fix stream registration in `accept_stream` --- libp2p/stream_muxer/mplex/mplex.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index d85a7c2..7a5323d 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -103,6 +103,12 @@ class Mplex(IMuxedConn): self.next_channel_id += 1 return next_id + async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream: + async with self.streams_lock: + stream = MplexStream(name, stream_id, self) + self.streams[stream_id] = stream + return stream + async def open_stream(self) -> IMuxedStream: """ creates a new muxed_stream @@ -112,29 +118,24 @@ class Mplex(IMuxedConn): stream_id = StreamID(channel_id=channel_id, is_initiator=True) # Default stream name is the `channel_id` name = str(channel_id) - async with self.streams_lock: - stream = MplexStream(name, stream_id, self) + stream = await self._initialize_stream(stream_id, name) await self.send_message(HeaderTags.NewStream, name.encode(), stream_id) - # TODO: is there a way to know if the peer accepted the stream? - # then we can safely register the stream - self.streams[stream_id] = stream return stream async def accept_stream(self, stream_id: StreamID, name: str) -> None: """ accepts a muxed stream opened by the other end """ - async with self.streams_lock: - stream = MplexStream(name, stream_id, self) + stream = await self._initialize_stream(stream_id, name) # Perform protocol negotiation for the stream. try: await self.generic_protocol_handler(stream) except MultiselectError: - # TODO: what to do when stream protocol negotiation fail? + # Un-register and reset the stream + del self.streams[stream_id] + await stream.reset() return - self.streams[stream_id] = stream - async def send_message( self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID ) -> int: From f253152858b7d603d9164dba0400b3f183a0ae69 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Sep 2019 16:17:41 +0800 Subject: [PATCH 14/20] Handle protocol negotiation failure in swarm `new_stream` --- libp2p/network/swarm.py | 14 +++++++++++--- libp2p/pubsub/pubsub.py | 9 +++++++-- libp2p/transport/upgrader.py | 5 +---- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 4d53b80..711dc88 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -7,6 +7,7 @@ from multiaddr import Multiaddr from libp2p.peer.id import ID from libp2p.peer.peerstore import PeerStoreError from libp2p.peer.peerstore_interface import IPeerStore +from libp2p.protocol_muxer.exceptions import MultiselectClientError from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator @@ -176,9 +177,16 @@ class Swarm(INetwork): muxed_stream = await muxed_conn.open_stream() # Perform protocol muxing to determine protocol to use - selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), MultiselectCommunicator(muxed_stream) - ) + try: + selected_protocol = await self.multiselect_client.select_one_of( + list(protocol_ids), MultiselectCommunicator(muxed_stream) + ) + except MultiselectClientError as error: + logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error) + await muxed_stream.reset() + raise SwarmException( + "failt to open a stream to peer %s", peer_id + ) from error # Create a net stream with the selected protocol net_stream = NetStream(muxed_stream) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index ef069d8..e413b28 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -19,11 +19,12 @@ from lru import LRU from libp2p.exceptions import ParseError, ValidationError from libp2p.host.host_interface import IHost from libp2p.io.exceptions import IncompleteReadError +from libp2p.network.exceptions import SwarmException +from libp2p.network.stream.exceptions import StreamEOF, StreamReset from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes -from libp2p.network.stream.exceptions import StreamEOF, StreamReset from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -245,7 +246,11 @@ class Pubsub: # disconnect the peer? async def _handle_new_peer(self, peer_id: ID) -> None: - stream: INetStream = await self.host.new_stream(peer_id, self.protocols) + try: + stream: INetStream = await self.host.new_stream(peer_id, self.protocols) + except SwarmException as error: + logger.debug("fail to add new peer %s, error %s", peer_id, error) + return self.peers[peer_id] = stream diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 46656aa..96234c6 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -10,10 +10,7 @@ from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.security_multistream import SecurityMultistream from libp2p.stream_muxer.abc import IMuxedConn from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream -from libp2p.transport.exceptions import ( - MuxerUpgradeFailure, - SecurityUpgradeFailure, -) +from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure from libp2p.typing import TProtocol from .listener_interface import IListener From 7243eb97660bbbf48c3b6e66b022d9816f94e0d4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 17 Sep 2019 21:44:48 +0800 Subject: [PATCH 15/20] Fix different exception raised in test --- tests/examples/test_chat.py | 4 ++-- tests/protocol_muxer/test_protocol_muxer.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/examples/test_chat.py b/tests/examples/test_chat.py index 18a172c..e2aa71d 100644 --- a/tests/examples/test_chat.py +++ b/tests/examples/test_chat.py @@ -2,8 +2,8 @@ import asyncio import pytest +from libp2p.network.exceptions import SwarmException from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.protocol_muxer.exceptions import MultiselectClientError from tests.utils import set_up_nodes_by_transport_opt PROTOCOL_ID = "/chat/1.0.0" @@ -84,7 +84,7 @@ async def no_common_protocol(host_a, host_b): host_a.set_stream_handler(PROTOCOL_ID, stream_handler) # try to creates a new new with a procotol not known by the other host - with pytest.raises(MultiselectClientError): + with pytest.raises(SwarmException): await host_b.new_stream(host_a.get_id(), ["/fakeproto/0.0.1"]) diff --git a/tests/protocol_muxer/test_protocol_muxer.py b/tests/protocol_muxer/test_protocol_muxer.py index d7523ac..4e58e5b 100644 --- a/tests/protocol_muxer/test_protocol_muxer.py +++ b/tests/protocol_muxer/test_protocol_muxer.py @@ -1,6 +1,6 @@ import pytest -from libp2p.protocol_muxer.exceptions import MultiselectClientError +from libp2p.network.exceptions import SwarmException from tests.utils import echo_stream_handler, set_up_nodes_by_transport_opt # TODO: Add tests for multiple streams being opened on different @@ -47,7 +47,7 @@ async def test_single_protocol_succeeds(): @pytest.mark.asyncio async def test_single_protocol_fails(): - with pytest.raises(MultiselectClientError): + with pytest.raises(SwarmException): await perform_simple_test("", ["/echo/1.0.0"], ["/potato/1.0.0"]) # Cleanup not reached on error @@ -77,7 +77,7 @@ async def test_multiple_protocol_second_is_valid_succeeds(): async def test_multiple_protocol_fails(): protocols_for_client = ["/rock/1.0.0", "/foo/1.0.0", "/bar/1.0.0"] protocols_for_listener = ["/aspyn/1.0.0", "/rob/1.0.0", "/zx/1.0.0", "/alex/1.0.0"] - with pytest.raises(MultiselectClientError): + with pytest.raises(SwarmException): await perform_simple_test("", protocols_for_client, protocols_for_listener) # Cleanup not reached on error From c44be5e24784ca4d934fe033011838c5c6b155b1 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 19 Sep 2019 21:23:01 +0800 Subject: [PATCH 16/20] Add `OpenConnectionError` for base transport --- libp2p/transport/exceptions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libp2p/transport/exceptions.py b/libp2p/transport/exceptions.py index 00f7d38..d935b3a 100644 --- a/libp2p/transport/exceptions.py +++ b/libp2p/transport/exceptions.py @@ -1,7 +1,10 @@ from libp2p.exceptions import BaseLibp2pError -# TODO: Add `BaseLibp2pError` and `UpgradeFailure` can inherit from it? +class OpenConnectionError(BaseLibp2pError): + pass + + class UpgradeFailure(BaseLibp2pError): pass From 1f76f6ee1ba6a6f1787a2a27737e6ba81505f561 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 19 Sep 2019 21:23:35 +0800 Subject: [PATCH 17/20] Raise `OpenConnectionError` when failed to open connection --- libp2p/transport/tcp/tcp.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index a63dbd0..db2dd74 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -6,6 +6,7 @@ from multiaddr import Multiaddr from libp2p.network.connection.raw_connection import RawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection +from libp2p.transport.exceptions import OpenConnectionError from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.typing import THandler @@ -66,7 +67,10 @@ class TCP(ITransport): self.host = maddr.value_for_protocol("ip4") self.port = int(maddr.value_for_protocol("tcp")) - reader, writer = await asyncio.open_connection(self.host, self.port) + try: + reader, writer = await asyncio.open_connection(self.host, self.port) + except (ConnectionAbortedError, ConnectionRefusedError) as error: + raise OpenConnectionError(error) return RawConnection(reader, writer, True) From b9d18750275492e37b9c9ae0ebb4fd65a8668722 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 19 Sep 2019 21:24:01 +0800 Subject: [PATCH 18/20] Catch `OpenConnectionError` in `swarm.dial_peer` --- libp2p/network/swarm.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 711dc88..83df555 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -13,7 +13,11 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator from libp2p.routing.interfaces import IPeerRouting from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream -from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure +from libp2p.transport.exceptions import ( + MuxerUpgradeFailure, + OpenConnectionError, + SecurityUpgradeFailure, +) from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader @@ -118,7 +122,13 @@ class Swarm(INetwork): multiaddr = self.router.find_peer(peer_id) # Dial peer (connection to peer does not yet exist) # Transport dials peer (gets back a raw conn) - raw_conn = await self.transport.dial(multiaddr) + try: + raw_conn = await self.transport.dial(multiaddr) + except OpenConnectionError as error: + logger.debug("fail to dial peer %s over base transport", peer_id) + raise SwarmException( + "fail to open connection to peer %s", peer_id + ) from error logger.debug("dialed peer %s over base transport", peer_id) From c6294ad19b3c998b04eaa4171f4c659a0028c402 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 19 Sep 2019 21:51:23 +0800 Subject: [PATCH 19/20] Raise `MultiselectCommunicatorError`: when failed to write to communicator --- libp2p/protocol_muxer/multiselect.py | 15 ++++++++++++--- libp2p/protocol_muxer/multiselect_client.py | 10 ++++++++-- libp2p/protocol_muxer/multiselect_communicator.py | 7 ++++++- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index c76e07f..13e6a0d 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -59,12 +59,18 @@ class Multiselect(IMultiselectMuxer): protocol = TProtocol(command) if protocol in self.handlers: # Tell counterparty we have decided on a protocol - await communicator.write(protocol) + try: + await communicator.write(protocol) + except MultiselectCommunicatorError as error: + raise MultiselectError(error) # Return the decided on protocol return protocol, self.handlers[protocol] # Tell counterparty this protocol was not found - await communicator.write(PROTOCOL_NOT_FOUND_MSG) + try: + await communicator.write(PROTOCOL_NOT_FOUND_MSG) + except MultiselectCommunicatorError as error: + raise MultiselectError(error) async def handshake(self, communicator: IMultiselectCommunicator) -> None: """ @@ -76,7 +82,10 @@ class Multiselect(IMultiselectMuxer): # TODO: Use format used by go repo for messages # Send our MULTISELECT_PROTOCOL_ID to other party - await communicator.write(MULTISELECT_PROTOCOL_ID) + try: + await communicator.write(MULTISELECT_PROTOCOL_ID) + except MultiselectCommunicatorError as error: + raise MultiselectError(error) # Read in the protocol ID from other party try: diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 51af025..361100b 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -27,7 +27,10 @@ class MultiselectClient(IMultiselectClient): # TODO: Use format used by go repo for messages # Send our MULTISELECT_PROTOCOL_ID to counterparty - await communicator.write(MULTISELECT_PROTOCOL_ID) + try: + await communicator.write(MULTISELECT_PROTOCOL_ID) + except MultiselectCommunicatorError as error: + raise MultiselectClientError(error) # Read in the protocol ID from other party try: @@ -79,7 +82,10 @@ class MultiselectClient(IMultiselectClient): """ # Tell counterparty we want to use protocol - await communicator.write(protocol) + try: + await communicator.write(protocol) + except MultiselectCommunicatorError as error: + raise MultiselectClientError(error) # Get what counterparty says in response try: diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index d946850..6dbc50f 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -15,7 +15,12 @@ class MultiselectCommunicator(IMultiselectCommunicator): async def write(self, msg_str: str) -> None: msg_bytes = encode_delim(msg_str.encode()) - await self.read_writer.write(msg_bytes) + try: + await self.read_writer.write(msg_bytes) + except IOException: + raise MultiselectCommunicatorError( + "fail to write to multiselect communicator" + ) async def read(self) -> str: try: From 7fc958e7bec3863b0487e31e911d72f540f83e20 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 19 Sep 2019 22:19:36 +0800 Subject: [PATCH 20/20] Add exception raised to docstring --- libp2p/network/connection/raw_connection.py | 13 +++++++++---- libp2p/network/swarm.py | 1 + libp2p/protocol_muxer/multiselect.py | 8 ++++---- libp2p/protocol_muxer/multiselect_client.py | 5 +++-- libp2p/protocol_muxer/multiselect_communicator.py | 6 ++++++ libp2p/security/insecure/transport.py | 3 +++ libp2p/security/secio/transport.py | 2 ++ libp2p/transport/tcp/tcp.py | 1 + 8 files changed, 29 insertions(+), 10 deletions(-) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 2bdb12a..144c1a8 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -24,10 +24,13 @@ class RawConnection(IRawConnection): self._drain_lock = asyncio.Lock() async def write(self, data: bytes) -> None: + """ + Raise `RawConnError` if the underlying connection breaks + """ try: self.writer.write(data) - except ConnectionResetError: - raise RawConnError() + except ConnectionResetError as error: + raise RawConnError(error) # Reference: https://github.com/ethereum/lahja/blob/93610b2eb46969ff1797e0748c7ac2595e130aef/lahja/asyncio/endpoint.py#L99-L102 # noqa: E501 # Use a lock to serialize drain() calls. Circumvents this bug: # https://bugs.python.org/issue29930 @@ -41,11 +44,13 @@ class RawConnection(IRawConnection): """ Read up to ``n`` bytes from the underlying stream. This call is delegated directly to the underlying ``self.reader``. + + Raise `RawConnError` if the underlying connection breaks """ try: return await self.reader.read(n) - except ConnectionResetError: - raise RawConnError() + except ConnectionResetError as error: + raise RawConnError(error) async def close(self) -> None: self.writer.close() diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 83df555..da32a02 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -173,6 +173,7 @@ class Swarm(INetwork): """ :param peer_id: peer_id of destination :param protocol_id: protocol id + :raises SwarmException: raised when an error occurs :return: net stream instance """ logger.debug( diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 13e6a0d..a0fa91f 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -37,7 +37,7 @@ class Multiselect(IMultiselectMuxer): Negotiate performs protocol selection :param stream: stream to negotiate on :return: selected protocol name, handler function - :raise Exception: negotiation failed exception + :raise MultiselectError: raised when negotiation failed """ # Perform handshake to ensure multiselect protocol IDs match @@ -49,7 +49,7 @@ class Multiselect(IMultiselectMuxer): try: command = await communicator.read() except MultiselectCommunicatorError as error: - raise MultiselectError(str(error)) + raise MultiselectError(error) # Command is ls or a protocol if command == "ls": @@ -76,7 +76,7 @@ class Multiselect(IMultiselectMuxer): """ Perform handshake to agree on multiselect protocol :param communicator: communicator to use - :raise Exception: error in handshake + :raise MultiselectError: raised when handshake failed """ # TODO: Use format used by go repo for messages @@ -91,7 +91,7 @@ class Multiselect(IMultiselectMuxer): try: handshake_contents = await communicator.read() except MultiselectCommunicatorError as error: - raise MultiselectError(str(error)) + raise MultiselectError(error) # Confirm that the protocols are the same if not validate_handshake(handshake_contents): diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 361100b..24db70a 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -21,7 +21,7 @@ class MultiselectClient(IMultiselectClient): Ensure that the client and multiselect are both using the same multiselect protocol :param stream: stream to communicate with multiselect over - :raise Exception: multiselect protocol ID mismatch + :raise MultiselectClientError: raised when handshake failed """ # TODO: Use format used by go repo for messages @@ -54,6 +54,7 @@ class MultiselectClient(IMultiselectClient): :param protocol: protocol to select :param stream: stream to communicate with multiselect over :return: selected protocol + :raise MultiselectClientError: raised when protocol negotiation failed """ # Perform handshake to ensure multiselect protocol IDs match await self.handshake(communicator) @@ -77,7 +78,7 @@ class MultiselectClient(IMultiselectClient): Try to select the given protocol or raise exception if fails :param communicator: communicator to use to communicate with counterparty :param protocol: protocol to select - :raise Exception: error in protocol selection + :raise MultiselectClientError: raised when protocol negotiation failed :return: selected protocol """ diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 6dbc50f..a66a564 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -14,6 +14,9 @@ class MultiselectCommunicator(IMultiselectCommunicator): self.read_writer = read_writer async def write(self, msg_str: str) -> None: + """ + :raise MultiselectCommunicatorError: raised when failed to write to underlying reader + """ msg_bytes = encode_delim(msg_str.encode()) try: await self.read_writer.write(msg_bytes) @@ -23,6 +26,9 @@ class MultiselectCommunicator(IMultiselectCommunicator): ) async def read(self) -> str: + """ + :raise MultiselectCommunicatorError: raised when failed to read from underlying reader + """ try: data = await read_delim(self.read_writer) # `IOException` includes `IncompleteReadError` and `StreamError` diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 5d552ac..7df0575 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -45,6 +45,9 @@ class InsecureSession(BaseSession): await self.conn.close() async def run_handshake(self) -> None: + """ + Raise `HandshakeFailure` when handshake failed + """ msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) diff --git a/libp2p/security/secio/transport.py b/libp2p/security/secio/transport.py index d99dc15..e1aa022 100644 --- a/libp2p/security/secio/transport.py +++ b/libp2p/security/secio/transport.py @@ -400,6 +400,8 @@ async def create_secure_session( Attempt the initial `secio` handshake with the remote peer. If successful, return an object that provides secure communication to the ``remote_peer``. + Raise `SecioException` when `conn` closed. + Raise `InconsistentNonce` when handshake failed """ msg_io = MsgIOReadWriter(conn) try: diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index db2dd74..5ee2428 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -63,6 +63,7 @@ class TCP(ITransport): dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer :return: `RawConnection` if successful + :raise OpenConnectionError: raised when failed to open connection """ self.host = maddr.value_for_protocol("ip4") self.port = int(maddr.value_for_protocol("tcp"))