diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index e4a54cf..99ee832 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -1,17 +1,17 @@ from typing import ( Any, + Awaitable, Callable, - Coroutine, List, Sequence, ) import multiaddr -from libp2p.network.swarm import Swarm +from libp2p.network.network_interface import INetwork from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo -from libp2p.peer.peerstore import PeerStore +from libp2p.peer.peerstore_interface import IPeerStore from libp2p.network.stream.net_stream_interface import INetStream from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter @@ -24,17 +24,17 @@ from .host_interface import IHost # telling it to listen on the given listen addresses. -StreamHandlerFn = Callable[[INetStream], Coroutine[Any, Any, None]] +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class BasicHost(IHost): - _network: Swarm + _network: INetwork router: KadmeliaPeerRouter - peerstore: PeerStore + peerstore: IPeerStore # default options constructor - def __init__(self, network: Swarm, router: KadmeliaPeerRouter = None) -> None: + def __init__(self, network: INetwork, router: KadmeliaPeerRouter = None) -> None: self._network = network self._router = router self.peerstore = self._network.peerstore @@ -45,13 +45,13 @@ class BasicHost(IHost): """ return self._network.get_peer_id() - def get_network(self) -> Swarm: + def get_network(self) -> INetwork: """ :return: network instance of host """ return self._network - def get_peerstore(self) -> PeerStore: + def get_peerstore(self) -> IPeerStore: """ :return: peerstore of the host (same one as in its network instance) """ diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index bf4fe5f..1607d0f 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -1,22 +1,22 @@ from abc import ABC, abstractmethod from typing import ( Any, + Awaitable, Callable, - Coroutine, List, Sequence, ) import multiaddr -from libp2p.network.swarm import Swarm +from libp2p.network.network_interface import INetwork from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo from libp2p.network.stream.net_stream_interface import INetStream -StreamHandlerFn = Callable[[INetStream], Coroutine[Any, Any, None]] +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class IHost(ABC): @@ -28,7 +28,7 @@ class IHost(ABC): """ @abstractmethod - def get_network(self) -> Swarm: + def get_network(self) -> INetwork: """ :return: network instance of host """ @@ -58,9 +58,9 @@ class IHost(ABC): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on @abstractmethod - def new_stream(self, + async def new_stream(self, peer_id: ID, - protocol_ids: Sequence[str]) -> Coroutine[Any, Any, INetStream]: + protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_ids: protocol ids that stream can run on @@ -68,7 +68,7 @@ class IHost(ABC): """ @abstractmethod - def connect(self, peer_info: PeerInfo) -> Coroutine[Any, Any, None]: + async def connect(self, peer_info: PeerInfo) -> None: """ connect ensures there is a connection between this host and the peer with given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 0a4a8eb..fc1694a 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -4,8 +4,9 @@ from abc import ( ) from typing import ( Any, + Awaitable, Callable, - Coroutine, + Dict, Sequence, TYPE_CHECKING, ) @@ -13,19 +14,25 @@ from typing import ( from multiaddr import Multiaddr from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStore from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.transport.listener_interface import IListener -from .stream.net_stream import NetStream +from .stream.net_stream_interface import INetStream if TYPE_CHECKING: from .notifee_interface import INotifee -StreamHandlerFn = Callable[[NetStream], Coroutine[Any, Any, None]] +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class INetwork(ABC): + peerstore: PeerStore + connections: Dict[ID, IMuxedConn] + listeners: Dict[str, IListener] + @abstractmethod def get_peer_id(self) -> ID: """ @@ -33,7 +40,7 @@ class INetwork(ABC): """ @abstractmethod - def dial_peer(self, peer_id: ID) -> Coroutine[Any, Any, IMuxedConn]: + async def dial_peer(self, peer_id: ID) -> IMuxedConn: """ dial_peer try to create a connection to peer_id @@ -51,9 +58,9 @@ class INetwork(ABC): """ @abstractmethod - def new_stream(self, + async def new_stream(self, peer_id: ID, - protocol_ids: Sequence[str]) -> Coroutine[Any, Any, NetStream]: + protocol_ids: Sequence[str]) -> INetStream: """ :param peer_id: peer_id of destination :param protocol_ids: available protocol ids to use for stream @@ -61,7 +68,7 @@ class INetwork(ABC): """ @abstractmethod - def listen(self, *args: Multiaddr) -> Coroutine[Any, Any, bool]: + async def listen(self, *args: Multiaddr) -> bool: """ :param *args: one or many multiaddrs to start listening on :return: True if at least one success diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 237ad73..8ebe791 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,16 +1,16 @@ -from libp2p.stream_muxer.mplex.mplex import Mplex -from libp2p.stream_muxer.mplex.mplex_stream import MplexStream +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream from .net_stream_interface import INetStream class NetStream(INetStream): - muxed_stream: MplexStream - mplex_conn: Mplex + muxed_stream: IMuxedStream + mplex_conn: IMuxedConn protocol_id: str - def __init__(self, muxed_stream: MplexStream) -> None: + def __init__(self, muxed_stream: IMuxedStream) -> None: self.muxed_stream = muxed_stream self.mplex_conn = muxed_stream.mplex_conn self.protocol_id = None diff --git a/libp2p/network/stream/net_stream_interface.py b/libp2p/network/stream/net_stream_interface.py index 30d7ac7..31c0080 100644 --- a/libp2p/network/stream/net_stream_interface.py +++ b/libp2p/network/stream/net_stream_interface.py @@ -1,15 +1,14 @@ from abc import ABC, abstractmethod from typing import ( Any, - Coroutine, ) -from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn class INetStream(ABC): - mplex_conn: Mplex + mplex_conn: IMuxedConn @abstractmethod def get_protocol(self) -> str: @@ -25,21 +24,21 @@ class INetStream(ABC): """ @abstractmethod - def read(self) -> Coroutine[Any, Any, bytes]: + async def read(self) -> bytes: """ reads from the underlying muxed_stream :return: bytes of input """ @abstractmethod - def write(self, data: bytes) -> Coroutine[Any, Any, int]: + async def write(self, data: bytes) -> int: """ write to the underlying muxed_stream :return: number of bytes written """ @abstractmethod - def close(self) -> Coroutine[Any, Any, bool]: + async def close(self) -> bool: """ close the underlying muxed stream :return: true if successful diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 68b1892..0038360 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -1,8 +1,8 @@ import asyncio from typing import ( Any, + Awaitable, Callable, - Coroutine, Dict, List, Sequence, @@ -18,10 +18,10 @@ from libp2p.peer.peerstore import PeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.routing.interfaces import IPeerRouting +from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from libp2p.transport.upgrader import TransportUpgrader from libp2p.transport.transport_interface import ITransport from libp2p.transport.listener_interface import IListener -from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from libp2p.stream_muxer.mplex.mplex_stream import MplexStream @@ -29,9 +29,10 @@ from .network_interface import INetwork from .notifee_interface import INotifee from .connection.raw_connection import RawConnection from .stream.net_stream import NetStream +from .stream.net_stream_interface import INetStream -StreamHandlerFn = Callable[[NetStream], Coroutine[Any, Any, None]] +StreamHandlerFn = Callable[[INetStream], Awaitable[None]] class Swarm(INetwork): @@ -44,7 +45,7 @@ class Swarm(INetwork): router: IPeerRouting connections: Dict[ID, IMuxedConn] listeners: Dict[str, IListener] - stream_handlers: Dict[NetStream, Callable[[NetStream], None]] + stream_handlers: Dict[INetStream, Callable[[INetStream], None]] multiselect: Multiselect multiselect_client: MultiselectClient @@ -252,7 +253,7 @@ class Swarm(INetwork): # TODO: `disconnect`? -GenericProtocolHandlerFn = Callable[[MplexStream], Coroutine[Any, Any, None]] +GenericProtocolHandlerFn = Callable[[MplexStream], Awaitable[None]] def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: diff --git a/libp2p/peer/peerdata.py b/libp2p/peer/peerdata.py index 696e736..bb4962b 100644 --- a/libp2p/peer/peerdata.py +++ b/libp2p/peer/peerdata.py @@ -39,10 +39,10 @@ class PeerData(IPeerData): def clear_addrs(self) -> None: self.addrs = [] - def put_metadata(self, key: Any, val: Any) -> None: + def put_metadata(self, key: str, val: Any) -> None: self.metadata[key] = val - def get_metadata(self, key: Any) -> Any: + def get_metadata(self, key: str) -> Any: if key in self.metadata: return self.metadata[key] raise PeerDataError("key not found") diff --git a/libp2p/peer/peerdata_interface.py b/libp2p/peer/peerdata_interface.py index e2f150e..cefd56d 100644 --- a/libp2p/peer/peerdata_interface.py +++ b/libp2p/peer/peerdata_interface.py @@ -7,6 +7,8 @@ from typing import ( from multiaddr import Multiaddr +from .peermetadata_interface import IPeerMetadata + class IPeerData(ABC): @@ -47,7 +49,7 @@ class IPeerData(ABC): """ @abstractmethod - def put_metadata(self, key: Any, val: Any) -> None: + def put_metadata(self, key: str, val: Any) -> None: """ :param key: key in KV pair :param val: val to associate with key @@ -55,7 +57,7 @@ class IPeerData(ABC): """ @abstractmethod - def get_metadata(self, key: Any) -> Any: + def get_metadata(self, key: str) -> IPeerMetadata: """ :param key: key in KV pair :return: val for key diff --git a/libp2p/peer/peermetadata_interface.py b/libp2p/peer/peermetadata_interface.py index 6fcf8de..3d60259 100644 --- a/libp2p/peer/peermetadata_interface.py +++ b/libp2p/peer/peermetadata_interface.py @@ -14,7 +14,7 @@ class IPeerMetadata(ABC): pass @abstractmethod - def get(self, peer_id: ID, key: Any) -> Any: + def get(self, peer_id: ID, key: str) -> Any: """ :param peer_id: peer ID to lookup key for :param key: key to look up @@ -23,7 +23,7 @@ class IPeerMetadata(ABC): """ @abstractmethod - def put(self, peer_id: ID, key: Any, val: Any) -> None: + def put(self, peer_id: ID, key: str, val: Any) -> None: """ :param peer_id: peer ID to lookup key for :param key: key to associate with peer diff --git a/libp2p/peer/peerstore.py b/libp2p/peer/peerstore.py index 3a59495..2cd1574 100644 --- a/libp2p/peer/peerstore.py +++ b/libp2p/peer/peerstore.py @@ -58,13 +58,13 @@ class PeerStore(IPeerStore): def peer_ids(self) -> List[ID]: return list(self.peer_map.keys()) - def get(self, peer_id: ID, key: Any) -> Any: + def get(self, peer_id: ID, key: str) -> Any: if peer_id in self.peer_map: val = self.peer_map[peer_id].get_metadata(key) return val raise PeerStoreError("peer ID not found") - def put(self, peer_id: ID, key: Any, val: Any) -> None: + def put(self, peer_id: ID, key: str, val: Any) -> None: # <> # This can output an error, not sure what the possible errors are peer = self.__create_or_get_peer(peer_id) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5e86015..ddfa59c 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -306,8 +306,7 @@ class Pubsub: # Tell router we are leaving this topic await self.router.leave(topic_id) - # FIXME: `raw_msg` can be further type hinted with mypy_protobuf - async def message_all_peers(self, raw_msg: Any) -> None: + async def message_all_peers(self, raw_msg: bytes) -> None: """ Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index de2ec7a..e656215 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -1,7 +1,6 @@ from abc import ABC, abstractmethod from typing import ( Any, - Coroutine, Iterable, ) @@ -31,7 +30,7 @@ class IContentRouting(ABC): class IPeerRouting(ABC): @abstractmethod - def find_peer(self, peer_id: ID) -> Coroutine[Any, Any, PeerInfo]: + async def find_peer(self, peer_id: ID) -> PeerInfo: """ Find specific Peer FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py index 5e426fe..e50ec84 100644 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -9,6 +9,7 @@ from libp2p.kademlia.kad_peerinfo import ( ) from libp2p.kademlia.network import KademliaServer from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo from libp2p.routing.interfaces import IPeerRouting diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 3fd6961..08b30d5 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -1,7 +1,8 @@ import asyncio +from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream + from .utils import get_flag -from ..muxed_stream_interface import IMuxedStream class MplexStream(IMuxedStream): diff --git a/libp2p/stream_muxer/muxed_stream_interface.py b/libp2p/stream_muxer/muxed_stream_interface.py index b15dba1..b9034ce 100644 --- a/libp2p/stream_muxer/muxed_stream_interface.py +++ b/libp2p/stream_muxer/muxed_stream_interface.py @@ -1,8 +1,12 @@ from abc import ABC, abstractmethod +from libp2p.stream_muxer.mplex.mplex import Mplex + class IMuxedStream(ABC): + mplex_conn: Mplex + @abstractmethod def read(self): """