From 7f20ab781d8c5d46a5d6e450cb2b8845d9df4124 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 3 Sep 2019 23:37:34 +0800 Subject: [PATCH] Fix gosssipsub tests --- libp2p/pubsub/gossipsub.py | 26 ++++++++++---------------- libp2p/pubsub/pubsub.py | 4 ---- tests/pubsub/test_gossipsub.py | 4 +++- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 64a56d1..f2b4521 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -4,6 +4,7 @@ import random from typing import Any, Dict, Iterable, List, Sequence, Set from libp2p.peer.id import ID +from libp2p.pubsub import floodsub from libp2p.typing import TProtocol from libp2p.utils import encode_varint_prefixed @@ -107,16 +108,19 @@ class GossipSub(IPubsubRouter): :param peer_id: id of peer to add :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub """ - - # Add peer to the correct peer list - peer_type = GossipSub.get_peer_type(protocol_id) - self.peers_to_protocol[peer_id] = protocol_id - if peer_type == "gossip": + if protocol_id == PROTOCOL_ID: self.peers_gossipsub.append(peer_id) - elif peer_type == "flood": + elif protocol_id == floodsub.PROTOCOL_ID: self.peers_floodsub.append(peer_id) + else: + # We should never enter here. Becuase the `protocol_id` is registered by your pubsub + # instance in multistream-select, but it is not the protocol that gossipsub supports, + # what we check above. In this case, probably we registered gossipsub to a wrong + # `protocol_id` in multistream-select, or wrong versions. + # TODO: Better handling + raise Exception(f"protocol is not supported: protocol_id={protocol_id}") def remove_peer(self, peer_id: ID) -> None: """ @@ -267,16 +271,6 @@ class GossipSub(IPubsubRouter): # Forget mesh[topic] self.mesh.pop(topic, None) - # Interface Helper Functions - @staticmethod - def get_peer_type(protocol_id: str) -> str: - # TODO: Do this in a better, more efficient way - if "gossipsub" in protocol_id: - return "gossip" - if "floodsub" in protocol_id: - return "flood" - return "unknown" - # Heartbeat async def heartbeat(self) -> None: """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index c26bc54..46d2ee9 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -227,12 +227,8 @@ class Pubsub: await self.continuously_read_stream(stream) async def _handle_new_peer(self, peer_id: ID) -> None: - # Open a stream to peer on existing connection - # (we know connection exists since that's the only way - # an element gets added to peer_queue) stream: INetStream = await self.host.new_stream(peer_id, self.protocols) - # Map peer to stream self.peers[peer_id] = stream # Send hello packet diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index a0a8a28..7a0efc2 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -140,7 +140,9 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): "num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),) ) @pytest.mark.asyncio -async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs): +async def test_handle_prune(pubsubs_gsub, hosts): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) + index_alice = 0 id_alice = hosts[index_alice].get_id() index_bob = 1