Reflect PR feedback

This commit is contained in:
mhchia 2019-07-28 18:06:38 +08:00
parent 70c5c84f32
commit 74d831d4e2
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
2 changed files with 33 additions and 32 deletions

View File

@ -102,8 +102,8 @@ class FloodSub(IPubsubRouter):
origin: ID) -> Iterable[ID]:
"""
Get the eligible peers to send the data to.
:param src: peer ID of the peer who forwards the message to us
:param origin: peer id of the peer the message originate from
:param src: peer ID of the peer who forwards the message to us.
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
for topic in topic_ids:

View File

@ -149,21 +149,21 @@ class GossipSub(IPubsubRouter):
"""
Get the eligible peers to send the data to.
:param src: the peer id of the peer who forwards the message to me.
:param origin: the peer id of the peer who originally broadcast the message.
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
to_send: MutableSet[ID] = set()
send_to: MutableSet[ID] = set()
for topic in topic_ids:
if topic not in self.pubsub.peer_topics:
continue
# floodsub peers
for peer_id_str in self.pubsub.peer_topics[topic]:
peer_id = id_b58_decode(peer_id_str)
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
# This will improve the efficiency when searching for a peer's protocol id.
if peer_id_str in self.peers_floodsub:
to_send.add(peer_id)
peer_id = id_b58_decode(peer_id_str)
send_to.add(peer_id)
# gossipsub peers
# FIXME: Change `str` to `ID`
@ -180,12 +180,16 @@ class GossipSub(IPubsubRouter):
# pylint: disable=len-as-condition
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
# If no peers in fanout, choose some peers from gossipsub peers in topic.
self.fanout[topic] = self._get_peers_from_minus(topic, self.degree, [])
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree,
[],
)
gossipsub_peers = self.fanout[topic]
for peer_id_str in gossipsub_peers:
to_send.add(id_b58_decode(peer_id_str))
send_to.add(id_b58_decode(peer_id_str))
# Excludes `src` and `origin`
yield from to_send.difference([src, origin])
yield from send_to.difference([src, origin])
async def join(self, topic):
# Note: the comments here are the near-exact algorithm description from the spec
@ -208,7 +212,7 @@ class GossipSub(IPubsubRouter):
# in the fanout for a topic (or the topic is not in the fanout).
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
if topic in self.pubsub.peer_topics:
selected_peers = self._get_peers_from_minus(
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - fanout_size,
fanout_peers,
@ -286,13 +290,10 @@ class GossipSub(IPubsubRouter):
num_mesh_peers_in_topic = len(self.mesh[topic])
if num_mesh_peers_in_topic < self.degree_low:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
selected_peers = GossipSub.select_from_minus(
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - num_mesh_peers_in_topic,
gossipsub_peers_in_topic,
self.mesh[topic],
)
@ -334,12 +335,11 @@ class GossipSub(IPubsubRouter):
# If |fanout[topic]| < D
if num_fanout_peers_in_topic < self.degree:
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
selected_peers = \
GossipSub.select_from_minus(self.degree - num_fanout_peers_in_topic,
gossipsub_peers_in_topic, self.fanout[topic])
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - num_fanout_peers_in_topic,
self.fanout[topic],
)
# Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers)
@ -351,12 +351,12 @@ class GossipSub(IPubsubRouter):
# TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too
if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = \
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree,
[],
)
for peer in peers_to_emit_ihave_to:
# TODO: this line is a monster, can hopefully be simplified
@ -365,6 +365,7 @@ class GossipSub(IPubsubRouter):
msg_ids = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_ids, peer)
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
# Do the same for fanout, for all topics not already hit in mesh
for topic in self.fanout:
if topic not in self.mesh:
@ -373,12 +374,12 @@ class GossipSub(IPubsubRouter):
# TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in topic and only add if they are gossipsub peers also
if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = \
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree,
[],
)
for peer in peers_to_emit_ihave_to:
if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
@ -414,7 +415,7 @@ class GossipSub(IPubsubRouter):
return selection
def _get_peers_from_minus(
def _get_in_topic_gossipsub_peers_from_minus(
self,
topic: str,
num_to_select: int,