diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 839d3f7..93faebd 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -193,8 +193,7 @@ class GossipSub(IPubsubRouter): await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) except StreamClosed: logger.debug("Fail to publish message to %s: stream closed", peer_id) - # TODO: also remove peer info from pubsub - self.remove_peer(peer_id) + self.pubsub._handle_dead_peer(peer_id) def _get_peers_to_send( self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID @@ -521,7 +520,11 @@ class GossipSub(IPubsubRouter): try: await peer_stream.write(encode_varint_prefixed(rpc_msg)) except StreamClosed: - logger.debug("Fail to responed to iwant request from %s: stream closed", sender_peer_id) + logger.debug( + "Fail to responed to iwant request from %s: stream closed", + sender_peer_id, + ) + self.pubsub._handle_dead_peer(sender_peer_id) async def handle_graft( self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID @@ -609,3 +612,4 @@ class GossipSub(IPubsubRouter): await peer_stream.write(encode_varint_prefixed(rpc_msg)) except StreamClosed: logger.debug("Fail to emit control message to %s: stream closed", to_peer) + self.pubsub._handle_dead_peer(to_peer) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 8cc6539..3834eb4 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -440,8 +440,7 @@ class Pubsub: except StreamClosed: peer_id = stream.muxed_conn.peer_id logger.debug("Fail to message peer %s: stream closed", peer_id) - del self.peers[peer_id] - self.router.remove_peer(peer_id) + self._handle_dead_peer(peer_id) async def publish(self, topic_id: str, data: bytes) -> None: """