diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 9a0279d..45180a9 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -1,8 +1,9 @@ import logging -from typing import Dict, List +from typing import Dict, List, Optional from async_service import Service from multiaddr import Multiaddr +import trio from libp2p.io.abc import ReadWriteCloser from libp2p.network.connection.net_connection_interface import INetConn @@ -49,6 +50,8 @@ class Swarm(Service, INetworkService): connections: Dict[ID, INetConn] listeners: Dict[str, IListener] common_stream_handler: StreamHandlerFn + listener_nursery: Optional[trio.Nursery] + event_listener_nursery_created: trio.Event notifees: List[INotifee] @@ -72,8 +75,21 @@ class Swarm(Service, INetworkService): # Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427 self.common_stream_handler = create_default_stream_handler(self) # type: ignore + self.listener_nursery = None + self.event_listener_nursery_created = trio.Event() + async def run(self) -> None: - await self.manager.wait_finished() + async with trio.open_nursery() as nursery: + # Create a nursery for listener tasks. + self.listener_nursery = nursery + self.event_listener_nursery_created.set() + try: + await self.manager.wait_finished() + finally: + # The service ended. Cancel listener tasks. + nursery.cancel_scope.cancel() + # Indicate that the nursery has been cancelled. + self.listener_nursery = None def get_peer_id(self) -> ID: return self.self_id @@ -207,6 +223,9 @@ class Swarm(Service, INetworkService): - Call listener listen with the multiaddr - Map multiaddr to listener """ + # We need to wait until `self.listener_nursery` is created. + await self.event_listener_nursery_created.wait() + for maddr in multiaddrs: if str(maddr) in self.listeners: return True @@ -250,7 +269,9 @@ class Swarm(Service, INetworkService): self.listeners[str(maddr)] = listener # TODO: `listener.listen` is not bounded with nursery. If we want to be # I/O agnostic, we should change the API. - await listener.listen(maddr, self.manager._task_nursery) # type: ignore + if self.listener_nursery is None: + raise SwarmException("swarm instance hasn't been run") + await listener.listen(maddr, self.listener_nursery) # Call notifiers since event occurred self.notify_listen(maddr)