Merge pull request #184 from NIC619/fix_handle_graft
Fix `gossipsub.handle_graft`
This commit is contained in:
commit
4819959e5a
|
@ -204,8 +204,9 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
# Add fanout peers to mesh and notifies them with a GRAFT(topic) control message.
|
||||
for peer in fanout_peers:
|
||||
self.mesh[topic].append(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
if peer not in self.mesh[topic]:
|
||||
self.mesh[topic].append(peer)
|
||||
await self.emit_graft(topic, peer)
|
||||
|
||||
if topic_in_fanout:
|
||||
del self.fanout[topic]
|
||||
|
@ -281,7 +282,12 @@ class GossipSub(IPubsubRouter):
|
|||
self.mesh[topic]
|
||||
)
|
||||
|
||||
for peer in selected_peers:
|
||||
fanout_peers_not_in_mesh = [
|
||||
peer
|
||||
for peer in selected_peers
|
||||
if peer not in self.mesh[topic]
|
||||
]
|
||||
for peer in fanout_peers_not_in_mesh:
|
||||
# Add peer to mesh[topic]
|
||||
self.mesh[topic].append(peer)
|
||||
|
||||
|
@ -460,9 +466,11 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
# Add peer to mesh for topic
|
||||
if topic in self.mesh:
|
||||
self.mesh[topic].append(from_id_str)
|
||||
if from_id_str not in self.mesh[topic]:
|
||||
self.mesh[topic].append(from_id_str)
|
||||
else:
|
||||
self.mesh[topic] = [from_id_str]
|
||||
# Respond with PRUNE if not subscribed to the topic
|
||||
await self.emit_prune(topic, sender_peer_id)
|
||||
|
||||
async def handle_prune(self, prune_msg, sender_peer_id):
|
||||
topic = prune_msg.topicID
|
||||
|
|
|
@ -80,8 +80,8 @@ async def test_leave():
|
|||
|
||||
# Create pubsub, gossipsub instances
|
||||
_, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
|
||||
SUPPORTED_PROTOCOLS, \
|
||||
10, 9, 11, 30, 3, 5, 0.5)
|
||||
SUPPORTED_PROTOCOLS, \
|
||||
10, 9, 11, 30, 3, 5, 0.5)
|
||||
|
||||
gossipsub = gossipsubs[0]
|
||||
topic = "test_leave"
|
||||
|
@ -98,6 +98,110 @@ async def test_leave():
|
|||
await cleanup()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_graft(event_loop, monkeypatch):
|
||||
num_hosts = 2
|
||||
libp2p_hosts = await create_libp2p_hosts(num_hosts)
|
||||
|
||||
# Create pubsub, gossipsub instances
|
||||
_, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
|
||||
SUPPORTED_PROTOCOLS, \
|
||||
10, 9, 11, 30, 3, 5, 0.5)
|
||||
|
||||
index_alice = 0
|
||||
id_alice = str(libp2p_hosts[index_alice].get_id())
|
||||
index_bob = 1
|
||||
id_bob = str(libp2p_hosts[index_bob].get_id())
|
||||
await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
|
||||
|
||||
# Wait 2 seconds for heartbeat to allow mesh to connect
|
||||
await asyncio.sleep(2)
|
||||
|
||||
topic = "test_handle_graft"
|
||||
# Only lice subscribe to the topic
|
||||
await gossipsubs[index_alice].join(topic)
|
||||
|
||||
# Monkey patch bob's `emit_prune` function so we can
|
||||
# check if it is called in `handle_graft`
|
||||
event_emit_prune = asyncio.Event()
|
||||
async def emit_prune(topic, sender_peer_id):
|
||||
event_emit_prune.set()
|
||||
|
||||
monkeypatch.setattr(gossipsubs[index_bob], 'emit_prune', emit_prune)
|
||||
|
||||
# Check that alice is bob's peer but not his mesh peer
|
||||
assert id_alice in gossipsubs[index_bob].peers_gossipsub
|
||||
assert topic not in gossipsubs[index_bob].mesh
|
||||
|
||||
await gossipsubs[index_alice].emit_graft(topic, id_bob)
|
||||
|
||||
# Check that `emit_prune` is called
|
||||
await asyncio.wait_for(
|
||||
event_emit_prune.wait(),
|
||||
timeout=1,
|
||||
loop=event_loop,
|
||||
)
|
||||
assert event_emit_prune.is_set()
|
||||
|
||||
# Check that bob is alice's peer but not her mesh peer
|
||||
assert topic in gossipsubs[index_alice].mesh
|
||||
assert id_bob not in gossipsubs[index_alice].mesh[topic]
|
||||
assert id_bob in gossipsubs[index_alice].peers_gossipsub
|
||||
|
||||
await gossipsubs[index_bob].emit_graft(topic, id_alice)
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Check that bob is now alice's mesh peer
|
||||
assert id_bob in gossipsubs[index_alice].mesh[topic]
|
||||
|
||||
await cleanup()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_prune():
|
||||
num_hosts = 2
|
||||
libp2p_hosts = await create_libp2p_hosts(num_hosts)
|
||||
|
||||
# Create pubsub, gossipsub instances
|
||||
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
|
||||
SUPPORTED_PROTOCOLS, \
|
||||
10, 9, 11, 30, 3, 5, 3)
|
||||
|
||||
index_alice = 0
|
||||
id_alice = str(libp2p_hosts[index_alice].get_id())
|
||||
index_bob = 1
|
||||
id_bob = str(libp2p_hosts[index_bob].get_id())
|
||||
|
||||
topic = "test_handle_prune"
|
||||
for pubsub in pubsubs:
|
||||
await pubsub.subscribe(topic)
|
||||
|
||||
await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
|
||||
|
||||
# Wait 3 seconds for heartbeat to allow mesh to connect
|
||||
await asyncio.sleep(3)
|
||||
|
||||
# Check that they are each other's mesh peer
|
||||
assert id_alice in gossipsubs[index_bob].mesh[topic]
|
||||
assert id_bob in gossipsubs[index_alice].mesh[topic]
|
||||
|
||||
# alice emit prune message to bob, alice should be removed
|
||||
# from bob's mesh peer
|
||||
await gossipsubs[index_alice].emit_prune(topic, id_bob)
|
||||
|
||||
# FIXME: This test currently works because the heartbeat interval
|
||||
# is increased to 3 seconds, so alice won't get add back into
|
||||
# bob's mesh peer during heartbeat.
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Check that alice is no longer bob's mesh peer
|
||||
assert id_alice not in gossipsubs[index_bob].mesh[topic]
|
||||
assert id_bob in gossipsubs[index_alice].mesh[topic]
|
||||
|
||||
await cleanup()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dense():
|
||||
# Create libp2p hosts
|
||||
|
|
Loading…
Reference in New Issue
Block a user