From 920cf646efe8cb979b55c96979008a08f368da4e Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 2 Dec 2019 22:49:27 +0800 Subject: [PATCH] Fix lint and add check in fanout heartbeat --- libp2p/pubsub/gossipsub.py | 19 +++++++++++++++---- libp2p/pubsub/pubsub.py | 8 +++----- libp2p/tools/constants.py | 2 +- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3939700..831e6d3 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -55,7 +55,7 @@ class GossipSub(IPubsubRouter): time_to_live: int, gossip_window: int = 3, gossip_history: int = 5, - heartbeat_initial_delay: int = 0.1, + heartbeat_initial_delay: float = 0.1, heartbeat_interval: int = 120, ) -> None: self.protocols = list(protocols) @@ -129,7 +129,9 @@ class GossipSub(IPubsubRouter): # instance in multistream-select, but it is not the protocol that gossipsub supports. # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. - raise Exception(f"This should not happen. Protocol={protocol_id} is not supported.") + raise Exception( + f"This should not happen. Protocol={protocol_id} is not supported." + ) self.peers_to_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: @@ -357,13 +359,22 @@ class GossipSub(IPubsubRouter): async def fanout_heartbeat(self) -> None: # Note: the comments here are the exact pseudocode from the spec for topic in self.fanout: - # If time since last published > ttl + # Delete topic entry if it's not in `pubsub.peer_topics` + # or if it's time-since-last-published > ttl # TODO: there's no way time_since_last_publish gets set anywhere yet - if self.time_since_last_publish[topic] > self.time_to_live: + if ( + topic not in self.pubsub.peer_topics + or self.time_since_last_publish[topic] > self.time_to_live + ): # Remove topic from fanout del self.fanout[topic] del self.time_since_last_publish[topic] else: + # Check whether our peers are still in the topic + # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 + for peer in self.fanout[topic]: + if peer not in self.pubsub.peer_topics[topic]: + self.fanout[topic].remove(peer) num_fanout_peers_in_topic = len(self.fanout[topic]) # If |fanout[topic]| < D diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a3827f9..45e7ffc 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -327,11 +327,9 @@ class Pubsub: logger.debug("removed dead peer %s", peer_id) async def handle_peer_queue(self) -> None: - """ - Continuously read from peer queue and each time a new peer is found, - open a stream to the peer using a supported pubsub protocol - pubsub protocols we support - """ + """Continuously read from peer queue and each time a new peer is found, + open a stream to the peer using a supported pubsub protocol pubsub + protocols we support.""" while True: peer_id: ID = await self.peer_queue.get() # Add Peer diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py index 2f132bb..8c22d15 100644 --- a/libp2p/tools/constants.py +++ b/libp2p/tools/constants.py @@ -24,7 +24,7 @@ class GossipsubParams(NamedTuple): time_to_live: int = 30 gossip_window: int = 3 gossip_history: int = 5 - heartbeat_initial_delay: int = 0.1 + heartbeat_initial_delay: float = 0.1 heartbeat_interval: float = 0.5