diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7be2d28..5c71757 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -60,6 +60,7 @@ class Pubsub: router: "IPubsubRouter" peer_queue: "asyncio.Queue[ID]" + dead_peer_queue: "asyncio.Queue[ID]" protocols: List[TProtocol] @@ -100,7 +101,10 @@ class Pubsub: # Register a notifee self.peer_queue = asyncio.Queue() - self.host.get_network().register_notifee(PubsubNotifee(self.peer_queue)) + self.dead_peer_queue = asyncio.Queue() + self.host.get_network().register_notifee( + PubsubNotifee(self.peer_queue, self.dead_peer_queue) + ) # Register stream handlers for each pubsub router protocol to handle # the pubsub streams opened on those protocols diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 19be612..2553124 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -15,13 +15,21 @@ if TYPE_CHECKING: class PubsubNotifee(INotifee): initiator_peers_queue: "asyncio.Queue[ID]" + dead_peers_queue: "asyncio.Queue[ID]" - def __init__(self, initiator_peers_queue: "asyncio.Queue[ID]") -> None: + def __init__( + self, + initiator_peers_queue: "asyncio.Queue[ID]", + dead_peers_queue: "asyncio.Queue[ID]", + ) -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them + :param dead_peers_queue: queue to add dead peers to so that pubsub + can process dead peers after we disconnect from each other """ self.initiator_peers_queue = initiator_peers_queue + self.dead_peers_queue = dead_peers_queue async def opened_stream(self, network: INetwork, stream: INetStream) -> None: pass