From 92ea35e147b825a4880785a87b5ee7d34567996b Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 27 Jan 2020 00:10:33 +0800 Subject: [PATCH] Fix `IPubsub` and add `IPubsub.wait_until_ready` --- libp2p/pubsub/abc.py | 66 +++++++++++++++++++++++++++++++++------ libp2p/pubsub/pubsub.py | 30 ++++++------------ libp2p/pubsub/typing.py | 9 ++++++ libp2p/tools/factories.py | 3 +- 4 files changed, 76 insertions(+), 32 deletions(-) create mode 100644 libp2p/pubsub/typing.py diff --git a/libp2p/pubsub/abc.py b/libp2p/pubsub/abc.py index 19f9b2a..e4b7584 100644 --- a/libp2p/pubsub/abc.py +++ b/libp2p/pubsub/abc.py @@ -1,18 +1,35 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, AsyncContextManager, AsyncIterable, List +from typing import ( + TYPE_CHECKING, + AsyncContextManager, + AsyncIterable, + KeysView, + List, + Tuple, +) + +from async_service import ServiceAPI from libp2p.peer.id import ID from libp2p.typing import TProtocol from .pb import rpc_pb2 +from .typing import ValidatorFn if TYPE_CHECKING: from .pubsub import Pubsub # noqa: F401 -# TODO: Add interface for Pubsub -class IPubsub(ABC): - pass +class ISubscriptionAPI( + AsyncContextManager["ISubscriptionAPI"], AsyncIterable[rpc_pb2.Message] +): + @abstractmethod + async def cancel(self) -> None: + ... + + @abstractmethod + async def get(self) -> rpc_pb2.Message: + ... class IPubsubRouter(ABC): @@ -86,13 +103,44 @@ class IPubsubRouter(ABC): """ -class ISubscriptionAPI( - AsyncContextManager["ISubscriptionAPI"], AsyncIterable[rpc_pb2.Message] -): +class IPubsub(ServiceAPI): + @property @abstractmethod - async def cancel(self) -> None: + def my_id(self) -> ID: + ... + + @property + @abstractmethod + def protocols(self) -> Tuple[TProtocol, ...]: + ... + + @property + @abstractmethod + def topic_ids(self) -> KeysView[str]: ... @abstractmethod - async def get(self) -> rpc_pb2.Message: + def set_topic_validator( + self, topic: str, validator: ValidatorFn, is_async_validator: bool + ) -> None: + ... + + @abstractmethod + def remove_topic_validator(self, topic: str) -> None: + ... + + @abstractmethod + async def wait_until_ready(self) -> None: + ... + + @abstractmethod + async def subscribe(self, topic_id: str) -> ISubscriptionAPI: + ... + + @abstractmethod + async def unsubscribe(self, topic_id: str) -> None: + ... + + @abstractmethod + async def publish(self, topic_id: str, data: bytes) -> None: ... diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 379361d..94c33b4 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,19 +1,7 @@ import logging import math import time -from typing import ( - TYPE_CHECKING, - Awaitable, - Callable, - Dict, - KeysView, - List, - NamedTuple, - Set, - Tuple, - Union, - cast, -) +from typing import TYPE_CHECKING, Dict, KeysView, List, NamedTuple, Set, Tuple, cast from async_service import Service import base58 @@ -35,6 +23,7 @@ from .abc import IPubsub, ISubscriptionAPI from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee from .subscription import TrioSubscriptionAPI +from .typing import AsyncValidatorFn, SyncValidatorFn, ValidatorFn from .validators import PUBSUB_SIGNING_PREFIX, signature_validator if TYPE_CHECKING: @@ -50,17 +39,12 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: return (msg.seqno, msg.from_id) -SyncValidatorFn = Callable[[ID, rpc_pb2.Message], bool] -AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]] -ValidatorFn = Union[SyncValidatorFn, AsyncValidatorFn] - - class TopicValidator(NamedTuple): validator: ValidatorFn is_async: bool -class Pubsub(IPubsub, Service): +class Pubsub(Service, IPubsub): host: IHost @@ -290,6 +274,10 @@ class Pubsub(IPubsub, Service): await stream.reset() self._handle_dead_peer(peer_id) + async def wait_until_ready(self) -> None: + await self.event_handle_peer_queue_started.wait() + await self.event_handle_dead_peer_queue_started.wait() + async def _handle_new_peer(self, peer_id: ID) -> None: try: stream: INetStream = await self.host.new_stream(peer_id, self.protocols) @@ -332,18 +320,18 @@ class Pubsub(IPubsub, Service): """Continuously read from peer queue and each time a new peer is found, open a stream to the peer using a supported pubsub protocol pubsub protocols we support.""" - self.event_handle_peer_queue_started.set() async with self.peer_receive_channel: + self.event_handle_peer_queue_started.set() async for peer_id in self.peer_receive_channel: # Add Peer self.manager.run_task(self._handle_new_peer, peer_id) async def handle_dead_peer_queue(self) -> None: - self.event_handle_dead_peer_queue_started.set() """Continuously read from dead peer channel and close the stream between that peer and remove peer info from pubsub and pubsub router.""" async with self.dead_peer_receive_channel: + self.event_handle_dead_peer_queue_started.set() async for peer_id in self.dead_peer_receive_channel: # Remove Peer self._handle_dead_peer(peer_id) diff --git a/libp2p/pubsub/typing.py b/libp2p/pubsub/typing.py new file mode 100644 index 0000000..c352d52 --- /dev/null +++ b/libp2p/pubsub/typing.py @@ -0,0 +1,9 @@ +from typing import Awaitable, Callable, Union + +from libp2p.peer.id import ID + +from .pb import rpc_pb2 + +SyncValidatorFn = Callable[[ID, rpc_pb2.Message], bool] +AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]] +ValidatorFn = Union[SyncValidatorFn, AsyncValidatorFn] diff --git a/libp2p/tools/factories.py b/libp2p/tools/factories.py index a20b59f..84e6af3 100644 --- a/libp2p/tools/factories.py +++ b/libp2p/tools/factories.py @@ -245,8 +245,7 @@ class PubsubFactory(factory.Factory): strict_signing=strict_signing, ) async with background_trio_service(pubsub): - await pubsub.event_handle_peer_queue_started.wait() - await pubsub.event_handle_dead_peer_queue_started.wait() + await pubsub.wait_until_ready() yield pubsub @classmethod