From e52bfe3a51a46057cb5735eb2f845b0994ac9e6b Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 18 Jul 2019 22:37:44 +0800 Subject: [PATCH 1/5] Fix: Respond GRAFT with PRUNE if not subscribed to the topic --- libp2p/pubsub/gossipsub.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 01576ae..f1d057c 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -462,7 +462,8 @@ class GossipSub(IPubsubRouter): if topic in self.mesh: self.mesh[topic].append(from_id_str) else: - self.mesh[topic] = [from_id_str] + # Respond with PRUNE if not subscribed to the topic + await self.emit_prune(topic, sender_peer_id) async def handle_prune(self, prune_msg, sender_peer_id): topic = prune_msg.topicID From 42093e40ec8aa5ee9c236bc92c3005be20d2063f Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 21 Jul 2019 21:22:20 +0800 Subject: [PATCH 2/5] Add `handle_graft` test --- tests/pubsub/test_gossipsub.py | 62 ++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index a8f58d3..9784b05 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -80,8 +80,8 @@ async def test_leave(): # Create pubsub, gossipsub instances _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) gossipsub = gossipsubs[0] topic = "test_leave" @@ -98,6 +98,64 @@ async def test_leave(): await cleanup() +@pytest.mark.asyncio +async def test_handle_graft(event_loop, monkeypatch): + num_hosts = 2 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + index_alice = 0 + id_alice = str(libp2p_hosts[index_alice].get_id()) + index_bob = 1 + id_bob = str(libp2p_hosts[index_bob].get_id()) + await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) + + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + topic = "test_handle_graft" + # Only lice subscribe to the topic + await gossipsubs[index_alice].join(topic) + + # Monkey patch bob's `emit_prune` function so we can + # check if it is called in `handle_graft` + event_emit_prune = asyncio.Event() + async def emit_prune(topic, sender_peer_id): + event_emit_prune.set() + + monkeypatch.setattr(gossipsubs[index_bob], 'emit_prune', emit_prune) + + # Check that alice is bob's peer but not his mesh peer + assert id_alice in gossipsubs[index_bob].peers_gossipsub + assert topic not in gossipsubs[index_bob].mesh + await gossipsubs[index_alice].emit_graft(topic, id_bob) + + # Check that `emit_prune` is called + await asyncio.wait_for( + event_emit_prune.wait(), + timeout=1, + loop=event_loop, + ) + assert event_emit_prune.is_set() + + # Check that bob is alice's peer but not her mesh peer + assert topic in gossipsubs[index_alice].mesh + assert id_bob not in gossipsubs[index_alice].mesh[topic] + assert id_bob in gossipsubs[index_alice].peers_gossipsub + await gossipsubs[index_bob].emit_graft(topic, id_alice) + + await asyncio.sleep(1) + + # Check that bob is now alice's mesh peer + assert id_bob in gossipsubs[index_alice].mesh[topic] + + await cleanup() + + @pytest.mark.asyncio async def test_dense(): # Create libp2p hosts From 99252e49f88c113a5177763a4eb67513cc0c8e97 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 21 Jul 2019 22:28:17 +0800 Subject: [PATCH 3/5] Prevent re-adding peers to mesh --- libp2p/pubsub/gossipsub.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index f1d057c..31ab606 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -204,8 +204,9 @@ class GossipSub(IPubsubRouter): # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. for peer in fanout_peers: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) + if peer not in self.mesh[topic]: + self.mesh[topic].append(peer) + await self.emit_graft(topic, peer) if topic_in_fanout: del self.fanout[topic] @@ -281,7 +282,12 @@ class GossipSub(IPubsubRouter): self.mesh[topic] ) - for peer in selected_peers: + fanout_peers_not_in_mesh = [ + peer + for peer in selected_peers + if peer not in self.mesh[topic] + ] + for peer in fanout_peers_not_in_mesh: # Add peer to mesh[topic] self.mesh[topic].append(peer) @@ -460,7 +466,8 @@ class GossipSub(IPubsubRouter): # Add peer to mesh for topic if topic in self.mesh: - self.mesh[topic].append(from_id_str) + if from_id_str not in self.mesh[topic]: + self.mesh[topic].append(from_id_str) else: # Respond with PRUNE if not subscribed to the topic await self.emit_prune(topic, sender_peer_id) From 99eabe49eb5e5123ee7e29f576be7918e79d81b4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 21 Jul 2019 22:28:43 +0800 Subject: [PATCH 4/5] Add `handle_prune` test --- tests/pubsub/test_gossipsub.py | 46 ++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 9784b05..b827f9c 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -132,6 +132,7 @@ async def test_handle_graft(event_loop, monkeypatch): # Check that alice is bob's peer but not his mesh peer assert id_alice in gossipsubs[index_bob].peers_gossipsub assert topic not in gossipsubs[index_bob].mesh + await gossipsubs[index_alice].emit_graft(topic, id_bob) # Check that `emit_prune` is called @@ -146,6 +147,7 @@ async def test_handle_graft(event_loop, monkeypatch): assert topic in gossipsubs[index_alice].mesh assert id_bob not in gossipsubs[index_alice].mesh[topic] assert id_bob in gossipsubs[index_alice].peers_gossipsub + await gossipsubs[index_bob].emit_graft(topic, id_alice) await asyncio.sleep(1) @@ -156,6 +158,50 @@ async def test_handle_graft(event_loop, monkeypatch): await cleanup() +@pytest.mark.asyncio +async def test_handle_prune(): + num_hosts = 2 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 2) + + index_alice = 0 + id_alice = str(libp2p_hosts[index_alice].get_id()) + index_bob = 1 + id_bob = str(libp2p_hosts[index_bob].get_id()) + + topic = "test_handle_prune" + await gossipsubs[index_alice].join(topic) + await gossipsubs[index_bob].join(topic) + + await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) + + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + # Check that they are each other's mesh peer + assert id_alice in gossipsubs[index_bob].mesh[topic] + assert id_bob in gossipsubs[index_alice].mesh[topic] + + # alice emit prune message to bob, alice should be removed + # from bob's mesh peer + await gossipsubs[index_alice].emit_prune(topic, id_bob) + + # FIXME: This test currently works because the heartbeat interval + # is increased to 2 seconds, so alice won't get add back into + # bob's mesh peer during heartbeat. + await asyncio.sleep(1) + + # Check that alice is no longer bob's mesh peer + assert id_alice not in gossipsubs[index_bob].mesh[topic] + assert id_bob in gossipsubs[index_alice].mesh[topic] + + await cleanup() + + @pytest.mark.asyncio async def test_dense(): # Create libp2p hosts From b0b4ddd0cac92d7812b5ba97063866a9ebb3fc95 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Wed, 24 Jul 2019 11:35:14 +0800 Subject: [PATCH 5/5] Fix `test_handle_prune` --- tests/pubsub/test_gossipsub.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index b827f9c..bb47135 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -164,9 +164,9 @@ async def test_handle_prune(): libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 2) + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 3) index_alice = 0 id_alice = str(libp2p_hosts[index_alice].get_id()) @@ -174,13 +174,13 @@ async def test_handle_prune(): id_bob = str(libp2p_hosts[index_bob].get_id()) topic = "test_handle_prune" - await gossipsubs[index_alice].join(topic) - await gossipsubs[index_bob].join(topic) + for pubsub in pubsubs: + await pubsub.subscribe(topic) await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) - # Wait 2 seconds for heartbeat to allow mesh to connect - await asyncio.sleep(2) + # Wait 3 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(3) # Check that they are each other's mesh peer assert id_alice in gossipsubs[index_bob].mesh[topic] @@ -191,7 +191,7 @@ async def test_handle_prune(): await gossipsubs[index_alice].emit_prune(topic, id_bob) # FIXME: This test currently works because the heartbeat interval - # is increased to 2 seconds, so alice won't get add back into + # is increased to 3 seconds, so alice won't get add back into # bob's mesh peer during heartbeat. await asyncio.sleep(1)