Apply PR feedback:
change param type and remove check before `discard`
This commit is contained in:
parent
f3732f9480
commit
6cd3eb8fae
@ -141,17 +141,13 @@ class GossipSub(IPubsubRouter):
|
|||||||
"""
|
"""
|
||||||
logger.debug("removing peer %s", peer_id)
|
logger.debug("removing peer %s", peer_id)
|
||||||
|
|
||||||
if peer_id in self.peers_gossipsub:
|
self.peers_gossipsub.discard(peer_id)
|
||||||
self.peers_gossipsub.discard(peer_id)
|
self.peers_floodsub.discard(peer_id)
|
||||||
elif peer_id in self.peers_floodsub:
|
|
||||||
self.peers_floodsub.discard(peer_id)
|
|
||||||
|
|
||||||
for topic in self.mesh:
|
for topic in self.mesh:
|
||||||
if peer_id in self.mesh[topic]:
|
self.mesh[topic].discard(peer_id)
|
||||||
self.mesh[topic].discard(peer_id)
|
|
||||||
for topic in self.fanout:
|
for topic in self.fanout:
|
||||||
if peer_id in self.fanout[topic]:
|
self.fanout[topic].discard(peer_id)
|
||||||
self.fanout[topic].discard(peer_id)
|
|
||||||
|
|
||||||
self.peers_to_protocol.pop(peer_id, None)
|
self.peers_to_protocol.pop(peer_id, None)
|
||||||
|
|
||||||
@ -248,7 +244,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
# Combine fanout peers with selected peers
|
# Combine fanout peers with selected peers
|
||||||
fanout_peers.update(
|
fanout_peers.update(
|
||||||
self._get_in_topic_gossipsub_peers_from_minus(
|
self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree - fanout_size, list(fanout_peers)
|
topic, self.degree - fanout_size, fanout_peers
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.fanout[topic] = fanout_peers
|
self.fanout[topic] = fanout_peers
|
||||||
@ -280,7 +276,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
|
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
|
||||||
if topic in self.pubsub.peer_topics:
|
if topic in self.pubsub.peer_topics:
|
||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree - fanout_size, list(fanout_peers)
|
topic, self.degree - fanout_size, fanout_peers
|
||||||
)
|
)
|
||||||
# Combine fanout peers with selected peers
|
# Combine fanout peers with selected peers
|
||||||
fanout_peers.update(selected_peers)
|
fanout_peers.update(selected_peers)
|
||||||
@ -419,7 +415,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
if num_mesh_peers_in_topic < self.degree_low:
|
if num_mesh_peers_in_topic < self.degree_low:
|
||||||
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
|
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
|
||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree - num_mesh_peers_in_topic, list(self.mesh[topic])
|
topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
|
||||||
)
|
)
|
||||||
|
|
||||||
for peer in selected_peers:
|
for peer in selected_peers:
|
||||||
@ -432,7 +428,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
if num_mesh_peers_in_topic > self.degree_high:
|
if num_mesh_peers_in_topic > self.degree_high:
|
||||||
# Select |mesh[topic]| - D peers from mesh[topic]
|
# Select |mesh[topic]| - D peers from mesh[topic]
|
||||||
selected_peers = GossipSub.select_from_minus(
|
selected_peers = GossipSub.select_from_minus(
|
||||||
num_mesh_peers_in_topic - self.degree, list(self.mesh[topic]), []
|
num_mesh_peers_in_topic - self.degree, self.mesh[topic], set()
|
||||||
)
|
)
|
||||||
for peer in selected_peers:
|
for peer in selected_peers:
|
||||||
# Remove peer from mesh[topic]
|
# Remove peer from mesh[topic]
|
||||||
@ -472,7 +468,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic,
|
topic,
|
||||||
self.degree - num_fanout_peers_in_topic,
|
self.degree - num_fanout_peers_in_topic,
|
||||||
list(self.fanout[topic]),
|
self.fanout[topic],
|
||||||
)
|
)
|
||||||
# Add the peers to fanout[topic]
|
# Add the peers to fanout[topic]
|
||||||
self.fanout[topic].update(selected_peers)
|
self.fanout[topic].update(selected_peers)
|
||||||
@ -486,7 +482,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
if topic in self.pubsub.peer_topics:
|
if topic in self.pubsub.peer_topics:
|
||||||
# Select D peers from peers.gossipsub[topic]
|
# Select D peers from peers.gossipsub[topic]
|
||||||
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree, list(self.mesh[topic])
|
topic, self.degree, self.mesh[topic]
|
||||||
)
|
)
|
||||||
|
|
||||||
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
||||||
@ -502,7 +498,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
if topic in self.pubsub.peer_topics:
|
if topic in self.pubsub.peer_topics:
|
||||||
# Select D peers from peers.gossipsub[topic]
|
# Select D peers from peers.gossipsub[topic]
|
||||||
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
topic, self.degree, list(self.fanout[topic])
|
topic, self.degree, self.fanout[topic]
|
||||||
)
|
)
|
||||||
msg_id_strs = [str(msg) for msg in msg_ids]
|
msg_id_strs = [str(msg) for msg in msg_ids]
|
||||||
for peer in peers_to_emit_ihave_to:
|
for peer in peers_to_emit_ihave_to:
|
||||||
@ -511,7 +507,7 @@ class GossipSub(IPubsubRouter):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def select_from_minus(
|
def select_from_minus(
|
||||||
num_to_select: int, pool: Sequence[Any], minus: Sequence[Any]
|
num_to_select: int, pool: Iterable[Any], minus: Iterable[Any]
|
||||||
) -> List[Any]:
|
) -> List[Any]:
|
||||||
"""
|
"""
|
||||||
Select at most num_to_select subset of elements from the set (pool - minus) randomly.
|
Select at most num_to_select subset of elements from the set (pool - minus) randomly.
|
||||||
@ -539,15 +535,15 @@ class GossipSub(IPubsubRouter):
|
|||||||
return selection
|
return selection
|
||||||
|
|
||||||
def _get_in_topic_gossipsub_peers_from_minus(
|
def _get_in_topic_gossipsub_peers_from_minus(
|
||||||
self, topic: str, num_to_select: int, minus: Sequence[ID]
|
self, topic: str, num_to_select: int, minus: Iterable[ID]
|
||||||
) -> List[ID]:
|
) -> List[ID]:
|
||||||
gossipsub_peers_in_topic = [
|
gossipsub_peers_in_topic = set(
|
||||||
peer_id
|
peer_id
|
||||||
for peer_id in self.pubsub.peer_topics[topic]
|
for peer_id in self.pubsub.peer_topics[topic]
|
||||||
if peer_id in self.peers_gossipsub
|
if peer_id in self.peers_gossipsub
|
||||||
]
|
)
|
||||||
return self.select_from_minus(
|
return self.select_from_minus(
|
||||||
num_to_select, gossipsub_peers_in_topic, list(minus)
|
num_to_select, gossipsub_peers_in_topic, minus
|
||||||
)
|
)
|
||||||
|
|
||||||
# RPC handlers
|
# RPC handlers
|
||||||
@ -642,8 +638,8 @@ class GossipSub(IPubsubRouter):
|
|||||||
) -> None:
|
) -> None:
|
||||||
topic: str = prune_msg.topicID
|
topic: str = prune_msg.topicID
|
||||||
|
|
||||||
# Remove peer from mesh for topic, if peer is in topic
|
# Remove peer from mesh for topic
|
||||||
if topic in self.mesh and sender_peer_id in self.mesh[topic]:
|
if topic in self.mesh:
|
||||||
self.mesh[topic].discard(sender_peer_id)
|
self.mesh[topic].discard(sender_peer_id)
|
||||||
|
|
||||||
# RPC emitters
|
# RPC emitters
|
||||||
|
Loading…
x
Reference in New Issue
Block a user