Use ReadWriteCloser
for conns and streams
This commit is contained in:
parent
eac159c527
commit
10415cb956
|
@ -44,13 +44,12 @@ class NetStream(INetStream):
|
||||||
"""
|
"""
|
||||||
return await self.muxed_stream.write(data)
|
return await self.muxed_stream.write(data)
|
||||||
|
|
||||||
async def close(self) -> bool:
|
async def close(self) -> None:
|
||||||
"""
|
"""
|
||||||
close stream
|
close stream
|
||||||
:return: true if successful
|
:return: true if successful
|
||||||
"""
|
"""
|
||||||
await self.muxed_stream.close()
|
await self.muxed_stream.close()
|
||||||
return True
|
|
||||||
|
|
||||||
async def reset(self) -> bool:
|
async def reset(self) -> bool:
|
||||||
return await self.muxed_stream.reset()
|
return await self.muxed_stream.reset()
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import abstractmethod
|
||||||
|
|
||||||
|
from libp2p.io.abc import ReadWriteCloser
|
||||||
from libp2p.stream_muxer.abc import IMuxedConn
|
from libp2p.stream_muxer.abc import IMuxedConn
|
||||||
from libp2p.typing import TProtocol
|
from libp2p.typing import TProtocol
|
||||||
|
|
||||||
|
|
||||||
class INetStream(ABC):
|
class INetStream(ReadWriteCloser):
|
||||||
|
|
||||||
mplex_conn: IMuxedConn
|
mplex_conn: IMuxedConn
|
||||||
|
|
||||||
|
@ -21,28 +22,6 @@ class INetStream(ABC):
|
||||||
:return: true if successful
|
:return: true if successful
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def read(self, n: int = -1) -> bytes:
|
|
||||||
"""
|
|
||||||
reads from the underlying muxed_stream
|
|
||||||
:param n: number of bytes to read
|
|
||||||
:return: bytes of input
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def write(self, data: bytes) -> int:
|
|
||||||
"""
|
|
||||||
write to the underlying muxed_stream
|
|
||||||
:return: number of bytes written
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def close(self) -> bool:
|
|
||||||
"""
|
|
||||||
close the underlying muxed stream
|
|
||||||
:return: true if successful
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def reset(self) -> bool:
|
async def reset(self) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -8,7 +8,7 @@ from libp2p.peer.peerstore import PeerStoreError
|
||||||
from libp2p.peer.peerstore_interface import IPeerStore
|
from libp2p.peer.peerstore_interface import IPeerStore
|
||||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||||
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
||||||
from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator
|
from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator
|
||||||
from libp2p.routing.interfaces import IPeerRouting
|
from libp2p.routing.interfaces import IPeerRouting
|
||||||
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
||||||
from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure
|
from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure
|
||||||
|
@ -161,7 +161,7 @@ class Swarm(INetwork):
|
||||||
|
|
||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
selected_protocol = await self.multiselect_client.select_one_of(
|
selected_protocol = await self.multiselect_client.select_one_of(
|
||||||
list(protocol_ids), StreamCommunicator(muxed_stream)
|
list(protocol_ids), MultiselectCommunicator(muxed_stream)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create a net stream with the selected protocol
|
# Create a net stream with the selected protocol
|
||||||
|
@ -294,7 +294,7 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn:
|
||||||
async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None:
|
async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None:
|
||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
protocol, handler = await multiselect.negotiate(
|
protocol, handler = await multiselect.negotiate(
|
||||||
StreamCommunicator(muxed_stream)
|
MultiselectCommunicator(muxed_stream)
|
||||||
)
|
)
|
||||||
|
|
||||||
net_stream = NetStream(muxed_stream)
|
net_stream = NetStream(muxed_stream)
|
||||||
|
|
|
@ -1,35 +1,19 @@
|
||||||
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
from libp2p.io.abc import ReadWriteCloser
|
||||||
from libp2p.stream_muxer.abc import IMuxedStream
|
|
||||||
from libp2p.utils import encode_delim, read_delim
|
from libp2p.utils import encode_delim, read_delim
|
||||||
|
|
||||||
from .multiselect_communicator_interface import IMultiselectCommunicator
|
from .multiselect_communicator_interface import IMultiselectCommunicator
|
||||||
|
|
||||||
|
|
||||||
class RawConnectionCommunicator(IMultiselectCommunicator):
|
class MultiselectCommunicator(IMultiselectCommunicator):
|
||||||
conn: IRawConnection
|
read_writer: ReadWriteCloser
|
||||||
|
|
||||||
def __init__(self, conn: IRawConnection) -> None:
|
def __init__(self, read_writer: ReadWriteCloser) -> None:
|
||||||
self.conn = conn
|
self.read_writer = read_writer
|
||||||
|
|
||||||
async def write(self, msg_str: str) -> None:
|
async def write(self, msg_str: str) -> None:
|
||||||
msg_bytes = encode_delim(msg_str.encode())
|
msg_bytes = encode_delim(msg_str.encode())
|
||||||
await self.conn.write(msg_bytes)
|
await self.read_writer.write(msg_bytes)
|
||||||
|
|
||||||
async def read(self) -> str:
|
async def read(self) -> str:
|
||||||
data = await read_delim(self.conn)
|
data = await read_delim(self.read_writer)
|
||||||
return data.decode()
|
|
||||||
|
|
||||||
|
|
||||||
class StreamCommunicator(IMultiselectCommunicator):
|
|
||||||
stream: IMuxedStream
|
|
||||||
|
|
||||||
def __init__(self, stream: IMuxedStream) -> None:
|
|
||||||
self.stream = stream
|
|
||||||
|
|
||||||
async def write(self, msg_str: str) -> None:
|
|
||||||
msg_bytes = encode_delim(msg_str.encode())
|
|
||||||
await self.stream.write(msg_bytes)
|
|
||||||
|
|
||||||
async def read(self) -> str:
|
|
||||||
data = await read_delim(self.stream)
|
|
||||||
return data.decode()
|
return data.decode()
|
||||||
|
|
|
@ -6,7 +6,7 @@ from libp2p.network.connection.raw_connection_interface import IRawConnection
|
||||||
from libp2p.peer.id import ID
|
from libp2p.peer.id import ID
|
||||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||||
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
||||||
from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator
|
from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator
|
||||||
from libp2p.security.secure_conn_interface import ISecureConn
|
from libp2p.security.secure_conn_interface import ISecureConn
|
||||||
from libp2p.security.secure_transport_interface import ISecureTransport
|
from libp2p.security.secure_transport_interface import ISecureTransport
|
||||||
from libp2p.typing import TProtocol
|
from libp2p.typing import TProtocol
|
||||||
|
@ -88,7 +88,7 @@ class SecurityMultistream(ABC):
|
||||||
:return: selected secure transport
|
:return: selected secure transport
|
||||||
"""
|
"""
|
||||||
protocol: TProtocol
|
protocol: TProtocol
|
||||||
communicator = RawConnectionCommunicator(conn)
|
communicator = MultiselectCommunicator(conn)
|
||||||
if initiator:
|
if initiator:
|
||||||
# Select protocol if initiator
|
# Select protocol if initiator
|
||||||
protocol = await self.multiselect_client.select_one_of(
|
protocol = await self.multiselect_client.select_one_of(
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from libp2p.io.abc import ReadWriteCloser
|
||||||
from libp2p.peer.id import ID
|
from libp2p.peer.id import ID
|
||||||
from libp2p.security.secure_conn_interface import ISecureConn
|
from libp2p.security.secure_conn_interface import ISecureConn
|
||||||
from libp2p.stream_muxer.mplex.constants import HeaderTags
|
from libp2p.stream_muxer.mplex.constants import HeaderTags
|
||||||
|
@ -76,32 +77,10 @@ class IMuxedConn(ABC):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
class IMuxedStream(ABC):
|
class IMuxedStream(ReadWriteCloser):
|
||||||
|
|
||||||
mplex_conn: IMuxedConn
|
mplex_conn: IMuxedConn
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def read(self, n: int = -1) -> bytes:
|
|
||||||
"""
|
|
||||||
reads from the underlying muxed_conn
|
|
||||||
:param n: number of bytes to read
|
|
||||||
:return: bytes of input
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def write(self, data: bytes) -> int:
|
|
||||||
"""
|
|
||||||
writes to the underlying muxed_conn
|
|
||||||
:return: number of bytes written
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def close(self) -> bool:
|
|
||||||
"""
|
|
||||||
close the underlying muxed_conn
|
|
||||||
:return: true if successful
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def reset(self) -> bool:
|
async def reset(self) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -6,7 +6,7 @@ from libp2p.network.typing import GenericProtocolHandlerFn
|
||||||
from libp2p.peer.id import ID
|
from libp2p.peer.id import ID
|
||||||
from libp2p.protocol_muxer.multiselect import Multiselect
|
from libp2p.protocol_muxer.multiselect import Multiselect
|
||||||
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
||||||
from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator
|
from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator
|
||||||
from libp2p.security.secure_conn_interface import ISecureConn
|
from libp2p.security.secure_conn_interface import ISecureConn
|
||||||
from libp2p.typing import TProtocol
|
from libp2p.typing import TProtocol
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ class MuxerMultistream:
|
||||||
:return: selected muxer transport
|
:return: selected muxer transport
|
||||||
"""
|
"""
|
||||||
protocol: TProtocol
|
protocol: TProtocol
|
||||||
communicator = RawConnectionCommunicator(conn)
|
communicator = MultiselectCommunicator(conn)
|
||||||
if conn.initiator:
|
if conn.initiator:
|
||||||
protocol = await self.multiselect_client.select_one_of(
|
protocol = await self.multiselect_client.select_one_of(
|
||||||
tuple(self.transports.keys()), communicator
|
tuple(self.transports.keys()), communicator
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union
|
from typing import TYPE_CHECKING, Awaitable, Callable, NewType
|
||||||
|
|
||||||
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401
|
from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401
|
||||||
|
@ -8,5 +6,3 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
TProtocol = NewType("TProtocol", str)
|
TProtocol = NewType("TProtocol", str)
|
||||||
StreamHandlerFn = Callable[["INetStream"], Awaitable[None]]
|
StreamHandlerFn = Callable[["INetStream"], Awaitable[None]]
|
||||||
|
|
||||||
StreamReader = Union["IMuxedStream", "INetStream", IRawConnection]
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ import math
|
||||||
|
|
||||||
from libp2p.exceptions import ParseError
|
from libp2p.exceptions import ParseError
|
||||||
from libp2p.io.abc import Reader
|
from libp2p.io.abc import Reader
|
||||||
from libp2p.typing import StreamReader
|
|
||||||
|
|
||||||
# Unsigned LEB128(varint codec)
|
# Unsigned LEB128(varint codec)
|
||||||
# Reference: https://github.com/ethereum/py-wasm/blob/master/wasm/parsers/leb128.py
|
# Reference: https://github.com/ethereum/py-wasm/blob/master/wasm/parsers/leb128.py
|
||||||
|
@ -31,7 +30,7 @@ def encode_uvarint(number: int) -> bytes:
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
|
|
||||||
async def decode_uvarint_from_stream(reader: StreamReader) -> int:
|
async def decode_uvarint_from_stream(reader: Reader) -> int:
|
||||||
"""
|
"""
|
||||||
https://en.wikipedia.org/wiki/LEB128
|
https://en.wikipedia.org/wiki/LEB128
|
||||||
"""
|
"""
|
||||||
|
@ -61,7 +60,7 @@ def encode_varint_prefixed(msg_bytes: bytes) -> bytes:
|
||||||
return varint_len + msg_bytes
|
return varint_len + msg_bytes
|
||||||
|
|
||||||
|
|
||||||
async def read_varint_prefixed_bytes(reader: StreamReader) -> bytes:
|
async def read_varint_prefixed_bytes(reader: Reader) -> bytes:
|
||||||
len_msg = await decode_uvarint_from_stream(reader)
|
len_msg = await decode_uvarint_from_stream(reader)
|
||||||
data = await reader.read(len_msg)
|
data = await reader.read(len_msg)
|
||||||
if len(data) != len_msg:
|
if len(data) != len_msg:
|
||||||
|
@ -80,7 +79,7 @@ def encode_delim(msg: bytes) -> bytes:
|
||||||
return encode_varint_prefixed(delimited_msg)
|
return encode_varint_prefixed(delimited_msg)
|
||||||
|
|
||||||
|
|
||||||
async def read_delim(reader: StreamReader) -> bytes:
|
async def read_delim(reader: Reader) -> bytes:
|
||||||
msg_bytes = await read_varint_prefixed_bytes(reader)
|
msg_bytes = await read_varint_prefixed_bytes(reader)
|
||||||
# TODO: Investigate if it is possible to have empty `msg_bytes`
|
# TODO: Investigate if it is possible to have empty `msg_bytes`
|
||||||
if len(msg_bytes) != 0 and msg_bytes[-1:] != b"\n":
|
if len(msg_bytes) != 0 and msg_bytes[-1:] != b"\n":
|
||||||
|
|
Loading…
Reference in New Issue
Block a user