Fix gosssipsub tests

This commit is contained in:
mhchia 2019-09-03 23:37:34 +08:00
parent d7bce941d8
commit 7f20ab781d
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
3 changed files with 13 additions and 21 deletions

View File

@ -4,6 +4,7 @@ import random
from typing import Any, Dict, Iterable, List, Sequence, Set from typing import Any, Dict, Iterable, List, Sequence, Set
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.pubsub import floodsub
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed from libp2p.utils import encode_varint_prefixed
@ -107,16 +108,19 @@ class GossipSub(IPubsubRouter):
:param peer_id: id of peer to add :param peer_id: id of peer to add
:param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub :param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub
""" """
# Add peer to the correct peer list
peer_type = GossipSub.get_peer_type(protocol_id)
self.peers_to_protocol[peer_id] = protocol_id self.peers_to_protocol[peer_id] = protocol_id
if peer_type == "gossip": if protocol_id == PROTOCOL_ID:
self.peers_gossipsub.append(peer_id) self.peers_gossipsub.append(peer_id)
elif peer_type == "flood": elif protocol_id == floodsub.PROTOCOL_ID:
self.peers_floodsub.append(peer_id) self.peers_floodsub.append(peer_id)
else:
# We should never enter here. Becuase the `protocol_id` is registered by your pubsub
# instance in multistream-select, but it is not the protocol that gossipsub supports,
# what we check above. In this case, probably we registered gossipsub to a wrong
# `protocol_id` in multistream-select, or wrong versions.
# TODO: Better handling
raise Exception(f"protocol is not supported: protocol_id={protocol_id}")
def remove_peer(self, peer_id: ID) -> None: def remove_peer(self, peer_id: ID) -> None:
""" """
@ -267,16 +271,6 @@ class GossipSub(IPubsubRouter):
# Forget mesh[topic] # Forget mesh[topic]
self.mesh.pop(topic, None) self.mesh.pop(topic, None)
# Interface Helper Functions
@staticmethod
def get_peer_type(protocol_id: str) -> str:
# TODO: Do this in a better, more efficient way
if "gossipsub" in protocol_id:
return "gossip"
if "floodsub" in protocol_id:
return "flood"
return "unknown"
# Heartbeat # Heartbeat
async def heartbeat(self) -> None: async def heartbeat(self) -> None:
""" """

View File

@ -227,12 +227,8 @@ class Pubsub:
await self.continuously_read_stream(stream) await self.continuously_read_stream(stream)
async def _handle_new_peer(self, peer_id: ID) -> None: async def _handle_new_peer(self, peer_id: ID) -> None:
# Open a stream to peer on existing connection
# (we know connection exists since that's the only way
# an element gets added to peer_queue)
stream: INetStream = await self.host.new_stream(peer_id, self.protocols) stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
# Map peer to stream
self.peers[peer_id] = stream self.peers[peer_id] = stream
# Send hello packet # Send hello packet

View File

@ -140,7 +140,9 @@ async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch):
"num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),) "num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),)
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs): async def test_handle_prune(pubsubs_gsub, hosts):
gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub)
index_alice = 0 index_alice = 0
id_alice = hosts[index_alice].get_id() id_alice = hosts[index_alice].get_id()
index_bob = 1 index_bob = 1