Combine peers_gossipsub
and peers_floodsub
This commit is contained in:
parent
6cd3eb8fae
commit
e51d376d5e
|
@ -35,13 +35,11 @@ class GossipSub(IPubsubRouter):
|
|||
mesh: Dict[str, Set[ID]]
|
||||
fanout: Dict[str, Set[ID]]
|
||||
|
||||
peers_to_protocol: Dict[ID, str]
|
||||
# The protocol peer supports
|
||||
peer_protocol: Dict[ID, TProtocol]
|
||||
|
||||
time_since_last_publish: Dict[str, int]
|
||||
|
||||
peers_gossipsub: Set[ID]
|
||||
peers_floodsub: Set[ID]
|
||||
|
||||
mcache: MessageCache
|
||||
|
||||
heartbeat_initial_delay: float
|
||||
|
@ -75,14 +73,11 @@ class GossipSub(IPubsubRouter):
|
|||
self.fanout = {}
|
||||
|
||||
# Create peer --> protocol mapping
|
||||
self.peers_to_protocol = {}
|
||||
self.peer_protocol = {}
|
||||
|
||||
# Create topic --> time since last publish map
|
||||
self.time_since_last_publish = {}
|
||||
|
||||
self.peers_gossipsub = set()
|
||||
self.peers_floodsub = set()
|
||||
|
||||
# Create message cache
|
||||
self.mcache = MessageCache(gossip_window, gossip_history)
|
||||
|
||||
|
@ -121,17 +116,13 @@ class GossipSub(IPubsubRouter):
|
|||
"""
|
||||
logger.debug("adding peer %s with protocol %s", peer_id, protocol_id)
|
||||
|
||||
if protocol_id == PROTOCOL_ID:
|
||||
self.peers_gossipsub.add(peer_id)
|
||||
elif protocol_id == floodsub.PROTOCOL_ID:
|
||||
self.peers_floodsub.add(peer_id)
|
||||
else:
|
||||
if protocol_id not in (PROTOCOL_ID, floodsub.PROTOCOL_ID):
|
||||
# We should never enter here. Becuase the `protocol_id` is registered by your pubsub
|
||||
# instance in multistream-select, but it is not the protocol that gossipsub supports.
|
||||
# In this case, probably we registered gossipsub to a wrong `protocol_id`
|
||||
# in multistream-select, or wrong versions.
|
||||
raise Exception(f"Unreachable: Protocol={protocol_id} is not supported.")
|
||||
self.peers_to_protocol[peer_id] = protocol_id
|
||||
self.peer_protocol[peer_id] = protocol_id
|
||||
|
||||
def remove_peer(self, peer_id: ID) -> None:
|
||||
"""
|
||||
|
@ -141,15 +132,12 @@ class GossipSub(IPubsubRouter):
|
|||
"""
|
||||
logger.debug("removing peer %s", peer_id)
|
||||
|
||||
self.peers_gossipsub.discard(peer_id)
|
||||
self.peers_floodsub.discard(peer_id)
|
||||
|
||||
for topic in self.mesh:
|
||||
self.mesh[topic].discard(peer_id)
|
||||
for topic in self.fanout:
|
||||
self.fanout[topic].discard(peer_id)
|
||||
|
||||
self.peers_to_protocol.pop(peer_id, None)
|
||||
self.peer_protocol.pop(peer_id, None)
|
||||
|
||||
async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
|
||||
"""
|
||||
|
@ -217,12 +205,10 @@ class GossipSub(IPubsubRouter):
|
|||
continue
|
||||
|
||||
# floodsub peers
|
||||
# FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
|
||||
# This will improve the efficiency when searching for a peer's protocol id.
|
||||
floodsub_peers: Set[ID] = set(
|
||||
peer_id
|
||||
for peer_id in self.pubsub.peer_topics[topic]
|
||||
if peer_id in self.peers_floodsub
|
||||
if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
|
||||
)
|
||||
send_to.update(floodsub_peers)
|
||||
|
||||
|
@ -540,11 +526,9 @@ class GossipSub(IPubsubRouter):
|
|||
gossipsub_peers_in_topic = set(
|
||||
peer_id
|
||||
for peer_id in self.pubsub.peer_topics[topic]
|
||||
if peer_id in self.peers_gossipsub
|
||||
)
|
||||
return self.select_from_minus(
|
||||
num_to_select, gossipsub_peers_in_topic, minus
|
||||
if self.peer_protocol[peer_id] == PROTOCOL_ID
|
||||
)
|
||||
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, minus)
|
||||
|
||||
# RPC handlers
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import random
|
|||
import pytest
|
||||
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.pubsub.gossipsub import PROTOCOL_ID
|
||||
from libp2p.tools.constants import GOSSIPSUB_PARAMS, GossipsubParams
|
||||
from libp2p.tools.pubsub.utils import dense_connect, one_to_all_connect
|
||||
from libp2p.tools.utils import connect
|
||||
|
@ -108,7 +109,7 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch):
|
|||
monkeypatch.setattr(gossipsubs[index_bob], "emit_prune", emit_prune)
|
||||
|
||||
# Check that alice is bob's peer but not his mesh peer
|
||||
assert id_alice in gossipsubs[index_bob].peers_gossipsub
|
||||
assert gossipsubs[index_bob].peer_protocol[id_alice] == PROTOCOL_ID
|
||||
assert topic not in gossipsubs[index_bob].mesh
|
||||
|
||||
await gossipsubs[index_alice].emit_graft(topic, id_bob)
|
||||
|
@ -120,7 +121,7 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch):
|
|||
# Check that bob is alice's peer but not her mesh peer
|
||||
assert topic in gossipsubs[index_alice].mesh
|
||||
assert id_bob not in gossipsubs[index_alice].mesh[topic]
|
||||
assert id_bob in gossipsubs[index_alice].peers_gossipsub
|
||||
assert gossipsubs[index_alice].peer_protocol[id_bob] == PROTOCOL_ID
|
||||
|
||||
await gossipsubs[index_bob].emit_graft(topic, id_alice)
|
||||
|
||||
|
@ -390,7 +391,8 @@ async def test_mesh_heartbeat(
|
|||
fake_peer_ids = [
|
||||
ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count)
|
||||
]
|
||||
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", set(fake_peer_ids))
|
||||
peer_protocol = {peer_id: PROTOCOL_ID for peer_id in fake_peer_ids}
|
||||
monkeypatch.setattr(pubsubs_gsub[0].router, "peer_protocol", peer_protocol)
|
||||
|
||||
peer_topics = {topic: set(fake_peer_ids)}
|
||||
# Monkeypatch the peer subscriptions
|
||||
|
@ -437,7 +439,8 @@ async def test_gossip_heartbeat(
|
|||
fake_peer_ids = [
|
||||
ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count)
|
||||
]
|
||||
monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", set(fake_peer_ids))
|
||||
peer_protocol = {peer_id: PROTOCOL_ID for peer_id in fake_peer_ids}
|
||||
monkeypatch.setattr(pubsubs_gsub[0].router, "peer_protocol", peer_protocol)
|
||||
|
||||
topic_mesh_peer_count = 14
|
||||
# Split into mesh peers and fanout peers
|
||||
|
|
Loading…
Reference in New Issue
Block a user