py-libp2p/libp2p/stream_muxer/abc.py

137 lines
3.6 KiB
Python
Raw Normal View History

2018-11-01 05:31:00 +08:00
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
2019-08-05 11:22:44 +08:00
from multiaddr import Multiaddr
2018-11-01 05:31:00 +08:00
from libp2p.peer.id import ID
from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.stream_muxer.mplex.constants import HeaderTags
2019-08-05 10:35:56 +08:00
if TYPE_CHECKING:
# Prevent GenericProtocolHandlerFn introducing circular dependencies
from libp2p.network.typing import GenericProtocolHandlerFn # noqa: F401
2019-08-05 10:35:56 +08:00
2018-11-12 07:03:04 +08:00
2018-11-01 05:31:00 +08:00
class IMuxedConn(ABC):
"""
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
"""
peer_id: ID
2019-07-28 14:30:15 +08:00
@abstractmethod
2019-08-02 18:03:34 +08:00
def __init__(
self,
conn: ISecureConn,
generic_protocol_handler: "GenericProtocolHandlerFn",
peer_id: ID,
2019-08-02 18:03:34 +08:00
) -> None:
"""
create a new muxed connection
:param conn: an instance of secured connection
:param generic_protocol_handler: generic protocol handler
for new muxed streams
[WIP] PubSub and FloodSub development (#133) * Add notifee interface * Add notify function to network interface * Implement notify feature * Add tests for notify * Make notifee functions all async * Fix linting issue * Fix linting issue * Scaffold pubsub router interface * Scaffold pubsub directory * Store peer_id in muxed connection * Implement pubsub notifee * Remove outdated files * Implement pubsub first attempt * Prepare pubsub for floodsub * Add mplex conn to net stream and add conn in notify tests * Implement floodsub * Use NetStream in generic protocol handler * Debugging async issues * Modify test to perform proper assert. Test passes * Remove callbacks. Reduce sleep time * Add simple three node test * Clean up code. Add message classes * Add test for two topics * Add conn to net stream and conn tests * Refactor test setup to remove duplicate code * Fix linting issues * Fix linting issue * Fix linting issue * Fix outstanding unrelated lint issue in multiselect_client * Add connect function * Remove debug prints * Remove debug prints from floodsub * Use MessageTalk in place of direct message breakdown * Remove extra prints * Remove outdated function * Add message to queues for all topics in message * Debugging * Add message self delivery * Increase read timeout to 5 to get pubsub tests passing * Refactor testing helper func. Add tests * Add tests and increase timeout to get tests passing * Add dummy account demo scaffolding * Attempt to use threads. Test fails * Implement basic dummy node tests using threads * Add generic testing function * Add simple seven node tree test * Add more complex seven node tree tests * Add five node ring tests * Remove unnecessary get_message_type func * Add documentation to classes * Add message id to messages * Add documentation to test helper func * Add docs to dummy account node helper func * Add more docs to dummy account node test helper func * fixed linting errors in floodsub * small notify bugfix * move pubsub into libp2p * fixed pubsub linting * fixing pubsub test failures * linting
2019-03-24 01:52:02 +08:00
:param peer_id: peer_id of peer the connection is to
"""
@property
@abstractmethod
def initiator(self) -> bool:
pass
2018-11-12 07:03:04 +08:00
@abstractmethod
async def close(self) -> None:
2018-11-12 07:03:04 +08:00
"""
close connection
"""
2018-11-01 05:31:00 +08:00
@abstractmethod
2019-08-02 18:03:34 +08:00
def is_closed(self) -> bool:
2018-11-01 05:31:00 +08:00
"""
check connection is fully closed
:return: true if successful
"""
2019-08-02 18:28:04 +08:00
@abstractmethod
async def read_buffer(self, stream_id: int) -> bytes:
"""
Read a message from stream_id's buffer, check raw connection for new messages
:param stream_id: stream id of stream to read from
:return: message read
"""
@abstractmethod
async def read_buffer_nonblocking(self, stream_id: int) -> Optional[bytes]:
"""
Read a message from `stream_id`'s buffer, non-blockingly.
"""
# FIXME: Remove multiaddr from being passed into muxed_conn
2018-11-01 05:31:00 +08:00
@abstractmethod
async def open_stream(
self, protocol_id: str, multi_addr: Multiaddr
) -> "IMuxedStream":
2018-11-01 05:31:00 +08:00
"""
creates a new muxed_stream
2018-11-13 02:02:49 +08:00
:param protocol_id: protocol_id of stream
:param multi_addr: multi_addr that stream connects to
2018-11-01 05:31:00 +08:00
:return: a new stream
"""
@abstractmethod
2019-08-02 18:03:34 +08:00
async def accept_stream(self) -> None:
2018-11-01 05:31:00 +08:00
"""
accepts a muxed stream opened by the other end
"""
2019-08-02 18:28:04 +08:00
@abstractmethod
async def send_message(self, flag: HeaderTags, data: bytes, stream_id: int) -> int:
"""
sends a message over the connection
:param header: header to use
:param data: data to send in the message
:param stream_id: stream the message is in
"""
class IMuxedStream(ABC):
mplex_conn: IMuxedConn
@abstractmethod
2019-08-07 15:23:20 +08:00
async def read(self, n: int = -1) -> bytes:
"""
reads from the underlying muxed_conn
2019-08-07 15:23:20 +08:00
:param n: number of bytes to read
:return: bytes of input
"""
@abstractmethod
async def write(self, data: bytes) -> int:
"""
writes to the underlying muxed_conn
:return: number of bytes written
"""
@abstractmethod
async def close(self) -> bool:
"""
close the underlying muxed_conn
:return: true if successful
"""
@abstractmethod
async def reset(self) -> bool:
"""
closes both ends of the stream
tells this remote side to hang up
:return: true if successful
"""
@abstractmethod
2019-08-05 17:02:18 +08:00
def set_deadline(self, ttl: int) -> bool:
"""
set deadline for muxed stream
:return: a new stream
"""