From b5c3420c16c31edf9c47a72a649480df3f75f69b Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 15 Jul 2019 17:13:46 +0800 Subject: [PATCH 01/15] Refactor `gossipsub.join` --- libp2p/pubsub/gossipsub.py | 54 ++++++++++++++------------------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index b2e6f38..360143a 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -184,43 +184,29 @@ class GossipSub(IPubsubRouter): # Create mesh[topic] if it does not yet exist self.mesh[topic] = [] - if topic in self.fanout and len(self.fanout[topic]) == self.degree: - # If router already has D peers from the fanout peers of a topic - # TODO: Do we remove all peers from fanout[topic]? + topic_in_fanout = topic in self.fanout + fanout_peers = self.fanout[topic] if topic_in_fanout else [] + fanout_size = len(fanout_peers) + if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): + # There are less than D peers (let this number be x) + # in the fanout for a topic (or the topic is not in the fanout). + # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. + gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] + if peer in self.peers_gossipsub] + selected_peers = \ + GossipSub.select_from_minus(self.degree - fanout_size, + gossipsub_peers_in_topic, + fanout_peers) - # Add them to mesh[topic], and notifies them with a - # GRAFT(topic) control message. - for peer in self.fanout[topic]: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) - else: - # Otherwise, if there are less than D peers - # (let this number be x) in the fanout for a topic (or the topic is not in the fanout), - fanout_size = 0 - if topic in self.fanout: - fanout_size = len(self.fanout[topic]) - # then it still adds them as above (if there are any) - for peer in self.fanout[topic]: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) + # Combine fanout peers with selected peers + fanout_peers += selected_peers - if topic in self.peers_gossipsub: - # TODO: Should we have self.fanout[topic] here or [] (as the minus variable)? - # Selects the remaining number of peers (D-x) from peers.gossipsub[topic] - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - selected_peers = \ - GossipSub.select_from_minus(self.degree - fanout_size, - gossipsub_peers_in_topic, - self.fanout[topic] if topic in self.fanout else []) + # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. + for peer in fanout_peers: + self.mesh[topic].append(peer) + await self.emit_graft(topic, peer) - # And likewise adds them to mesh[topic] and notifies them with a - # GRAFT(topic) control message. - for peer in selected_peers: - self.mesh[topic].append(peer) - await self.emit_graft(topic, peer) - - # TODO: Do we remove all peers from fanout[topic]? + # TODO: Do we remove all peers from fanout[topic]? async def leave(self, topic): # Note: the comments here are the near-exact algorithm description from the spec From 404dc67e831ea2d2bb4ae3a9b23e35f9691bd501 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 18 Jul 2019 19:39:01 +0800 Subject: [PATCH 02/15] Fix: prevent selecting peers from topic not in peer topics --- libp2p/pubsub/gossipsub.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 360143a..39bd8e3 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -191,15 +191,16 @@ class GossipSub(IPubsubRouter): # There are less than D peers (let this number be x) # in the fanout for a topic (or the topic is not in the fanout). # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - selected_peers = \ - GossipSub.select_from_minus(self.degree - fanout_size, - gossipsub_peers_in_topic, - fanout_peers) + if topic in self.pubsub.peer_topics: + gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] + if peer in self.peers_gossipsub] + selected_peers = \ + GossipSub.select_from_minus(self.degree - fanout_size, + gossipsub_peers_in_topic, + fanout_peers) - # Combine fanout peers with selected peers - fanout_peers += selected_peers + # Combine fanout peers with selected peers + fanout_peers += selected_peers # Add fanout peers to mesh and notifies them with a GRAFT(topic) control message. for peer in fanout_peers: From a26c7783d64271d231dbb83cef9f4a9c44623a7c Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 18 Jul 2019 19:39:57 +0800 Subject: [PATCH 03/15] Add `one_to_all_connect` --- tests/pubsub/utils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index 6691661..e406536 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -122,3 +122,8 @@ async def connect_some(hosts, degree): # await connect(host, neighbor) # j += 1 + +async def one_to_all_connect(hosts, central_host_index): + for i, host in enumerate(hosts): + if i != central_host_index: + await connect(hosts[central_host_index], host) From fd1f318b0cf7fed774cfd5f593a7da62093a8709 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 18 Jul 2019 22:40:05 +0800 Subject: [PATCH 04/15] Fix: in mesh heartbeat, select from gossipsub peers subscribed to the topic --- libp2p/pubsub/gossipsub.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 39bd8e3..6cf4380 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -267,10 +267,12 @@ class GossipSub(IPubsubRouter): num_mesh_peers_in_topic = len(self.mesh[topic]) if num_mesh_peers_in_topic < self.degree_low: + gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] + if peer in self.peers_gossipsub] # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] selected_peers = GossipSub.select_from_minus(self.degree - num_mesh_peers_in_topic, - self.peers_gossipsub, self.mesh[topic]) + gossipsub_peers_in_topic, self.mesh[topic]) for peer in selected_peers: # Add peer to mesh[topic] From 2c1c8dc8cf1898991fbf43551d8b9ee8a18f7cef Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 19 Jul 2019 12:56:15 +0800 Subject: [PATCH 05/15] Add `gossipsub.join` test --- tests/pubsub/test_gossipsub.py | 42 ++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 1f9b859..50f65ff 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -8,7 +8,7 @@ from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from utils import message_id_generator, generate_RPC_packet, \ create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \ - connect + connect, one_to_all_connect from tests.utils import cleanup SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] @@ -16,25 +16,47 @@ SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] @pytest.mark.asyncio async def test_join(): - num_hosts = 1 + # Create libp2p hosts + next_msg_id_func = message_id_generator(0) + + num_hosts = 10 + hosts_indices = list(range(num_hosts)) libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ SUPPORTED_PROTOCOLS, \ 10, 9, 11, 30, 3, 5, 0.5) - gossipsub = gossipsubs[0] topic = "test_join" + central_node_index = 0 + # Remove index of central host from the indices + hosts_indices.remove(central_node_index) + num_subscribed_peer = 6 + subscribed_peer_indices = random.sample(hosts_indices, num_subscribed_peer) - assert topic not in gossipsub.mesh - await gossipsub.join(topic) - assert topic in gossipsub.mesh + # All pubsub except the one of central node subscribe to topic + for i in subscribed_peer_indices: + q = await pubsubs[i].subscribe(topic) - # Test re-join - await gossipsub.join(topic) + # Connect central host to all other hosts + await one_to_all_connect(libp2p_hosts, central_node_index) - await cleanup() + # Wait 2 seconds for heartbeat to allow mesh to connect + await asyncio.sleep(2) + + # Check that the pubsub of central node does not have mesh or fanout for the topic + assert topic not in gossipsubs[central_node_index].fanout + assert topic not in gossipsubs[central_node_index].mesh + + # Central node subscribe message origin + await pubsubs[central_node_index].subscribe(topic) + + for i in hosts_indices: + if i in subscribed_peer_indices: + assert str(libp2p_hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic] + else: + assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] @pytest.mark.asyncio From 41141c028bf31bc00e72600376f0b0b0298fe1cc Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 19 Jul 2019 19:40:26 +0800 Subject: [PATCH 06/15] FIx: check topic exist in `pubsub.peer_topics` --- libp2p/pubsub/gossipsub.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 6cf4380..3df5b78 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -267,19 +267,20 @@ class GossipSub(IPubsubRouter): num_mesh_peers_in_topic = len(self.mesh[topic]) if num_mesh_peers_in_topic < self.degree_low: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] + if topic in self.pubsub.peer_topics: + gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] + if peer in self.peers_gossipsub] - # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] - selected_peers = GossipSub.select_from_minus(self.degree - num_mesh_peers_in_topic, - gossipsub_peers_in_topic, self.mesh[topic]) + # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] + selected_peers = GossipSub.select_from_minus(self.degree - num_mesh_peers_in_topic, + gossipsub_peers_in_topic, self.mesh[topic]) - for peer in selected_peers: - # Add peer to mesh[topic] - self.mesh[topic].append(peer) + for peer in selected_peers: + # Add peer to mesh[topic] + self.mesh[topic].append(peer) - # Emit GRAFT(topic) control message to peer - await self.emit_graft(topic, peer) + # Emit GRAFT(topic) control message to peer + await self.emit_graft(topic, peer) if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] From 14ee44c54942dfc2ff1e6bd6881c8917c5e30653 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 19 Jul 2019 19:43:12 +0800 Subject: [PATCH 07/15] Lint test and add cleanup to the end --- tests/pubsub/test_gossipsub.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 50f65ff..f267d39 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -2,10 +2,6 @@ import asyncio import pytest import random -from libp2p.pubsub.gossipsub import GossipSub -from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.pb import rpc_pb2 -from libp2p.pubsub.pubsub import Pubsub from utils import message_id_generator, generate_RPC_packet, \ create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \ connect, one_to_all_connect @@ -17,8 +13,6 @@ SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] @pytest.mark.asyncio async def test_join(): # Create libp2p hosts - next_msg_id_func = message_id_generator(0) - num_hosts = 10 hosts_indices = list(range(num_hosts)) libp2p_hosts = await create_libp2p_hosts(num_hosts) @@ -37,7 +31,7 @@ async def test_join(): # All pubsub except the one of central node subscribe to topic for i in subscribed_peer_indices: - q = await pubsubs[i].subscribe(topic) + await pubsubs[i].subscribe(topic) # Connect central host to all other hosts await one_to_all_connect(libp2p_hosts, central_node_index) @@ -58,6 +52,8 @@ async def test_join(): else: assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] + await cleanup() + @pytest.mark.asyncio async def test_leave(): @@ -128,11 +124,9 @@ async def test_dense(): await asyncio.sleep(0.5) # Assert that all blocking queues receive the message - items = [] for queue in queues: msg = await queue.get() assert msg.data == packet.publish[0].data - items.append(msg.data) await cleanup() @pytest.mark.asyncio From 4ab99485a6f7c3825baa3c4de5d436dff4257bca Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 21 Jul 2019 23:32:54 +0800 Subject: [PATCH 08/15] Fix lint error --- libp2p/pubsub/gossipsub.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3df5b78..c29effd 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -269,11 +269,14 @@ class GossipSub(IPubsubRouter): if num_mesh_peers_in_topic < self.degree_low: if topic in self.pubsub.peer_topics: gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] + if peer in self.peers_gossipsub] # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] - selected_peers = GossipSub.select_from_minus(self.degree - num_mesh_peers_in_topic, - gossipsub_peers_in_topic, self.mesh[topic]) + selected_peers = GossipSub.select_from_minus( + self.degree - num_mesh_peers_in_topic, + gossipsub_peers_in_topic, + self.mesh[topic] + ) for peer in selected_peers: # Add peer to mesh[topic] From 67f9edb77daf0253de7223c22f13ecd25bbceb77 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 22 Jul 2019 19:28:07 +0800 Subject: [PATCH 09/15] Remove fanout topic after joining the topic --- libp2p/pubsub/gossipsub.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index c29effd..e585566 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -207,7 +207,8 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) await self.emit_graft(topic, peer) - # TODO: Do we remove all peers from fanout[topic]? + if topic_in_fanout: + del self.fanout[topic] async def leave(self, topic): # Note: the comments here are the near-exact algorithm description from the spec @@ -303,7 +304,7 @@ class GossipSub(IPubsubRouter): # TODO: there's no way time_since_last_publish gets set anywhere yet if self.time_since_last_publish[topic] > self.time_to_live: # Remove topic from fanout - self.fanout.remove(topic) + del self.fanout[topic] self.time_since_last_publish.remove(topic) else: num_fanout_peers_in_topic = len(self.fanout[topic]) From cdbeb6387914a6fdc1d19265235bf36b4574734e Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 22 Jul 2019 19:28:12 +0800 Subject: [PATCH 10/15] Add test --- tests/pubsub/test_gossipsub.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index f267d39..3993602 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -39,13 +39,27 @@ async def test_join(): # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) - # Check that the pubsub of central node does not have mesh or fanout for the topic - assert topic not in gossipsubs[central_node_index].fanout + # Central node publish to the topic so that this topic + # is added to central node's fanout + next_msg_id_func = message_id_generator(0) + msg_content = "" + host_id = str(libp2p_hosts[central_node_index].get_id()) + # Generate message packet + packet = generate_RPC_packet(host_id, [topic], msg_content, next_msg_id_func()) + # publish from the randomly chosen host + await gossipsubs[central_node_index].publish(host_id, packet.SerializeToString()) + + # Check that the gossipsub of central node has fanout for the topic + assert topic in gossipsubs[central_node_index].fanout + # Check that the gossipsub of central node does not has mesh for the topic assert topic not in gossipsubs[central_node_index].mesh # Central node subscribe message origin await pubsubs[central_node_index].subscribe(topic) + # Check that the gossipsub of central node no longer has fanout for the topic + assert topic not in gossipsubs[central_node_index].fanout + for i in hosts_indices: if i in subscribed_peer_indices: assert str(libp2p_hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic] From afc853a7769f658f12eee118b6548b310880c53c Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 22 Jul 2019 23:22:07 +0800 Subject: [PATCH 11/15] Apply PR feedback --- libp2p/pubsub/gossipsub.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index e585566..01576ae 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -265,26 +265,28 @@ class GossipSub(IPubsubRouter): async def mesh_heartbeat(self): # Note: the comments here are the exact pseudocode from the spec for topic in self.mesh: + # Skip if no peers have subscribed to the topic + if topic not in self.pubsub.peer_topics: + continue num_mesh_peers_in_topic = len(self.mesh[topic]) if num_mesh_peers_in_topic < self.degree_low: - if topic in self.pubsub.peer_topics: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] + gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] + if peer in self.peers_gossipsub] - # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] - selected_peers = GossipSub.select_from_minus( - self.degree - num_mesh_peers_in_topic, - gossipsub_peers_in_topic, - self.mesh[topic] - ) + # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] + selected_peers = GossipSub.select_from_minus( + self.degree - num_mesh_peers_in_topic, + gossipsub_peers_in_topic, + self.mesh[topic] + ) - for peer in selected_peers: - # Add peer to mesh[topic] - self.mesh[topic].append(peer) + for peer in selected_peers: + # Add peer to mesh[topic] + self.mesh[topic].append(peer) - # Emit GRAFT(topic) control message to peer - await self.emit_graft(topic, peer) + # Emit GRAFT(topic) control message to peer + await self.emit_graft(topic, peer) if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] From 3c3a9ac90b905184698722cb414fa5b9eb18073d Mon Sep 17 00:00:00 2001 From: NIC Lin Date: Tue, 23 Jul 2019 16:37:01 +0800 Subject: [PATCH 12/15] Update tests/pubsub/test_gossipsub.py Co-Authored-By: Kevin Mai-Husan Chia --- tests/pubsub/test_gossipsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 3993602..6dd7e44 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -51,7 +51,7 @@ async def test_join(): # Check that the gossipsub of central node has fanout for the topic assert topic in gossipsubs[central_node_index].fanout - # Check that the gossipsub of central node does not has mesh for the topic + # Check that the gossipsub of central node does not have a mesh for the topic assert topic not in gossipsubs[central_node_index].mesh # Central node subscribe message origin From 042e0ac916fe8999b8a99c31f61a413a084abc10 Mon Sep 17 00:00:00 2001 From: NIC Lin Date: Tue, 23 Jul 2019 16:37:41 +0800 Subject: [PATCH 13/15] Update tests/pubsub/test_gossipsub.py Co-Authored-By: Kevin Mai-Husan Chia --- tests/pubsub/test_gossipsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 6dd7e44..2b81eb3 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -54,7 +54,7 @@ async def test_join(): # Check that the gossipsub of central node does not have a mesh for the topic assert topic not in gossipsubs[central_node_index].mesh - # Central node subscribe message origin + # Central node subscribes the topic await pubsubs[central_node_index].subscribe(topic) # Check that the gossipsub of central node no longer has fanout for the topic From c0a3af69e0d3a868f5d40e653400bf6d7a016d38 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 23 Jul 2019 16:45:54 +0800 Subject: [PATCH 14/15] Apply PR feedback: Check that node is in mesh peer's mesh after subscribe --- tests/pubsub/test_gossipsub.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 2b81eb3..ab76e26 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -63,8 +63,10 @@ async def test_join(): for i in hosts_indices: if i in subscribed_peer_indices: assert str(libp2p_hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic] + assert str(libp2p_hosts[central_node_index].get_id()) in gossipsubs[i].mesh[topic] else: assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] + assert str(libp2p_hosts[central_node_index].get_id()) not in gossipsubs[i].mesh[topic] await cleanup() From 0cc8a205fb4f4c84121187bc030b617544614ccb Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 23 Jul 2019 17:28:46 +0800 Subject: [PATCH 15/15] Fix test and reduce number of nodes in `test_join` --- tests/pubsub/test_gossipsub.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index ab76e26..a8f58d3 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -13,20 +13,20 @@ SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] @pytest.mark.asyncio async def test_join(): # Create libp2p hosts - num_hosts = 10 + num_hosts = 4 hosts_indices = list(range(num_hosts)) libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ SUPPORTED_PROTOCOLS, \ - 10, 9, 11, 30, 3, 5, 0.5) + 4, 3, 5, 30, 3, 5, 0.5) topic = "test_join" central_node_index = 0 # Remove index of central host from the indices hosts_indices.remove(central_node_index) - num_subscribed_peer = 6 + num_subscribed_peer = 2 subscribed_peer_indices = random.sample(hosts_indices, num_subscribed_peer) # All pubsub except the one of central node subscribe to topic @@ -57,6 +57,8 @@ async def test_join(): # Central node subscribes the topic await pubsubs[central_node_index].subscribe(topic) + await asyncio.sleep(2) + # Check that the gossipsub of central node no longer has fanout for the topic assert topic not in gossipsubs[central_node_index].fanout @@ -66,7 +68,7 @@ async def test_join(): assert str(libp2p_hosts[central_node_index].get_id()) in gossipsubs[i].mesh[topic] else: assert str(libp2p_hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] - assert str(libp2p_hosts[central_node_index].get_id()) not in gossipsubs[i].mesh[topic] + assert topic not in gossipsubs[i].mesh await cleanup()