Fix lint and add check in fanout heartbeat

This commit is contained in:
NIC619 2019-12-02 22:49:27 +08:00
parent a7e0c5d737
commit 920cf646ef
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17
3 changed files with 19 additions and 10 deletions

View File

@ -55,7 +55,7 @@ class GossipSub(IPubsubRouter):
time_to_live: int,
gossip_window: int = 3,
gossip_history: int = 5,
heartbeat_initial_delay: int = 0.1,
heartbeat_initial_delay: float = 0.1,
heartbeat_interval: int = 120,
) -> None:
self.protocols = list(protocols)
@ -129,7 +129,9 @@ class GossipSub(IPubsubRouter):
# instance in multistream-select, but it is not the protocol that gossipsub supports.
# In this case, probably we registered gossipsub to a wrong `protocol_id`
# in multistream-select, or wrong versions.
raise Exception(f"This should not happen. Protocol={protocol_id} is not supported.")
raise Exception(
f"This should not happen. Protocol={protocol_id} is not supported."
)
self.peers_to_protocol[peer_id] = protocol_id
def remove_peer(self, peer_id: ID) -> None:
@ -357,13 +359,22 @@ class GossipSub(IPubsubRouter):
async def fanout_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec
for topic in self.fanout:
# If time since last published > ttl
# Delete topic entry if it's not in `pubsub.peer_topics`
# or if it's time-since-last-published > ttl
# TODO: there's no way time_since_last_publish gets set anywhere yet
if self.time_since_last_publish[topic] > self.time_to_live:
if (
topic not in self.pubsub.peer_topics
or self.time_since_last_publish[topic] > self.time_to_live
):
# Remove topic from fanout
del self.fanout[topic]
del self.time_since_last_publish[topic]
else:
# Check whether our peers are still in the topic
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
for peer in self.fanout[topic]:
if peer not in self.pubsub.peer_topics[topic]:
self.fanout[topic].remove(peer)
num_fanout_peers_in_topic = len(self.fanout[topic])
# If |fanout[topic]| < D

View File

@ -327,11 +327,9 @@ class Pubsub:
logger.debug("removed dead peer %s", peer_id)
async def handle_peer_queue(self) -> None:
"""
Continuously read from peer queue and each time a new peer is found,
open a stream to the peer using a supported pubsub protocol
pubsub protocols we support
"""
"""Continuously read from peer queue and each time a new peer is found,
open a stream to the peer using a supported pubsub protocol pubsub
protocols we support."""
while True:
peer_id: ID = await self.peer_queue.get()
# Add Peer

View File

@ -24,7 +24,7 @@ class GossipsubParams(NamedTuple):
time_to_live: int = 30
gossip_window: int = 3
gossip_history: int = 5
heartbeat_initial_delay: int = 0.1
heartbeat_initial_delay: float = 0.1
heartbeat_interval: float = 0.5