From a0923d202a86fef26690e3ce5ffab8a483452ae1 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 17 Aug 2019 00:19:37 +0800 Subject: [PATCH] Move varint and delim read/write to toplevel To `libp2p.utils`. --- .../multiselect_communicator.py | 23 +++----------- libp2p/security/base_session.py | 2 +- libp2p/stream_muxer/abc.py | 6 +++- libp2p/stream_muxer/mplex/mplex.py | 2 +- libp2p/transport/tcp/tcp.py | 1 + libp2p/{stream_muxer/mplex => }/utils.py | 31 +++++++++++++++++++ 6 files changed, 44 insertions(+), 21 deletions(-) rename libp2p/{stream_muxer/mplex => }/utils.py (59%) diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 783c380..ebfcc23 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,23 +1,10 @@ from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.stream_muxer.abc import IMuxedStream -from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint -from libp2p.typing import StreamReader +from libp2p.utils import encode_delim, read_delim from .multiselect_communicator_interface import IMultiselectCommunicator -def delim_encode(msg_str: str) -> bytes: - msg_bytes = msg_str.encode() - varint_len_msg = encode_uvarint(len(msg_bytes) + 1) - return varint_len_msg + msg_bytes + b"\n" - - -async def delim_read(reader: StreamReader, timeout: int = 10) -> str: - len_msg = await decode_uvarint_from_stream(reader, timeout) - msg_bytes = await reader.read(len_msg) - return msg_bytes.decode().rstrip() - - class RawConnectionCommunicator(IMultiselectCommunicator): conn: IRawConnection @@ -25,12 +12,12 @@ class RawConnectionCommunicator(IMultiselectCommunicator): self.conn = conn async def write(self, msg_str: str) -> None: - msg_bytes = delim_encode(msg_str) + msg_bytes = encode_delim(msg_str) self.conn.writer.write(msg_bytes) await self.conn.writer.drain() async def read(self) -> str: - return await delim_read(self.conn.reader) + return await read_delim(self.conn.reader) class StreamCommunicator(IMultiselectCommunicator): @@ -40,8 +27,8 @@ class StreamCommunicator(IMultiselectCommunicator): self.stream = stream async def write(self, msg_str: str) -> None: - msg_bytes = delim_encode(msg_str) + msg_bytes = encode_delim(msg_str) await self.stream.write(msg_bytes) async def read(self) -> str: - return await delim_read(self.stream) + return await read_delim(self.stream) diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index 8fc4dab..ba14037 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -7,7 +7,7 @@ from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn -class BaseSession(ISecureConn, IRawConnection): +class BaseSession(ISecureConn): """ ``BaseSession`` is not fully instantiated from its abstract classes as it is only meant to be used in clases that derive from it. diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 26ad509..7270c15 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -17,7 +17,6 @@ class IMuxedConn(ABC): reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ - initiator: bool peer_id: ID @abstractmethod @@ -35,6 +34,11 @@ class IMuxedConn(ABC): :param peer_id: peer_id of peer the connection is to """ + @property + @abstractmethod + def initiator(self) -> bool: + pass + @abstractmethod def close(self) -> None: """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 2aa59c9..aeae0ba 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -13,7 +13,7 @@ from libp2p.typing import TProtocol from .constants import HeaderTags from .exceptions import StreamNotFound from .mplex_stream import MplexStream -from .utils import decode_uvarint_from_stream, encode_uvarint +from libp2p.utils import decode_uvarint_from_stream, encode_uvarint MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index ae1e776..aee0313 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -62,6 +62,7 @@ class TCPListener(IListener): class TCP(ITransport): + # TODO: Remove `self_id` async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/utils.py similarity index 59% rename from libp2p/stream_muxer/mplex/utils.py rename to libp2p/utils.py index 44326ad..9ecaa83 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/utils.py @@ -5,6 +5,9 @@ from typing import Tuple from libp2p.typing import StreamReader +TIMEOUT = 10 + + def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" buf = b"" @@ -45,3 +48,31 @@ async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> in break return result + + +# Varint-prefixed read/write + + +def encode_varint_prefixed(msg_bytes: bytes) -> bytes: + varint_len = encode_uvarint(len(msg_bytes)) + return varint_len + msg_bytes + + +async def read_varint_prefixed_bytes( + reader: StreamReader, timeout: int = TIMEOUT +) -> bytes: + len_msg = await decode_uvarint_from_stream(reader, timeout) + return await reader.read(len_msg) + + +# Delimited read/write + + +def encode_delim(msg_str: str) -> bytes: + delimited_msg = msg_str + "\n" + return encode_varint_prefixed(delimited_msg.encode()) + + +async def read_delim(reader: StreamReader, timeout: int = TIMEOUT) -> str: + msg_bytes = await read_varint_prefixed_bytes(reader, timeout) + return msg_bytes.decode().rstrip()