diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5c71757..3aeb10a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -281,6 +281,17 @@ class Pubsub: logger.debug("added new peer %s", peer_id) + async def _handle_dead_peer(self, peer_id: ID) -> None: + del self.peers[peer_id] + + for topic in self.peer_topics: + if peer_id in self.peer_topics[topic]: + self.peer_topics[topic].remove(peer_id) + + self.router.remove_peer(peer_id) + + logger.debug("removed dead peer %s", peer_id) + async def handle_peer_queue(self) -> None: """ Continuously read from peer queue and each time a new peer is found, @@ -298,6 +309,21 @@ class Pubsub: # Force context switch await asyncio.sleep(0) + async def handle_dead_peer_queue(self) -> None: + """ + Continuously read from dead peer queue and close the stream between that peer and + remove peer info from pubsub and pubsub router. + """ + while True: + + peer_id: ID = await self.dead_peer_queue.get() + + # Remove Peer + asyncio.ensure_future(self._handle_dead_peer(peer_id)) + + # Force context switch + await asyncio.sleep(0) + def handle_subscription( self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts ) -> None: