From 10415cb95638acbfb2c6ae392a768b04ab4d0446 Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 5 Sep 2019 23:24:17 +0800 Subject: [PATCH] Use `ReadWriteCloser` for conns and streams --- libp2p/network/stream/net_stream.py | 3 +- libp2p/network/stream/net_stream_interface.py | 27 ++--------------- libp2p/network/swarm.py | 6 ++-- .../multiselect_communicator.py | 30 +++++-------------- libp2p/security/security_multistream.py | 4 +-- libp2p/stream_muxer/abc.py | 25 ++-------------- libp2p/stream_muxer/muxer_multistream.py | 4 +-- libp2p/typing.py | 6 +--- libp2p/utils.py | 7 ++--- 9 files changed, 24 insertions(+), 88 deletions(-) diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index ff78f5a..7383f73 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -44,13 +44,12 @@ class NetStream(INetStream): """ return await self.muxed_stream.write(data) - async def close(self) -> bool: + async def close(self) -> None: """ close stream :return: true if successful """ await self.muxed_stream.close() - return True async def reset(self) -> bool: return await self.muxed_stream.reset() diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 4df95d8..aaa775a 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -1,10 +1,11 @@ -from abc import ABC, abstractmethod +from abc import abstractmethod +from libp2p.io.abc import ReadWriteCloser from libp2p.stream_muxer.abc import IMuxedConn from libp2p.typing import TProtocol -class INetStream(ABC): +class INetStream(ReadWriteCloser): mplex_conn: IMuxedConn @@ -21,28 +22,6 @@ class INetStream(ABC): :return: true if successful """ - @abstractmethod - async def read(self, n: int = -1) -> bytes: - """ - reads from the underlying muxed_stream - :param n: number of bytes to read - :return: bytes of input - """ - - @abstractmethod - async def write(self, data: bytes) -> int: - """ - write to the underlying muxed_stream - :return: number of bytes written - """ - - @abstractmethod - async def close(self) -> bool: - """ - close the underlying muxed stream - :return: true if successful - """ - @abstractmethod async def reset(self) -> bool: """ diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index bccfdac..38cbf71 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -8,7 +8,7 @@ from libp2p.peer.peerstore import PeerStoreError from libp2p.peer.peerstore_interface import IPeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient -from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator +from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator from libp2p.routing.interfaces import IPeerRouting from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure @@ -161,7 +161,7 @@ class Swarm(INetwork): # Perform protocol muxing to determine protocol to use selected_protocol = await self.multiselect_client.select_one_of( - list(protocol_ids), StreamCommunicator(muxed_stream) + list(protocol_ids), MultiselectCommunicator(muxed_stream) ) # Create a net stream with the selected protocol @@ -294,7 +294,7 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None: # Perform protocol muxing to determine protocol to use protocol, handler = await multiselect.negotiate( - StreamCommunicator(muxed_stream) + MultiselectCommunicator(muxed_stream) ) net_stream = NetStream(muxed_stream) diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index e252304..59252c5 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,35 +1,19 @@ -from libp2p.network.connection.raw_connection_interface import IRawConnection -from libp2p.stream_muxer.abc import IMuxedStream +from libp2p.io.abc import ReadWriteCloser from libp2p.utils import encode_delim, read_delim from .multiselect_communicator_interface import IMultiselectCommunicator -class RawConnectionCommunicator(IMultiselectCommunicator): - conn: IRawConnection +class MultiselectCommunicator(IMultiselectCommunicator): + read_writer: ReadWriteCloser - def __init__(self, conn: IRawConnection) -> None: - self.conn = conn + def __init__(self, read_writer: ReadWriteCloser) -> None: + self.read_writer = read_writer async def write(self, msg_str: str) -> None: msg_bytes = encode_delim(msg_str.encode()) - await self.conn.write(msg_bytes) + await self.read_writer.write(msg_bytes) async def read(self) -> str: - data = await read_delim(self.conn) - return data.decode() - - -class StreamCommunicator(IMultiselectCommunicator): - stream: IMuxedStream - - def __init__(self, stream: IMuxedStream) -> None: - self.stream = stream - - async def write(self, msg_str: str) -> None: - msg_bytes = encode_delim(msg_str.encode()) - await self.stream.write(msg_bytes) - - async def read(self) -> str: - data = await read_delim(self.stream) + data = await read_delim(self.read_writer) return data.decode() diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 6e69d7a..466d60a 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -6,7 +6,7 @@ from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient -from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator +from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.typing import TProtocol @@ -88,7 +88,7 @@ class SecurityMultistream(ABC): :return: selected secure transport """ protocol: TProtocol - communicator = RawConnectionCommunicator(conn) + communicator = MultiselectCommunicator(conn) if initiator: # Select protocol if initiator protocol = await self.multiselect_client.select_one_of( diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 6e7737e..2c577c7 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING +from libp2p.io.abc import ReadWriteCloser from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.mplex.constants import HeaderTags @@ -76,32 +77,10 @@ class IMuxedConn(ABC): """ -class IMuxedStream(ABC): +class IMuxedStream(ReadWriteCloser): mplex_conn: IMuxedConn - @abstractmethod - async def read(self, n: int = -1) -> bytes: - """ - reads from the underlying muxed_conn - :param n: number of bytes to read - :return: bytes of input - """ - - @abstractmethod - async def write(self, data: bytes) -> int: - """ - writes to the underlying muxed_conn - :return: number of bytes written - """ - - @abstractmethod - async def close(self) -> bool: - """ - close the underlying muxed_conn - :return: true if successful - """ - @abstractmethod async def reset(self) -> bool: """ diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index 703c4e2..b118cee 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -6,7 +6,7 @@ from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient -from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator +from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator from libp2p.security.secure_conn_interface import ISecureConn from libp2p.typing import TProtocol @@ -60,7 +60,7 @@ class MuxerMultistream: :return: selected muxer transport """ protocol: TProtocol - communicator = RawConnectionCommunicator(conn) + communicator = MultiselectCommunicator(conn) if conn.initiator: protocol = await self.multiselect_client.select_one_of( tuple(self.transports.keys()), communicator diff --git a/libp2p/typing.py b/libp2p/typing.py index ba776e1..be0b584 100644 --- a/libp2p/typing.py +++ b/libp2p/typing.py @@ -1,6 +1,4 @@ -from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union - -from libp2p.network.connection.raw_connection_interface import IRawConnection +from typing import TYPE_CHECKING, Awaitable, Callable, NewType if TYPE_CHECKING: from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401 @@ -8,5 +6,3 @@ if TYPE_CHECKING: TProtocol = NewType("TProtocol", str) StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] - -StreamReader = Union["IMuxedStream", "INetStream", IRawConnection] diff --git a/libp2p/utils.py b/libp2p/utils.py index e1c45fd..7374993 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -3,7 +3,6 @@ import math from libp2p.exceptions import ParseError from libp2p.io.abc import Reader -from libp2p.typing import StreamReader # Unsigned LEB128(varint codec) # Reference: https://github.com/ethereum/py-wasm/blob/master/wasm/parsers/leb128.py @@ -31,7 +30,7 @@ def encode_uvarint(number: int) -> bytes: return buf -async def decode_uvarint_from_stream(reader: StreamReader) -> int: +async def decode_uvarint_from_stream(reader: Reader) -> int: """ https://en.wikipedia.org/wiki/LEB128 """ @@ -61,7 +60,7 @@ def encode_varint_prefixed(msg_bytes: bytes) -> bytes: return varint_len + msg_bytes -async def read_varint_prefixed_bytes(reader: StreamReader) -> bytes: +async def read_varint_prefixed_bytes(reader: Reader) -> bytes: len_msg = await decode_uvarint_from_stream(reader) data = await reader.read(len_msg) if len(data) != len_msg: @@ -80,7 +79,7 @@ def encode_delim(msg: bytes) -> bytes: return encode_varint_prefixed(delimited_msg) -async def read_delim(reader: StreamReader) -> bytes: +async def read_delim(reader: Reader) -> bytes: msg_bytes = await read_varint_prefixed_bytes(reader) # TODO: Investigate if it is possible to have empty `msg_bytes` if len(msg_bytes) != 0 and msg_bytes[-1:] != b"\n":