Refactor _get_peers_to_send

This commit is contained in:
NIC619 2019-12-05 14:35:34 +08:00
parent 8e591229fd
commit e6813da5f5
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17

View File

@ -2,7 +2,7 @@ from ast import literal_eval
import asyncio
import logging
import random
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple
from typing import Any, Dict, Iterable, List, Sequence, Tuple
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID
@ -216,38 +216,40 @@ class GossipSub(IPubsubRouter):
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
send_to: Set[ID] = set()
for topic in topic_ids:
if topic not in self.pubsub.peer_topics:
continue
# floodsub peers
for peer_id in self.pubsub.peer_topics[topic]:
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
# This will improve the efficiency when searching for a peer's protocol id.
if peer_id in self.peers_floodsub:
send_to.add(peer_id)
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
# This will improve the efficiency when searching for a peer's protocol id.
floodsub_peers: List[ID] = [
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if peer_id in self.peers_floodsub
]
# gossipsub peers
in_topic_gossipsub_peers: List[ID] = None
gossipsub_peers: List[ID] = []
if topic in self.mesh:
in_topic_gossipsub_peers = self.mesh[topic]
gossipsub_peers = self.mesh[topic]
else:
# It could be the case that we publish to a topic that we have not subscribe
# and the topic is not yet added to our `fanout`.
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
# If no peers in fanout, choose some peers from gossipsub peers in topic.
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, []
# When we publish to a topic that we have not subscribe to, we randomly pick
# `self.degree` number of peers who have subscribe to the topic and add them
# as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
fanout_size = len(fanout_peers)
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
# Combine fanout peers with selected peers
self.fanout[topic] += self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
)
in_topic_gossipsub_peers = self.fanout[topic]
for peer_id in in_topic_gossipsub_peers:
send_to.add(peer_id)
gossipsub_peers = fanout_peers
# Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin])
yield from set(floodsub_peers + gossipsub_peers).difference([msg_forwarder, origin])
async def join(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec
"""
Join notifies the router that we want to receive and forward messages
in a topic. It is invoked after the subscription announcement.
@ -277,9 +279,8 @@ class GossipSub(IPubsubRouter):
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
for peer in fanout_peers:
if peer not in self.mesh[topic]:
self.mesh[topic].append(peer)
await self.emit_graft(topic, peer)
self.mesh[topic].append(peer)
await self.emit_graft(topic, peer)
self.fanout.pop(topic, None)
@ -300,7 +301,7 @@ class GossipSub(IPubsubRouter):
await self.emit_prune(topic, peer)
# Forget mesh[topic]
del self.mesh[topic]
self.mesh.pop(topic, None)
async def _emit_control_msgs(
self,
@ -452,7 +453,7 @@ class GossipSub(IPubsubRouter):
del self.fanout[topic]
del self.time_since_last_publish[topic]
else:
# Check whether our peers are still in the topic
# Check if fanout peers are still in the topic and remove the ones that are not
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
in_topic_fanout_peers = [
peer