diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index a6cee2f..7f80ddd 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -2,7 +2,7 @@ from ast import literal_eval import asyncio import logging import random -from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple +from typing import Any, Dict, Iterable, List, Sequence, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -216,38 +216,40 @@ class GossipSub(IPubsubRouter): :param origin: peer id of the peer the message originate from. :return: a generator of the peer ids who we send data to. """ - send_to: Set[ID] = set() for topic in topic_ids: if topic not in self.pubsub.peer_topics: continue # floodsub peers - for peer_id in self.pubsub.peer_topics[topic]: - # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. - # This will improve the efficiency when searching for a peer's protocol id. - if peer_id in self.peers_floodsub: - send_to.add(peer_id) + # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. + # This will improve the efficiency when searching for a peer's protocol id. + floodsub_peers: List[ID] = [ + peer_id + for peer_id in self.pubsub.peer_topics[topic] + if peer_id in self.peers_floodsub + ] # gossipsub peers - in_topic_gossipsub_peers: List[ID] = None + gossipsub_peers: List[ID] = [] if topic in self.mesh: - in_topic_gossipsub_peers = self.mesh[topic] + gossipsub_peers = self.mesh[topic] else: - # It could be the case that we publish to a topic that we have not subscribe - # and the topic is not yet added to our `fanout`. - if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): - # If no peers in fanout, choose some peers from gossipsub peers in topic. - self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree, [] + # When we publish to a topic that we have not subscribe to, we randomly pick + # `self.degree` number of peers who have subscribe to the topic and add them + # as our `fanout` peers. + topic_in_fanout: bool = topic in self.fanout + fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else [] + fanout_size = len(fanout_peers) + if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): + # Combine fanout peers with selected peers + self.fanout[topic] += self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - fanout_size, fanout_peers ) - in_topic_gossipsub_peers = self.fanout[topic] - for peer_id in in_topic_gossipsub_peers: - send_to.add(peer_id) + gossipsub_peers = fanout_peers # Excludes `msg_forwarder` and `origin` - yield from send_to.difference([msg_forwarder, origin]) + yield from set(floodsub_peers + gossipsub_peers).difference([msg_forwarder, origin]) async def join(self, topic: str) -> None: - # Note: the comments here are the near-exact algorithm description from the spec """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the subscription announcement. @@ -277,9 +279,8 @@ class GossipSub(IPubsubRouter): # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. for peer in fanout_peers: - if peer not in self.mesh[topic]: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) + self.mesh[topic].append(peer) + await self.emit_graft(topic, peer) self.fanout.pop(topic, None) @@ -300,7 +301,7 @@ class GossipSub(IPubsubRouter): await self.emit_prune(topic, peer) # Forget mesh[topic] - del self.mesh[topic] + self.mesh.pop(topic, None) async def _emit_control_msgs( self, @@ -452,7 +453,7 @@ class GossipSub(IPubsubRouter): del self.fanout[topic] del self.time_since_last_publish[topic] else: - # Check whether our peers are still in the topic + # Check if fanout peers are still in the topic and remove the ones that are not # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501 in_topic_fanout_peers = [ peer