Change type of peers from list to set:
`peers_gossipsub`, `peers_floodsub` and mesh/fanout peers
This commit is contained in:
parent
7d6daa8e10
commit
f1d58ef8ff
|
@ -3,7 +3,7 @@ import asyncio
|
|||
from collections import defaultdict
|
||||
import logging
|
||||
import random
|
||||
from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple
|
||||
from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Set, Tuple
|
||||
|
||||
from libp2p.network.stream.exceptions import StreamClosed
|
||||
from libp2p.peer.id import ID
|
||||
|
@ -32,15 +32,15 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
time_to_live: int
|
||||
|
||||
mesh: Dict[str, List[ID]]
|
||||
fanout: Dict[str, List[ID]]
|
||||
mesh: Dict[str, Set[ID]]
|
||||
fanout: Dict[str, Set[ID]]
|
||||
|
||||
peers_to_protocol: Dict[ID, str]
|
||||
|
||||
time_since_last_publish: Dict[str, int]
|
||||
|
||||
peers_gossipsub: List[ID]
|
||||
peers_floodsub: List[ID]
|
||||
peers_gossipsub: Set[ID]
|
||||
peers_floodsub: Set[ID]
|
||||
|
||||
mcache: MessageCache
|
||||
|
||||
|
@ -80,8 +80,8 @@ class GossipSub(IPubsubRouter):
|
|||
# Create topic --> time since last publish map
|
||||
self.time_since_last_publish = {}
|
||||
|
||||
self.peers_gossipsub = []
|
||||
self.peers_floodsub = []
|
||||
self.peers_gossipsub = set()
|
||||
self.peers_floodsub = set()
|
||||
|
||||
# Create message cache
|
||||
self.mcache = MessageCache(gossip_window, gossip_history)
|
||||
|
@ -122,9 +122,9 @@ class GossipSub(IPubsubRouter):
|
|||
logger.debug("adding peer %s with protocol %s", peer_id, protocol_id)
|
||||
|
||||
if protocol_id == PROTOCOL_ID:
|
||||
self.peers_gossipsub.append(peer_id)
|
||||
self.peers_gossipsub.add(peer_id)
|
||||
elif protocol_id == floodsub.PROTOCOL_ID:
|
||||
self.peers_floodsub.append(peer_id)
|
||||
self.peers_floodsub.add(peer_id)
|
||||
else:
|
||||
# We should never enter here. Becuase the `protocol_id` is registered by your pubsub
|
||||
# instance in multistream-select, but it is not the protocol that gossipsub supports.
|
||||
|
@ -142,16 +142,16 @@ class GossipSub(IPubsubRouter):
|
|||
logger.debug("removing peer %s", peer_id)
|
||||
|
||||
if peer_id in self.peers_gossipsub:
|
||||
self.peers_gossipsub.remove(peer_id)
|
||||
self.peers_gossipsub.discard(peer_id)
|
||||
elif peer_id in self.peers_floodsub:
|
||||
self.peers_floodsub.remove(peer_id)
|
||||
self.peers_floodsub.discard(peer_id)
|
||||
|
||||
for topic in self.mesh:
|
||||
if peer_id in self.mesh[topic]:
|
||||
self.mesh[topic].remove(peer_id)
|
||||
self.mesh[topic].discard(peer_id)
|
||||
for topic in self.fanout:
|
||||
if peer_id in self.fanout[topic]:
|
||||
self.fanout[topic].remove(peer_id)
|
||||
self.fanout[topic].discard(peer_id)
|
||||
|
||||
self.peers_to_protocol.pop(peer_id, None)
|
||||
|
||||
|
@ -246,7 +246,7 @@ class GossipSub(IPubsubRouter):
|
|||
fanout_peers += self._get_in_topic_gossipsub_peers_from_minus(
|
||||
topic, self.degree - fanout_size, fanout_peers
|
||||
)
|
||||
self.fanout[topic] = fanout_peers
|
||||
self.fanout[topic] = set(fanout_peers)
|
||||
gossipsub_peers = fanout_peers
|
||||
send_to.extend(floodsub_peers + gossipsub_peers)
|
||||
# Excludes `msg_forwarder` and `origin`
|
||||
|
@ -282,7 +282,7 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
|
||||
for peer in fanout_peers:
|
||||
self.mesh[topic].append(peer)
|
||||
self.mesh[topic].add(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
|
||||
self.fanout.pop(topic, None)
|
||||
|
@ -419,7 +419,7 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
for peer in selected_peers:
|
||||
# Add peer to mesh[topic]
|
||||
self.mesh[topic].append(peer)
|
||||
self.mesh[topic].add(peer)
|
||||
|
||||
# Emit GRAFT(topic) control message to peer
|
||||
peers_to_graft[peer].append(topic)
|
||||
|
@ -431,7 +431,7 @@ class GossipSub(IPubsubRouter):
|
|||
)
|
||||
for peer in selected_peers:
|
||||
# Remove peer from mesh[topic]
|
||||
self.mesh[topic].remove(peer)
|
||||
self.mesh[topic].discard(peer)
|
||||
|
||||
# Emit PRUNE(topic) control message to peer
|
||||
peers_to_prune[peer].append(topic)
|
||||
|
@ -458,7 +458,7 @@ class GossipSub(IPubsubRouter):
|
|||
for peer in self.fanout[topic]
|
||||
if peer in self.pubsub.peer_topics[topic]
|
||||
]
|
||||
self.fanout[topic] = in_topic_fanout_peers
|
||||
self.fanout[topic] = set(in_topic_fanout_peers)
|
||||
num_fanout_peers_in_topic = len(self.fanout[topic])
|
||||
|
||||
# If |fanout[topic]| < D
|
||||
|
@ -470,7 +470,7 @@ class GossipSub(IPubsubRouter):
|
|||
self.fanout[topic],
|
||||
)
|
||||
# Add the peers to fanout[topic]
|
||||
self.fanout[topic].extend(selected_peers)
|
||||
self.fanout[topic].Update(selected_peers)
|
||||
|
||||
def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]:
|
||||
peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict)
|
||||
|
@ -621,7 +621,7 @@ class GossipSub(IPubsubRouter):
|
|||
# Add peer to mesh for topic
|
||||
if topic in self.mesh:
|
||||
if sender_peer_id not in self.mesh[topic]:
|
||||
self.mesh[topic].append(sender_peer_id)
|
||||
self.mesh[topic].add(sender_peer_id)
|
||||
else:
|
||||
# Respond with PRUNE if not subscribed to the topic
|
||||
await self.emit_prune(topic, sender_peer_id)
|
||||
|
@ -633,7 +633,7 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
# Remove peer from mesh for topic, if peer is in topic
|
||||
if topic in self.mesh and sender_peer_id in self.mesh[topic]:
|
||||
self.mesh[topic].remove(sender_peer_id)
|
||||
self.mesh[topic].discard(sender_peer_id)
|
||||
|
||||
# RPC emitters
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user