diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7a453e7..54de625 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -248,16 +248,17 @@ class Pubsub: """ peer_id = stream.muxed_conn.peer_id - # Error handling pattern reference: - # https://github.com/libp2p/go-libp2p-pubsub/blob/534fe2f382d8dd75dab89ddb0760542546c9f24e/comm.go#L38-L46 # noqa: E501 try: await self.continuously_read_stream(stream) - except StreamEOF as error: - await stream.close() - logger.debug("fail to read from peer %s, error=%s", peer_id, error) - except (ParseError, IncompleteReadError, StreamReset) as error: + except (StreamEOF, StreamReset, ParseError, IncompleteReadError) as error: + logger.debug( + "fail to read from peer %s, error=%s," + "closing the stream and remove the peer from record", + peer_id, + error, + ) await stream.reset() - logger.debug("read corrupted data from peer %s, error=%s", peer_id, error) + self._handle_dead_peer(peer_id) async def _handle_new_peer(self, peer_id: ID) -> None: try: @@ -282,6 +283,8 @@ class Pubsub: logger.debug("added new peer %s", peer_id) def _handle_dead_peer(self, peer_id: ID) -> None: + if peer_id not in self.peers: + return del self.peers[peer_id] for topic in self.peer_topics: