Add a nursery in Swarm

To avoid using the one in `Service`
This commit is contained in:
mhchia 2020-02-04 20:45:58 +08:00
parent b007bb4d07
commit a7ba59bf9f
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A

View File

@ -1,8 +1,9 @@
import logging
from typing import Dict, List
from typing import Dict, List, Optional
from async_service import Service
from multiaddr import Multiaddr
import trio
from libp2p.io.abc import ReadWriteCloser
from libp2p.network.connection.net_connection_interface import INetConn
@ -49,6 +50,8 @@ class Swarm(Service, INetworkService):
connections: Dict[ID, INetConn]
listeners: Dict[str, IListener]
common_stream_handler: StreamHandlerFn
listener_nursery: Optional[trio.Nursery]
event_listener_nursery_created: trio.Event
notifees: List[INotifee]
@ -72,8 +75,21 @@ class Swarm(Service, INetworkService):
# Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427
self.common_stream_handler = create_default_stream_handler(self) # type: ignore
self.listener_nursery = None
self.event_listener_nursery_created = trio.Event()
async def run(self) -> None:
await self.manager.wait_finished()
async with trio.open_nursery() as nursery:
# Create a nursery for listener tasks.
self.listener_nursery = nursery
self.event_listener_nursery_created.set()
try:
await self.manager.wait_finished()
finally:
# The service ended. Cancel listener tasks.
nursery.cancel_scope.cancel()
# Indicate that the nursery has been cancelled.
self.listener_nursery = None
def get_peer_id(self) -> ID:
return self.self_id
@ -207,6 +223,9 @@ class Swarm(Service, INetworkService):
- Call listener listen with the multiaddr
- Map multiaddr to listener
"""
# We need to wait until `self.listener_nursery` is created.
await self.event_listener_nursery_created.wait()
for maddr in multiaddrs:
if str(maddr) in self.listeners:
return True
@ -250,7 +269,9 @@ class Swarm(Service, INetworkService):
self.listeners[str(maddr)] = listener
# TODO: `listener.listen` is not bounded with nursery. If we want to be
# I/O agnostic, we should change the API.
await listener.listen(maddr, self.manager._task_nursery) # type: ignore
if self.listener_nursery is None:
raise SwarmException("swarm instance hasn't been run")
await listener.listen(maddr, self.listener_nursery)
# Call notifiers since event occurred
self.notify_listen(maddr)