diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 1969d4f..7ef1372 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -3,7 +3,7 @@ import asyncio from collections import defaultdict import logging import random -from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple +from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Set, Tuple from libp2p.network.stream.exceptions import StreamClosed from libp2p.peer.id import ID @@ -32,15 +32,15 @@ class GossipSub(IPubsubRouter): time_to_live: int - mesh: Dict[str, List[ID]] - fanout: Dict[str, List[ID]] + mesh: Dict[str, Set[ID]] + fanout: Dict[str, Set[ID]] peers_to_protocol: Dict[ID, str] time_since_last_publish: Dict[str, int] - peers_gossipsub: List[ID] - peers_floodsub: List[ID] + peers_gossipsub: Set[ID] + peers_floodsub: Set[ID] mcache: MessageCache @@ -80,8 +80,8 @@ class GossipSub(IPubsubRouter): # Create topic --> time since last publish map self.time_since_last_publish = {} - self.peers_gossipsub = [] - self.peers_floodsub = [] + self.peers_gossipsub = set() + self.peers_floodsub = set() # Create message cache self.mcache = MessageCache(gossip_window, gossip_history) @@ -122,9 +122,9 @@ class GossipSub(IPubsubRouter): logger.debug("adding peer %s with protocol %s", peer_id, protocol_id) if protocol_id == PROTOCOL_ID: - self.peers_gossipsub.append(peer_id) + self.peers_gossipsub.add(peer_id) elif protocol_id == floodsub.PROTOCOL_ID: - self.peers_floodsub.append(peer_id) + self.peers_floodsub.add(peer_id) else: # We should never enter here. Becuase the `protocol_id` is registered by your pubsub # instance in multistream-select, but it is not the protocol that gossipsub supports. @@ -142,16 +142,16 @@ class GossipSub(IPubsubRouter): logger.debug("removing peer %s", peer_id) if peer_id in self.peers_gossipsub: - self.peers_gossipsub.remove(peer_id) + self.peers_gossipsub.discard(peer_id) elif peer_id in self.peers_floodsub: - self.peers_floodsub.remove(peer_id) + self.peers_floodsub.discard(peer_id) for topic in self.mesh: if peer_id in self.mesh[topic]: - self.mesh[topic].remove(peer_id) + self.mesh[topic].discard(peer_id) for topic in self.fanout: if peer_id in self.fanout[topic]: - self.fanout[topic].remove(peer_id) + self.fanout[topic].discard(peer_id) self.peers_to_protocol.pop(peer_id, None) @@ -246,7 +246,7 @@ class GossipSub(IPubsubRouter): fanout_peers += self._get_in_topic_gossipsub_peers_from_minus( topic, self.degree - fanout_size, fanout_peers ) - self.fanout[topic] = fanout_peers + self.fanout[topic] = set(fanout_peers) gossipsub_peers = fanout_peers send_to.extend(floodsub_peers + gossipsub_peers) # Excludes `msg_forwarder` and `origin` @@ -282,7 +282,7 @@ class GossipSub(IPubsubRouter): # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. for peer in fanout_peers: - self.mesh[topic].append(peer) + self.mesh[topic].add(peer) await self.emit_graft(topic, peer) self.fanout.pop(topic, None) @@ -419,7 +419,7 @@ class GossipSub(IPubsubRouter): for peer in selected_peers: # Add peer to mesh[topic] - self.mesh[topic].append(peer) + self.mesh[topic].add(peer) # Emit GRAFT(topic) control message to peer peers_to_graft[peer].append(topic) @@ -431,7 +431,7 @@ class GossipSub(IPubsubRouter): ) for peer in selected_peers: # Remove peer from mesh[topic] - self.mesh[topic].remove(peer) + self.mesh[topic].discard(peer) # Emit PRUNE(topic) control message to peer peers_to_prune[peer].append(topic) @@ -458,7 +458,7 @@ class GossipSub(IPubsubRouter): for peer in self.fanout[topic] if peer in self.pubsub.peer_topics[topic] ] - self.fanout[topic] = in_topic_fanout_peers + self.fanout[topic] = set(in_topic_fanout_peers) num_fanout_peers_in_topic = len(self.fanout[topic]) # If |fanout[topic]| < D @@ -470,7 +470,7 @@ class GossipSub(IPubsubRouter): self.fanout[topic], ) # Add the peers to fanout[topic] - self.fanout[topic].extend(selected_peers) + self.fanout[topic].Update(selected_peers) def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]: peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict) @@ -621,7 +621,7 @@ class GossipSub(IPubsubRouter): # Add peer to mesh for topic if topic in self.mesh: if sender_peer_id not in self.mesh[topic]: - self.mesh[topic].append(sender_peer_id) + self.mesh[topic].add(sender_peer_id) else: # Respond with PRUNE if not subscribed to the topic await self.emit_prune(topic, sender_peer_id) @@ -633,7 +633,7 @@ class GossipSub(IPubsubRouter): # Remove peer from mesh for topic, if peer is in topic if topic in self.mesh and sender_peer_id in self.mesh[topic]: - self.mesh[topic].remove(sender_peer_id) + self.mesh[topic].discard(sender_peer_id) # RPC emitters