diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index b2e6f38..360143a 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -184,43 +184,29 @@ class GossipSub(IPubsubRouter): # Create mesh[topic] if it does not yet exist self.mesh[topic] = [] - if topic in self.fanout and len(self.fanout[topic]) == self.degree: - # If router already has D peers from the fanout peers of a topic - # TODO: Do we remove all peers from fanout[topic]? + topic_in_fanout = topic in self.fanout + fanout_peers = self.fanout[topic] if topic_in_fanout else [] + fanout_size = len(fanout_peers) + if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): + # There are less than D peers (let this number be x) + # in the fanout for a topic (or the topic is not in the fanout). + # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. + gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] + if peer in self.peers_gossipsub] + selected_peers = \ + GossipSub.select_from_minus(self.degree - fanout_size, + gossipsub_peers_in_topic, + fanout_peers) - # Add them to mesh[topic], and notifies them with a - # GRAFT(topic) control message. - for peer in self.fanout[topic]: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) - else: - # Otherwise, if there are less than D peers - # (let this number be x) in the fanout for a topic (or the topic is not in the fanout), - fanout_size = 0 - if topic in self.fanout: - fanout_size = len(self.fanout[topic]) - # then it still adds them as above (if there are any) - for peer in self.fanout[topic]: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) + # Combine fanout peers with selected peers + fanout_peers += selected_peers - if topic in self.peers_gossipsub: - # TODO: Should we have self.fanout[topic] here or [] (as the minus variable)? - # Selects the remaining number of peers (D-x) from peers.gossipsub[topic] - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - selected_peers = \ - GossipSub.select_from_minus(self.degree - fanout_size, - gossipsub_peers_in_topic, - self.fanout[topic] if topic in self.fanout else []) + # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. + for peer in fanout_peers: + self.mesh[topic].append(peer) + await self.emit_graft(topic, peer) - # And likewise adds them to mesh[topic] and notifies them with a - # GRAFT(topic) control message. - for peer in selected_peers: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) - - # TODO: Do we remove all peers from fanout[topic]? + # TODO: Do we remove all peers from fanout[topic]? async def leave(self, topic): # Note: the comments here are the near-exact algorithm description from the spec