diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index d09db76..045ef39 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -1,8 +1,9 @@
 from ast import literal_eval
 import asyncio
+from collections import defaultdict
 import logging
 import random
-from typing import Any, Dict, Iterable, List, Sequence, Set
+from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple
 
 from libp2p.network.stream.exceptions import StreamClosed
 from libp2p.peer.id import ID
@@ -43,6 +44,7 @@ class GossipSub(IPubsubRouter):
 
     mcache: MessageCache
 
+    heartbeat_initial_delay: float
     heartbeat_interval: int
 
     def __init__(
@@ -54,6 +56,7 @@ class GossipSub(IPubsubRouter):
         time_to_live: int,
         gossip_window: int = 3,
         gossip_history: int = 5,
+        heartbeat_initial_delay: float = 0.1,
         heartbeat_interval: int = 120,
     ) -> None:
         self.protocols = list(protocols)
@@ -84,6 +87,7 @@ class GossipSub(IPubsubRouter):
         self.mcache = MessageCache(gossip_window, gossip_history)
 
         # Create heartbeat timer
+        self.heartbeat_initial_delay = heartbeat_initial_delay
         self.heartbeat_interval = heartbeat_interval
 
     # Interface functions
@@ -106,7 +110,6 @@ class GossipSub(IPubsubRouter):
         logger.debug("attached to pusub")
 
         # Start heartbeat now that we have a pubsub instance
-        # TODO: Start after delay
        asyncio.ensure_future(self.heartbeat())
 
     def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
@@ -127,8 +130,7 @@ class GossipSub(IPubsubRouter):
             # instance in multistream-select, but it is not the protocol that gossipsub supports.
             # In this case, probably we registered gossipsub to a wrong `protocol_id`
             # in multistream-select, or wrong versions.
-            # TODO: Better handling
-            raise Exception(f"protocol is not supported: protocol_id={protocol_id}")
+            raise Exception(f"Unreachable: Protocol={protocol_id} is not supported.")
         self.peers_to_protocol[peer_id] = protocol_id
 
     def remove_peer(self, peer_id: ID) -> None:
@@ -144,6 +146,15 @@ class GossipSub(IPubsubRouter):
         elif peer_id in self.peers_floodsub:
             self.peers_floodsub.remove(peer_id)
 
+        for topic in self.mesh:
+            if peer_id in self.mesh[topic]:
+                # Delete the entry if no other peers left
+                self.mesh[topic].remove(peer_id)
+        for topic in self.fanout:
+            if peer_id in self.fanout[topic]:
+                # Delete the entry if no other peers left
+                self.fanout[topic].remove(peer_id)
+
         self.peers_to_protocol.pop(peer_id, None)
 
     async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None:
@@ -204,42 +215,46 @@ class GossipSub(IPubsubRouter):
         :param origin: peer id of the peer the message originate from.
         :return: a generator of the peer ids who we send data to.
         """
-        send_to: Set[ID] = set()
+        send_to: List[ID] = []
         for topic in topic_ids:
             if topic not in self.pubsub.peer_topics:
                 continue
 
             # floodsub peers
-            for peer_id in self.pubsub.peer_topics[topic]:
-                # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
-                # This will improve the efficiency when searching for a peer's protocol id.
-                if peer_id in self.peers_floodsub:
-                    send_to.add(peer_id)
+            # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
+            # This will improve the efficiency when searching for a peer's protocol id.
+            floodsub_peers: List[ID] = [
+                peer_id
+                for peer_id in self.pubsub.peer_topics[topic]
+                if peer_id in self.peers_floodsub
+            ]
 
             # gossipsub peers
-            in_topic_gossipsub_peers: List[ID] = None
-            # TODO: Do we need to check `topic in self.pubsub.my_topics`?
+            gossipsub_peers: List[ID] = []
             if topic in self.mesh:
-                in_topic_gossipsub_peers = self.mesh[topic]
+                gossipsub_peers = self.mesh[topic]
             else:
-                # TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not
-                #   subscribed?
-                # I assume there could be short periods between heartbeats where topic may not
-                #   be but we should check that this path gets hit appropriately
-
-                if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
-                    # If no peers in fanout, choose some peers from gossipsub peers in topic.
-                    self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
-                        topic, self.degree, []
-                    )
-                in_topic_gossipsub_peers = self.fanout[topic]
-            for peer_id in in_topic_gossipsub_peers:
-                send_to.add(peer_id)
+                # When we publish to a topic that we have not subscribed to, we randomly pick
+                # `self.degree` number of peers who have subscribed to the topic and add them
+                # as our `fanout` peers.
+                topic_in_fanout: bool = topic in self.fanout
+                fanout_peers: List[ID] = self.fanout[topic] if topic_in_fanout else []
+                fanout_size = len(fanout_peers)
+                if not topic_in_fanout or (
+                    topic_in_fanout and fanout_size < self.degree
+                ):
+                    if topic in self.pubsub.peer_topics:
+                        # Combine fanout peers with selected peers
+                        fanout_peers += self._get_in_topic_gossipsub_peers_from_minus(
+                            topic, self.degree - fanout_size, fanout_peers
+                        )
+                self.fanout[topic] = fanout_peers
+                gossipsub_peers = fanout_peers
+            send_to.extend(floodsub_peers + gossipsub_peers)
 
         # Excludes `msg_forwarder` and `origin`
-        yield from send_to.difference([msg_forwarder, origin])
+        yield from set(send_to).difference([msg_forwarder, origin])
 
     async def join(self, topic: str) -> None:
-        # Note: the comments here are the near-exact algorithm description from the spec
         """
         Join notifies the router that we want to receive and
         forward messages in a topic. It is invoked after the
         subscription announcement.
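# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch). The hunk above changes
# `_get_peers_to_send` so that publishing to a topic we have NOT joined tops
# the fanout set up to `degree` instead of rebuilding it from scratch. The
# standalone function below mirrors that selection with plain lists and a
# hypothetical `subscribers` mapping; all names here are invented for
# illustration only.
import random
from typing import Dict, List

def fanout_peers_for_publish(
    fanout: Dict[str, List[str]],
    subscribers: Dict[str, List[str]],
    topic: str,
    degree: int,
) -> List[str]:
    peers = list(fanout.get(topic, []))
    if len(peers) < degree and topic in subscribers:
        # Top up with randomly chosen subscribers we have not picked yet.
        candidates = [p for p in subscribers[topic] if p not in peers]
        peers += random.sample(candidates, min(degree - len(peers), len(candidates)))
    fanout[topic] = peers  # remember the choice for later publishes
    return peers

# Example: an empty fanout gets `degree` random subscribers on first publish.
assert len(fanout_peers_for_publish({}, {"t": ["a", "b", "c", "d"]}, "t", 3)) == 3
# ---------------------------------------------------------------------------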
@@ -269,9 +284,8 @@ class GossipSub(IPubsubRouter):
 
         # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
         for peer in fanout_peers:
-            if peer not in self.mesh[topic]:
-                self.mesh[topic].append(peer)
-                await self.emit_graft(topic, peer)
+            self.mesh[topic].append(peer)
+            await self.emit_graft(topic, peer)
 
         self.fanout.pop(topic, None)
 
@@ -292,7 +306,75 @@
             await self.emit_prune(topic, peer)
 
         # Forget mesh[topic]
-        del self.mesh[topic]
+        self.mesh.pop(topic, None)
+
+    async def _emit_control_msgs(
+        self,
+        peers_to_graft: Dict[ID, List[str]],
+        peers_to_prune: Dict[ID, List[str]],
+        peers_to_gossip: Dict[ID, Dict[str, List[str]]],
+    ) -> None:
+        graft_msgs: List[rpc_pb2.ControlGraft] = []
+        prune_msgs: List[rpc_pb2.ControlPrune] = []
+        ihave_msgs: List[rpc_pb2.ControlIHave] = []
+        # Starting with GRAFT messages
+        for peer, topics in peers_to_graft.items():
+            for topic in topics:
+                graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft(topicID=topic)
+                graft_msgs.append(graft_msg)
+
+            # If there are also PRUNE messages to send to this peer
+            if peer in peers_to_prune:
+                for topic in peers_to_prune[peer]:
+                    prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune(
+                        topicID=topic
+                    )
+                    prune_msgs.append(prune_msg)
+                del peers_to_prune[peer]
+
+            # If there are also IHAVE messages to send to this peer
+            if peer in peers_to_gossip:
+                for topic in peers_to_gossip[peer]:
+                    ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave(
+                        messageIDs=peers_to_gossip[peer][topic], topicID=topic
+                    )
+                    ihave_msgs.append(ihave_msg)
+                del peers_to_gossip[peer]
+
+            control_msg = self.pack_control_msgs(ihave_msgs, graft_msgs, prune_msgs)
+            await self.emit_control_message(control_msg, peer)
+
+        # Next with PRUNE messages
+        for peer, topics in peers_to_prune.items():
+            prune_msgs = []
+            for topic in topics:
+                prune_msg = rpc_pb2.ControlPrune(topicID=topic)
+                prune_msgs.append(prune_msg)
+
+            # If there are also IHAVE messages to send to this peer
+            if peer in peers_to_gossip:
+                ihave_msgs = []
+                for topic in peers_to_gossip[peer]:
+                    ihave_msg = rpc_pb2.ControlIHave(
+                        messageIDs=peers_to_gossip[peer][topic], topicID=topic
+                    )
+                    ihave_msgs.append(ihave_msg)
+                del peers_to_gossip[peer]
+
+            control_msg = self.pack_control_msgs(ihave_msgs, None, prune_msgs)
+            await self.emit_control_message(control_msg, peer)
+
+        # Finally IHAVE messages
+        for peer in peers_to_gossip:
+            ihave_msgs = []
+            for topic in peers_to_gossip[peer]:
+                ihave_msg = rpc_pb2.ControlIHave(
+                    messageIDs=peers_to_gossip[peer][topic], topicID=topic
+                )
+                ihave_msgs.append(ihave_msg)
+
+            control_msg = self.pack_control_msgs(ihave_msgs, None, None)
+            await self.emit_control_message(control_msg, peer)
 
     # Heartbeat
     async def heartbeat(self) -> None:
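# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch). The new `_emit_control_msgs`
# above sends at most one control RPC per peer by merging that peer's pending
# GRAFTs, PRUNEs and IHAVEs. The sketch below shows the same grouping with
# plain dicts instead of protobuf objects; the field names are simplified
# stand-ins, not the real rpc_pb2 schema.
from typing import Dict, List

def group_control_msgs(
    to_graft: Dict[str, List[str]],
    to_prune: Dict[str, List[str]],
    to_gossip: Dict[str, Dict[str, List[str]]],
) -> Dict[str, dict]:
    merged: Dict[str, dict] = {}
    for peer in set(to_graft) | set(to_prune) | set(to_gossip):
        merged[peer] = {
            "graft": to_graft.get(peer, []),
            "prune": to_prune.get(peer, []),
            "ihave": to_gossip.get(peer, {}),
        }
    return merged

# One entry per peer, even if a peer shows up in more than one category.
out = group_control_msgs({"p1": ["t1"]}, {"p1": ["t2"]}, {"p2": {"t1": ["m1"]}})
assert set(out) == {"p1", "p2"} and out["p1"]["prune"] == ["t2"]
# ---------------------------------------------------------------------------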
@@ -302,16 +384,29 @@
         """
         Call individual heartbeats.
         Note: the heartbeats are called with awaits because each heartbeat depends on the
         state changes in the preceding heartbeat
         """
+        # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410  # Noqa: E501
+        await asyncio.sleep(self.heartbeat_initial_delay)
         while True:
+            # Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
+            peers_to_graft, peers_to_prune = self.mesh_heartbeat()
+            # Maintain fanout
+            self.fanout_heartbeat()
+            # Get the peers to send IHAVE to
+            peers_to_gossip = self.gossip_heartbeat()
+            # Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and send it
+            await self._emit_control_msgs(
+                peers_to_graft, peers_to_prune, peers_to_gossip
+            )
-            await self.mesh_heartbeat()
-            await self.fanout_heartbeat()
-            await self.gossip_heartbeat()
+            self.mcache.shift()
 
             await asyncio.sleep(self.heartbeat_interval)
 
-    async def mesh_heartbeat(self) -> None:
-        # Note: the comments here are the exact pseudocode from the spec
+    def mesh_heartbeat(
+        self
+    ) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]:
+        peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list)
+        peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list)
         for topic in self.mesh:
             # Skip if no peers have subscribed to the topic
             if topic not in self.pubsub.peer_topics:
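# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch). The reworked heartbeat above
# is: sleep an initial delay once, then on every tick run the mesh, fanout
# and gossip bookkeeping, flush the batched control messages, and shift the
# message cache. The toy loop below shows only that ordering; the `router`
# methods are hypothetical stand-ins, not the real GossipSub API.
import asyncio

async def heartbeat_loop(router, initial_delay: float, interval: float) -> None:
    await asyncio.sleep(initial_delay)           # let first connections settle
    while True:
        graft, prune = router.mesh_heartbeat()   # pure bookkeeping, no I/O
        router.fanout_heartbeat()
        gossip = router.gossip_heartbeat()
        await router.emit_control_msgs(graft, prune, gossip)  # single flush
        router.shift_message_cache()             # age the message cache
        await asyncio.sleep(interval)
# ---------------------------------------------------------------------------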
@@ -324,15 +419,12 @@
                     topic, self.degree - num_mesh_peers_in_topic, self.mesh[topic]
                 )
 
-            fanout_peers_not_in_mesh: List[ID] = [
-                peer for peer in selected_peers if peer not in self.mesh[topic]
-            ]
-            for peer in fanout_peers_not_in_mesh:
+            for peer in selected_peers:
                 # Add peer to mesh[topic]
                 self.mesh[topic].append(peer)
 
                 # Emit GRAFT(topic) control message to peer
-                await self.emit_graft(topic, peer)
+                peers_to_graft[peer].append(topic)
 
             if num_mesh_peers_in_topic > self.degree_high:
                 # Select |mesh[topic]| - D peers from mesh[topic]
@@ -344,18 +436,31 @@
                 self.mesh[topic].remove(peer)
 
                 # Emit PRUNE(topic) control message to peer
-                await self.emit_prune(topic, peer)
+                peers_to_prune[peer].append(topic)
+        return peers_to_graft, peers_to_prune
 
-    async def fanout_heartbeat(self) -> None:
+    def fanout_heartbeat(self) -> None:
         # Note: the comments here are the exact pseudocode from the spec
         for topic in self.fanout:
-            # If time since last published > ttl
+            # Delete topic entry if it's not in `pubsub.peer_topics`
+            # or if its time-since-last-published > ttl
             # TODO: there's no way time_since_last_publish gets set anywhere yet
-            if self.time_since_last_publish[topic] > self.time_to_live:
+            if (
+                topic not in self.pubsub.peer_topics
+                or self.time_since_last_publish[topic] > self.time_to_live
+            ):
                 # Remove topic from fanout
                 del self.fanout[topic]
                 del self.time_since_last_publish[topic]
             else:
+                # Check if fanout peers are still in the topic and remove the ones that are not
+                # ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504  # noqa: E501
+                in_topic_fanout_peers = [
+                    peer
+                    for peer in self.fanout[topic]
+                    if peer in self.pubsub.peer_topics[topic]
+                ]
+                self.fanout[topic] = in_topic_fanout_peers
                 num_fanout_peers_in_topic = len(self.fanout[topic])
 
                 # If |fanout[topic]| < D
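# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch). The `fanout_heartbeat` change
# above (a) drops a fanout topic once nobody is subscribed any more and
# (b) keeps only fanout peers that are still subscribed before topping the
# list back up to `degree`. A minimal dict-based version of that filtering
# step, with invented names:
import random
from typing import Dict, List

def refresh_fanout_topic(
    fanout: Dict[str, List[str]],
    subscribers: Dict[str, List[str]],
    topic: str,
    degree: int,
) -> None:
    if topic not in subscribers:
        fanout.pop(topic, None)      # nobody left to publish to
        return
    still_there = [p for p in fanout.get(topic, []) if p in subscribers[topic]]
    missing = degree - len(still_there)
    if missing > 0:
        candidates = [p for p in subscribers[topic] if p not in still_there]
        still_there += random.sample(candidates, min(missing, len(candidates)))
    fanout[topic] = still_there

fanout = {"t": ["gone", "kept"]}
refresh_fanout_topic(fanout, {"t": ["kept", "x", "y"]}, "t", 2)
assert fanout["t"][0] == "kept" and len(fanout["t"]) == 2
# ---------------------------------------------------------------------------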
@@ -369,51 +474,37 @@
                     # Add the peers to fanout[topic]
                     self.fanout[topic].extend(selected_peers)
 
-    async def gossip_heartbeat(self) -> None:
+    def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]:
+        peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict)
         for topic in self.mesh:
             msg_ids = self.mcache.window(topic)
             if msg_ids:
-                # TODO: Make more efficient, possibly using a generator?
                 # Get all pubsub peers in a topic and only add them if they are gossipsub peers too
                 if topic in self.pubsub.peer_topics:
                     # Select D peers from peers.gossipsub[topic]
                     peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                        topic, self.degree, []
+                        topic, self.degree, self.mesh[topic]
                     )
+                    msg_id_strs = [str(msg_id) for msg_id in msg_ids]
                     for peer in peers_to_emit_ihave_to:
-                        # TODO: this line is a monster, can hopefully be simplified
-                        if (
-                            topic not in self.mesh or (peer not in self.mesh[topic])
-                        ) and (
-                            topic not in self.fanout or (peer not in self.fanout[topic])
-                        ):
-                            msg_id_strs = [str(msg_id) for msg_id in msg_ids]
-                            await self.emit_ihave(topic, msg_id_strs, peer)
+                        peers_to_gossip[peer][topic] = msg_id_strs
 
         # TODO: Refactor and Dedup. This section is the roughly the same as the above.
         # Do the same for fanout, for all topics not already hit in mesh
         for topic in self.fanout:
-            if topic not in self.mesh:
-                msg_ids = self.mcache.window(topic)
-                if msg_ids:
-                    # TODO: Make more efficient, possibly using a generator?
-                    # Get all pubsub peers in topic and only add if they are gossipsub peers also
-                    if topic in self.pubsub.peer_topics:
-                        # Select D peers from peers.gossipsub[topic]
-                        peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
-                            topic, self.degree, []
-                        )
-                        for peer in peers_to_emit_ihave_to:
-                            if (
-                                peer not in self.mesh[topic]
-                                and peer not in self.fanout[topic]
-                            ):
-
-                                msg_id_strs = [str(msg) for msg in msg_ids]
-                                await self.emit_ihave(topic, msg_id_strs, peer)
-
-        self.mcache.shift()
+            msg_ids = self.mcache.window(topic)
+            if msg_ids:
+                # Get all pubsub peers in topic and only add if they are gossipsub peers also
+                if topic in self.pubsub.peer_topics:
+                    # Select D peers from peers.gossipsub[topic]
+                    peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
+                        topic, self.degree, self.fanout[topic]
+                    )
+                    msg_id_strs = [str(msg) for msg in msg_ids]
+                    for peer in peers_to_emit_ihave_to:
+                        peers_to_gossip[peer][topic] = msg_id_strs
+        return peers_to_gossip
 
     @staticmethod
     def select_from_minus(
@@ -548,6 +639,21 @@
 
     # RPC emitters
 
+    def pack_control_msgs(
+        self,
+        ihave_msgs: List[rpc_pb2.ControlIHave],
+        graft_msgs: List[rpc_pb2.ControlGraft],
+        prune_msgs: List[rpc_pb2.ControlPrune],
+    ) -> rpc_pb2.ControlMessage:
+        control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
+        if ihave_msgs:
+            control_msg.ihave.extend(ihave_msgs)
+        if graft_msgs:
+            control_msg.graft.extend(graft_msgs)
+        if prune_msgs:
+            control_msg.prune.extend(prune_msgs)
+        return control_msg
+
     async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None:
         """Emit ihave message, sent to to_peer, for topic and msg_ids."""
 
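# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch). After this change,
# `gossip_heartbeat` only returns IHAVE targets: up to `degree` peers that are
# subscribed to the topic but are *not* already mesh (or fanout) peers, since
# those peers receive the full messages anyway. A standalone version of that
# selection, with invented names:
import random
from typing import List

def ihave_targets(
    subscribed: List[str], already_linked: List[str], degree: int
) -> List[str]:
    candidates = [p for p in subscribed if p not in already_linked]
    return random.sample(candidates, min(degree, len(candidates)))

# Mesh peers are excluded, so "m1"/"m2" never receive IHAVE here.
targets = ihave_targets(["m1", "m2", "p1", "p2", "p3"], ["m1", "m2"], 2)
assert set(targets) <= {"p1", "p2", "p3"} and len(targets) == 2
# ---------------------------------------------------------------------------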
diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py
index fab8024..25ab81e 100644
--- a/libp2p/pubsub/pubsub.py
+++ b/libp2p/pubsub/pubsub.py
@@ -78,7 +78,6 @@ class Pubsub:
 
     topic_validators: Dict[str, TopicValidator]
 
-    # TODO: Be sure it is increased atomically everytime.
     counter: int  # uint64
 
     _tasks: List["asyncio.Future[Any]"]
@@ -300,7 +299,6 @@ class Pubsub:
             logger.debug("Fail to add new peer %s: stream closed", peer_id)
             del self.peers[peer_id]
             return
-        # TODO: Check EOF of this stream.
         # TODO: Check if the peer in black list.
         try:
             self.router.add_peer(peer_id, stream.get_protocol())
@@ -318,6 +316,7 @@ class Pubsub:
 
         for topic in self.peer_topics:
             if peer_id in self.peer_topics[topic]:
+                # Delete the entry if no other peers left
                 self.peer_topics[topic].remove(peer_id)
 
         self.router.remove_peer(peer_id)
@@ -325,12 +324,9 @@ class Pubsub:
         logger.debug("removed dead peer %s", peer_id)
 
     async def handle_peer_queue(self) -> None:
-        """
-        Continuously read from peer queue and each time a new peer is found,
-        open a stream to the peer using a supported pubsub protocol
-        TODO: Handle failure for when the peer does not support any of the
-        pubsub protocols we support
-        """
+        """Continuously read from peer queue and each time a new peer is found,
+        open a stream to the peer using one of the pubsub protocols that we
+        support."""
         while True:
             peer_id: ID = await self.peer_queue.get()
             # Add Peer
@@ -364,6 +360,7 @@ class Pubsub:
         else:
             if sub_message.topicid in self.peer_topics:
                 if origin_id in self.peer_topics[sub_message.topicid]:
+                    # Delete the entry if no other peers left
                     self.peer_topics[sub_message.topicid].remove(origin_id)
 
     # FIXME(mhchia): Change the function name?
diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py
index 34dade4..8c22d15 100644
--- a/libp2p/tools/constants.py
+++ b/libp2p/tools/constants.py
@@ -24,6 +24,7 @@ class GossipsubParams(NamedTuple):
     time_to_live: int = 30
     gossip_window: int = 3
     gossip_history: int = 5
+    heartbeat_initial_delay: float = 0.1
     heartbeat_interval: float = 0.5
 
 
diff --git a/libp2p/tools/factories.py b/libp2p/tools/factories.py
index b189cfa..1b98eaa 100644
--- a/libp2p/tools/factories.py
+++ b/libp2p/tools/factories.py
@@ -142,6 +142,7 @@ class GossipsubFactory(factory.Factory):
     time_to_live = GOSSIPSUB_PARAMS.time_to_live
     gossip_window = GOSSIPSUB_PARAMS.gossip_window
     gossip_history = GOSSIPSUB_PARAMS.gossip_history
+    heartbeat_initial_delay = GOSSIPSUB_PARAMS.heartbeat_initial_delay
    heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
 
 
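# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch). With the new field,
# test fixtures can pick a short first heartbeat, or push it far into the
# future so heartbeat helpers can be called by hand (as the new tests below
# do with a 100 s delay). The exact fixture wiring varies per test suite;
# this only shows the parameter itself.
from libp2p.tools.constants import GossipsubParams

# Fire the first heartbeat almost immediately, then every 0.5 s.
fast = GossipsubParams(heartbeat_initial_delay=0.1, heartbeat_interval=0.5)
# Effectively disable the background heartbeat so a test can invoke
# mesh_heartbeat / gossip_heartbeat directly and deterministically.
manual = GossipsubParams(heartbeat_initial_delay=100)
# ---------------------------------------------------------------------------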
diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py
index 2121f8f..19ec07c 100644
--- a/tests/pubsub/test_gossipsub.py
+++ b/tests/pubsub/test_gossipsub.py
@@ -3,7 +3,8 @@ import random
 
 import pytest
 
-from libp2p.tools.constants import GossipsubParams
+from libp2p.peer.id import ID
+from libp2p.tools.constants import GOSSIPSUB_PARAMS, GossipsubParams
 from libp2p.tools.pubsub.utils import dense_connect, one_to_all_connect
 from libp2p.tools.utils import connect
 
@@ -147,8 +148,8 @@
 
     await connect(hosts[index_alice], hosts[index_bob])
 
-    # Wait 3 seconds for heartbeat to allow mesh to connect
-    await asyncio.sleep(3)
+    # Wait for heartbeat to allow mesh to connect
+    await asyncio.sleep(1)
 
     # Check that they are each other's mesh peer
     assert id_alice in gossipsubs[index_bob].mesh[topic]
@@ -157,15 +158,16 @@
     # alice emit prune message to bob, alice should be removed
     # from bob's mesh peer
     await gossipsubs[index_alice].emit_prune(topic, id_bob)
+    # `emit_prune` does not remove bob from alice's mesh peers
+    assert id_bob in gossipsubs[index_alice].mesh[topic]
 
-    # FIXME: This test currently works because the heartbeat interval
-    # is increased to 3 seconds, so alice won't get add back into
-    # bob's mesh peer during heartbeat.
-    await asyncio.sleep(1)
+    # NOTE: We increase `heartbeat_interval` to 3 seconds so that bob will not
+    # add alice back to his mesh after heartbeat.
+    # Wait for bob to `handle_prune`
+    await asyncio.sleep(0.1)
 
     # Check that alice is no longer bob's mesh peer
     assert id_alice not in gossipsubs[index_bob].mesh[topic]
-    assert id_bob in gossipsubs[index_alice].mesh[topic]
 
 
 @pytest.mark.parametrize("num_hosts", (10,))
@@ -366,3 +368,125 @@ async def test_gossip_propagation(hosts, pubsubs_gsub):
     # should be able to read message
     msg = await queue_1.get()
     assert msg.data == msg_content
+
+
+@pytest.mark.parametrize(
+    "num_hosts, gossipsub_params", ((1, GossipsubParams(heartbeat_initial_delay=100)),)
+)
+@pytest.mark.parametrize("initial_mesh_peer_count", (7, 10, 13))
+@pytest.mark.asyncio
+async def test_mesh_heartbeat(
+    num_hosts, initial_mesh_peer_count, pubsubs_gsub, hosts, monkeypatch
+):
+    # It's difficult to set up the initial peer subscription condition.
+    # Ideally I would like to have an initial mesh peer count that's below ``GossipSubDegree``
+    # so I can test if `mesh_heartbeat` returns correct peers to GRAFT.
+    # The problem is that I cannot set it up so that we have peers subscribed to the topic
+    # without being part of our mesh peers (as these peers are the peers to GRAFT).
+    # So I monkeypatch the peer subscriptions and our mesh peers.
+    total_peer_count = 14
+    topic = "TEST_MESH_HEARTBEAT"
+
+    fake_peer_ids = [
+        ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count)
+    ]
+    monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
+
+    peer_topics = {topic: fake_peer_ids}
+    # Monkeypatch the peer subscriptions
+    monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
+
+    mesh_peer_indices = random.sample(range(total_peer_count), initial_mesh_peer_count)
+    mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
+    router_mesh = {topic: list(mesh_peers)}
+    # Monkeypatch our mesh peers
+    monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
+
+    peers_to_graft, peers_to_prune = pubsubs_gsub[0].router.mesh_heartbeat()
+    if initial_mesh_peer_count > GOSSIPSUB_PARAMS.degree:
+        # If the number of initial mesh peers is more than `GossipSubDegree`, we should PRUNE mesh peers
+        assert len(peers_to_graft) == 0
+        assert len(peers_to_prune) == initial_mesh_peer_count - GOSSIPSUB_PARAMS.degree
+        for peer in peers_to_prune:
+            assert peer in mesh_peers
+    elif initial_mesh_peer_count < GOSSIPSUB_PARAMS.degree:
+        # If the number of initial mesh peers is less than `GossipSubDegree`, we should GRAFT more peers
+        assert len(peers_to_prune) == 0
+        assert len(peers_to_graft) == GOSSIPSUB_PARAMS.degree - initial_mesh_peer_count
+        for peer in peers_to_graft:
+            assert peer not in mesh_peers
+    else:
+        assert len(peers_to_prune) == 0 and len(peers_to_graft) == 0
+
+
+@pytest.mark.parametrize(
+    "num_hosts, gossipsub_params", ((1, GossipsubParams(heartbeat_initial_delay=100)),)
+)
+@pytest.mark.parametrize("initial_peer_count", (1, 4, 7))
+@pytest.mark.asyncio
+async def test_gossip_heartbeat(
+    num_hosts, initial_peer_count, pubsubs_gsub, hosts, monkeypatch
+):
+    # The problem is that I cannot set it up so that we have peers subscribed to the topic
+    # without being part of our mesh peers (as these peers are the peers to GRAFT).
+    # So I monkeypatch the peer subscriptions and our mesh peers.
+    total_peer_count = 28
+    topic_mesh = "TEST_GOSSIP_HEARTBEAT_1"
+    topic_fanout = "TEST_GOSSIP_HEARTBEAT_2"
+
+    fake_peer_ids = [
+        ID((i).to_bytes(2, byteorder="big")) for i in range(total_peer_count)
+    ]
+    monkeypatch.setattr(pubsubs_gsub[0].router, "peers_gossipsub", fake_peer_ids)
+
+    topic_mesh_peer_count = 14
+    # Split into mesh peers and fanout peers
+    peer_topics = {
+        topic_mesh: fake_peer_ids[:topic_mesh_peer_count],
+        topic_fanout: fake_peer_ids[topic_mesh_peer_count:],
+    }
+    # Monkeypatch the peer subscriptions
+    monkeypatch.setattr(pubsubs_gsub[0], "peer_topics", peer_topics)
+
+    mesh_peer_indices = random.sample(range(topic_mesh_peer_count), initial_peer_count)
+    mesh_peers = [fake_peer_ids[i] for i in mesh_peer_indices]
+    router_mesh = {topic_mesh: list(mesh_peers)}
+    # Monkeypatch our mesh peers
+    monkeypatch.setattr(pubsubs_gsub[0].router, "mesh", router_mesh)
+    fanout_peer_indices = random.sample(
+        range(topic_mesh_peer_count, total_peer_count), initial_peer_count
+    )
+    fanout_peers = [fake_peer_ids[i] for i in fanout_peer_indices]
+    router_fanout = {topic_fanout: list(fanout_peers)}
+    # Monkeypatch our fanout peers
+    monkeypatch.setattr(pubsubs_gsub[0].router, "fanout", router_fanout)
+
+    def window(topic):
+        if topic == topic_mesh:
+            return [topic_mesh]
+        elif topic == topic_fanout:
+            return [topic_fanout]
+        else:
+            return []
+
+    # Monkeypatch the memory cache messages
+    monkeypatch.setattr(pubsubs_gsub[0].router.mcache, "window", window)
+
+    peers_to_gossip = pubsubs_gsub[0].router.gossip_heartbeat()
+    # If our mesh peer count is less than `GossipSubDegree`, we should gossip to up to
+    # `GossipSubDegree` peers (excluding mesh peers).
+    if topic_mesh_peer_count - initial_peer_count < GOSSIPSUB_PARAMS.degree:
+        # The same goes for fanout so it's two times the number of peers to gossip.
+        assert len(peers_to_gossip) == 2 * (topic_mesh_peer_count - initial_peer_count)
+    elif topic_mesh_peer_count - initial_peer_count >= GOSSIPSUB_PARAMS.degree:
+        assert len(peers_to_gossip) == 2 * (GOSSIPSUB_PARAMS.degree)
+
+    for peer in peers_to_gossip:
+        if peer in peer_topics[topic_mesh]:
+            # Check that the peer to gossip to is not in our mesh peers
+            assert peer not in mesh_peers
+            assert topic_mesh in peers_to_gossip[peer]
+        elif peer in peer_topics[topic_fanout]:
+            # Check that the peer to gossip to is not in our fanout peers
+            assert peer not in fanout_peers
+            assert topic_fanout in peers_to_gossip[peer]