Refactor gossipsub.join
This commit is contained in:
parent
1e78c21eca
commit
b5c3420c16
|
@ -184,43 +184,29 @@ class GossipSub(IPubsubRouter):
|
|||
# Create mesh[topic] if it does not yet exist
|
||||
self.mesh[topic] = []
|
||||
|
||||
if topic in self.fanout and len(self.fanout[topic]) == self.degree:
|
||||
# If router already has D peers from the fanout peers of a topic
|
||||
# TODO: Do we remove all peers from fanout[topic]?
|
||||
topic_in_fanout = topic in self.fanout
|
||||
fanout_peers = self.fanout[topic] if topic_in_fanout else []
|
||||
fanout_size = len(fanout_peers)
|
||||
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
|
||||
# There are less than D peers (let this number be x)
|
||||
# in the fanout for a topic (or the topic is not in the fanout).
|
||||
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
|
||||
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
|
||||
if peer in self.peers_gossipsub]
|
||||
selected_peers = \
|
||||
GossipSub.select_from_minus(self.degree - fanout_size,
|
||||
gossipsub_peers_in_topic,
|
||||
fanout_peers)
|
||||
|
||||
# Add them to mesh[topic], and notifies them with a
|
||||
# GRAFT(topic) control message.
|
||||
for peer in self.fanout[topic]:
|
||||
self.mesh[topic].append(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
else:
|
||||
# Otherwise, if there are less than D peers
|
||||
# (let this number be x) in the fanout for a topic (or the topic is not in the fanout),
|
||||
fanout_size = 0
|
||||
if topic in self.fanout:
|
||||
fanout_size = len(self.fanout[topic])
|
||||
# then it still adds them as above (if there are any)
|
||||
for peer in self.fanout[topic]:
|
||||
self.mesh[topic].append(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
# Combine fanout peers with selected peers
|
||||
fanout_peers += selected_peers
|
||||
|
||||
if topic in self.peers_gossipsub:
|
||||
# TODO: Should we have self.fanout[topic] here or [] (as the minus variable)?
|
||||
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic]
|
||||
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
|
||||
if peer in self.peers_gossipsub]
|
||||
selected_peers = \
|
||||
GossipSub.select_from_minus(self.degree - fanout_size,
|
||||
gossipsub_peers_in_topic,
|
||||
self.fanout[topic] if topic in self.fanout else [])
|
||||
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
|
||||
for peer in fanout_peers:
|
||||
self.mesh[topic].append(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
|
||||
# And likewise adds them to mesh[topic] and notifies them with a
|
||||
# GRAFT(topic) control message.
|
||||
for peer in selected_peers:
|
||||
self.mesh[topic].append(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
|
||||
# TODO: Do we remove all peers from fanout[topic]?
|
||||
# TODO: Do we remove all peers from fanout[topic]?
|
||||
|
||||
async def leave(self, topic):
|
||||
# Note: the comments here are the near-exact algorithm description from the spec
|
||||
|
|
Loading…
Reference in New Issue
Block a user