Change type of local peers var from list to set
This commit is contained in:
parent
f1d58ef8ff
commit
65766ec9ac
|
@ -213,7 +213,7 @@ class GossipSub(IPubsubRouter):
|
||||||
:param origin: peer id of the peer the message originate from.
|
:param origin: peer id of the peer the message originate from.
|
||||||
:return: a generator of the peer ids who we send data to.
|
:return: a generator of the peer ids who we send data to.
|
||||||
"""
|
"""
|
||||||
send_to: List[ID] = []
|
send_to: Set[ID] = set()
|
||||||
for topic in topic_ids:
|
for topic in topic_ids:
|
||||||
if topic not in self.pubsub.peer_topics:
|
if topic not in self.pubsub.peer_topics:
|
||||||
continue
|
continue
|
||||||
|
@ -221,14 +221,15 @@ class GossipSub(IPubsubRouter):
|
||||||
# floodsub peers
|
# floodsub peers
|
||||||
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
|
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
|
||||||
# This will improve the efficiency when searching for a peer's protocol id.
|
# This will improve the efficiency when searching for a peer's protocol id.
|
||||||
floodsub_peers: List[ID] = [
|
floodsub_peers: Set[ID] = set(
|
||||||
peer_id
|
peer_id
|
||||||
for peer_id in self.pubsub.peer_topics[topic]
|
for peer_id in self.pubsub.peer_topics[topic]
|
||||||
if peer_id in self.peers_floodsub
|
if peer_id in self.peers_floodsub
|
||||||
]
|
)
|
||||||
|
send_to.update(floodsub_peers)
|
||||||
|
|
||||||
# gossipsub peers
|
# gossipsub peers
|
||||||
gossipsub_peers: List[ID] = []
|
gossipsub_peers: Set[ID] = set()
|
||||||
if topic in self.mesh:
|
if topic in self.mesh:
|
||||||
gossipsub_peers = self.mesh[topic]
|
gossipsub_peers = self.mesh[topic]
|
||||||
else:
|
else:
|
||||||
|
@ -236,21 +237,23 @@ class GossipSub(IPubsubRouter):
|
||||||
# `self.degree` number of peers who have subscribed to the topic and add them
|
# `self.degree` number of peers who have subscribed to the topic and add them
|
||||||
# as our `fanout` peers.
|
# as our `fanout` peers.
|
||||||
topic_in_fanout: bool = topic in self.fanout
|
topic_in_fanout: bool = topic in self.fanout
|
||||||
fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
|
fanout_peers: Set[ID] = self.fanout[topic] if topic_in_fanout else set()
|
||||||
fanout_size = len(fanout_peers)
|
fanout_size = len(fanout_peers)
|
||||||
if not topic_in_fanout or (
|
if not topic_in_fanout or (
|
||||||
topic_in_fanout and fanout_size < self.degree
|
topic_in_fanout and fanout_size < self.degree
|
||||||
):
|
):
|
||||||
if topic in self.pubsub.peer_topics:
|
if topic in self.pubsub.peer_topics:
|
||||||
# Combine fanout peers with selected peers
|
# Combine fanout peers with selected peers
|
||||||
fanout_peers += self._get_in_topic_gossipsub_peers_from_minus(
|
fanout_peers.update(
|
||||||
topic, self.degree - fanout_size, fanout_peers
|
self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
|
topic, self.degree - fanout_size, fanout_peers
|
||||||
|
)
|
||||||
)
|
)
|
||||||
self.fanout[topic] = set(fanout_peers)
|
self.fanout[topic] = fanout_peers
|
||||||
gossipsub_peers = fanout_peers
|
gossipsub_peers = fanout_peers
|
||||||
send_to.extend(floodsub_peers + gossipsub_peers)
|
send_to.update(gossipsub_peers)
|
||||||
# Excludes `msg_forwarder` and `origin`
|
# Excludes `msg_forwarder` and `origin`
|
||||||
yield from set(send_to).difference([msg_forwarder, origin])
|
yield from send_to.difference([msg_forwarder, origin])
|
||||||
|
|
||||||
async def join(self, topic: str) -> None:
|
async def join(self, topic: str) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -264,10 +267,10 @@ class GossipSub(IPubsubRouter):
|
||||||
if topic in self.mesh:
|
if topic in self.mesh:
|
||||||
return
|
return
|
||||||
# Create mesh[topic] if it does not yet exist
|
# Create mesh[topic] if it does not yet exist
|
||||||
self.mesh[topic] = []
|
self.mesh[topic] = set()
|
||||||
|
|
||||||
topic_in_fanout: bool = topic in self.fanout
|
topic_in_fanout: bool = topic in self.fanout
|
||||||
fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
|
fanout_peers: Set[ID] = self.fanout[topic] if topic_in_fanout else set()
|
||||||
fanout_size = len(fanout_peers)
|
fanout_size = len(fanout_peers)
|
||||||
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
|
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
|
||||||
# There are less than D peers (let this number be x)
|
# There are less than D peers (let this number be x)
|
||||||
|
@ -278,7 +281,7 @@ class GossipSub(IPubsubRouter):
|
||||||
topic, self.degree - fanout_size, fanout_peers
|
topic, self.degree - fanout_size, fanout_peers
|
||||||
)
|
)
|
||||||
# Combine fanout peers with selected peers
|
# Combine fanout peers with selected peers
|
||||||
fanout_peers += selected_peers
|
fanout_peers.update(selected_peers)
|
||||||
|
|
||||||
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
|
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
|
||||||
for peer in fanout_peers:
|
for peer in fanout_peers:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user