diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 60e77f0..162ce30 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -102,8 +102,8 @@ class FloodSub(IPubsubRouter): origin: ID) -> Iterable[ID]: """ Get the eligible peers to send the data to. - :param src: peer ID of the peer who forwards the message to us - :param origin: peer id of the peer the message originate from + :param src: peer ID of the peer who forwards the message to us. + :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ for topic in topic_ids: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index d394e95..20e21a9 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -149,21 +149,21 @@ class GossipSub(IPubsubRouter): """ Get the eligible peers to send the data to. :param src: the peer id of the peer who forwards the message to me. - :param origin: the peer id of the peer who originally broadcast the message. + :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ - to_send: MutableSet[ID] = set() + send_to: MutableSet[ID] = set() for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue # floodsub peers for peer_id_str in self.pubsub.peer_topics[topic]: - peer_id = id_b58_decode(peer_id_str) # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. # This will improve the efficiency when searching for a peer's protocol id. if peer_id_str in self.peers_floodsub: - to_send.add(peer_id) + peer_id = id_b58_decode(peer_id_str) + send_to.add(peer_id) # gossipsub peers # FIXME: Change `str` to `ID` @@ -180,12 +180,16 @@ class GossipSub(IPubsubRouter): # pylint: disable=len-as-condition if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. - self.fanout[topic] = self._get_peers_from_minus(topic, self.degree, []) + self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree, + [], + ) gossipsub_peers = self.fanout[topic] for peer_id_str in gossipsub_peers: - to_send.add(id_b58_decode(peer_id_str)) + send_to.add(id_b58_decode(peer_id_str)) # Excludes `src` and `origin` - yield from to_send.difference([src, origin]) + yield from send_to.difference([src, origin]) async def join(self, topic): # Note: the comments here are the near-exact algorithm description from the spec @@ -208,7 +212,7 @@ class GossipSub(IPubsubRouter): # in the fanout for a topic (or the topic is not in the fanout). # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. if topic in self.pubsub.peer_topics: - selected_peers = self._get_peers_from_minus( + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( topic, self.degree - fanout_size, fanout_peers, @@ -286,13 +290,10 @@ class GossipSub(IPubsubRouter): num_mesh_peers_in_topic = len(self.mesh[topic]) if num_mesh_peers_in_topic < self.degree_low: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] - selected_peers = GossipSub.select_from_minus( + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - num_mesh_peers_in_topic, - gossipsub_peers_in_topic, self.mesh[topic], ) @@ -334,12 +335,11 @@ class GossipSub(IPubsubRouter): # If |fanout[topic]| < D if num_fanout_peers_in_topic < self.degree: # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - selected_peers = \ - GossipSub.select_from_minus(self.degree - num_fanout_peers_in_topic, - gossipsub_peers_in_topic, self.fanout[topic]) - + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree - num_fanout_peers_in_topic, + self.fanout[topic], + ) # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) @@ -351,12 +351,12 @@ class GossipSub(IPubsubRouter): # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in a topic and only add them if they are gossipsub peers too if topic in self.pubsub.peer_topics: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = \ - GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree, + [], + ) for peer in peers_to_emit_ihave_to: # TODO: this line is a monster, can hopefully be simplified @@ -365,6 +365,7 @@ class GossipSub(IPubsubRouter): msg_ids = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_ids, peer) + # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh for topic in self.fanout: if topic not in self.mesh: @@ -373,12 +374,12 @@ class GossipSub(IPubsubRouter): # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in topic and only add if they are gossipsub peers also if topic in self.pubsub.peer_topics: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = \ - GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree, + [], + ) for peer in peers_to_emit_ihave_to: if peer not in self.mesh[topic] and peer not in self.fanout[topic]: @@ -414,7 +415,7 @@ class GossipSub(IPubsubRouter): return selection - def _get_peers_from_minus( + def _get_in_topic_gossipsub_peers_from_minus( self, topic: str, num_to_select: int,