Move varint and delim read/write to toplevel

To `libp2p.utils`.
This commit is contained in:
mhchia 2019-08-17 00:19:37 +08:00
parent 5192944724
commit a0923d202a
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
6 changed files with 44 additions and 21 deletions

View File

@ -1,23 +1,10 @@
from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.stream_muxer.abc import IMuxedStream from libp2p.stream_muxer.abc import IMuxedStream
from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint from libp2p.utils import encode_delim, read_delim
from libp2p.typing import StreamReader
from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_communicator_interface import IMultiselectCommunicator
def delim_encode(msg_str: str) -> bytes:
msg_bytes = msg_str.encode()
varint_len_msg = encode_uvarint(len(msg_bytes) + 1)
return varint_len_msg + msg_bytes + b"\n"
async def delim_read(reader: StreamReader, timeout: int = 10) -> str:
len_msg = await decode_uvarint_from_stream(reader, timeout)
msg_bytes = await reader.read(len_msg)
return msg_bytes.decode().rstrip()
class RawConnectionCommunicator(IMultiselectCommunicator): class RawConnectionCommunicator(IMultiselectCommunicator):
conn: IRawConnection conn: IRawConnection
@ -25,12 +12,12 @@ class RawConnectionCommunicator(IMultiselectCommunicator):
self.conn = conn self.conn = conn
async def write(self, msg_str: str) -> None: async def write(self, msg_str: str) -> None:
msg_bytes = delim_encode(msg_str) msg_bytes = encode_delim(msg_str)
self.conn.writer.write(msg_bytes) self.conn.writer.write(msg_bytes)
await self.conn.writer.drain() await self.conn.writer.drain()
async def read(self) -> str: async def read(self) -> str:
return await delim_read(self.conn.reader) return await read_delim(self.conn.reader)
class StreamCommunicator(IMultiselectCommunicator): class StreamCommunicator(IMultiselectCommunicator):
@ -40,8 +27,8 @@ class StreamCommunicator(IMultiselectCommunicator):
self.stream = stream self.stream = stream
async def write(self, msg_str: str) -> None: async def write(self, msg_str: str) -> None:
msg_bytes = delim_encode(msg_str) msg_bytes = encode_delim(msg_str)
await self.stream.write(msg_bytes) await self.stream.write(msg_bytes)
async def read(self) -> str: async def read(self) -> str:
return await delim_read(self.stream) return await read_delim(self.stream)

View File

@ -7,7 +7,7 @@ from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
class BaseSession(ISecureConn, IRawConnection): class BaseSession(ISecureConn):
""" """
``BaseSession`` is not fully instantiated from its abstract classes as it ``BaseSession`` is not fully instantiated from its abstract classes as it
is only meant to be used in clases that derive from it. is only meant to be used in clases that derive from it.

View File

@ -17,7 +17,6 @@ class IMuxedConn(ABC):
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
""" """
initiator: bool
peer_id: ID peer_id: ID
@abstractmethod @abstractmethod
@ -35,6 +34,11 @@ class IMuxedConn(ABC):
:param peer_id: peer_id of peer the connection is to :param peer_id: peer_id of peer the connection is to
""" """
@property
@abstractmethod
def initiator(self) -> bool:
pass
@abstractmethod @abstractmethod
def close(self) -> None: def close(self) -> None:
""" """

View File

@ -13,7 +13,7 @@ from libp2p.typing import TProtocol
from .constants import HeaderTags from .constants import HeaderTags
from .exceptions import StreamNotFound from .exceptions import StreamNotFound
from .mplex_stream import MplexStream from .mplex_stream import MplexStream
from .utils import decode_uvarint_from_stream, encode_uvarint from libp2p.utils import decode_uvarint_from_stream, encode_uvarint
MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0")

View File

@ -62,6 +62,7 @@ class TCPListener(IListener):
class TCP(ITransport): class TCP(ITransport):
# TODO: Remove `self_id`
async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection:
""" """
dial a transport to peer listening on multiaddr dial a transport to peer listening on multiaddr

View File

@ -5,6 +5,9 @@ from typing import Tuple
from libp2p.typing import StreamReader from libp2p.typing import StreamReader
TIMEOUT = 10
def encode_uvarint(number: int) -> bytes: def encode_uvarint(number: int) -> bytes:
"""Pack `number` into varint bytes""" """Pack `number` into varint bytes"""
buf = b"" buf = b""
@ -45,3 +48,31 @@ async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> in
break break
return result return result
# Varint-prefixed read/write
def encode_varint_prefixed(msg_bytes: bytes) -> bytes:
varint_len = encode_uvarint(len(msg_bytes))
return varint_len + msg_bytes
async def read_varint_prefixed_bytes(
reader: StreamReader, timeout: int = TIMEOUT
) -> bytes:
len_msg = await decode_uvarint_from_stream(reader, timeout)
return await reader.read(len_msg)
# Delimited read/write
def encode_delim(msg_str: str) -> bytes:
delimited_msg = msg_str + "\n"
return encode_varint_prefixed(delimited_msg.encode())
async def read_delim(reader: StreamReader, timeout: int = TIMEOUT) -> str:
msg_bytes = await read_varint_prefixed_bytes(reader, timeout)
return msg_bytes.decode().rstrip()