From 6d9ec7a9c5717c45108437e9d8d128758d7a2ba4 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 15 Jul 2019 16:28:29 +0800 Subject: [PATCH 1/6] Handle the unsubscribe case --- libp2p/pubsub/pubsub.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index e0f6580..3e8f74c 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -107,8 +107,7 @@ class Pubsub(): # to know that it is subscribed to the topic (doesn't # need everyone to know) for message in rpc_incoming.subscriptions: - if message.subscribe: - self.handle_subscription(peer_id, message) + self.handle_subscription(peer_id, message) if should_publish: # relay message to peers with router From f25d97fbd3e70e394c6328f2e847d7a26b7c0f91 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 15 Jul 2019 16:32:05 +0800 Subject: [PATCH 2/6] Prevent self re-subscription --- libp2p/pubsub/pubsub.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3e8f74c..07008ca 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -209,6 +209,10 @@ class Pubsub(): :param topic_id: topic_id to subscribe to """ + # Already subscribed + if topic_id in self.my_topics: + return self.my_topics[topic_id] + # Map topic_id to blocking queue self.my_topics[topic_id] = asyncio.Queue() From 89347be5265fe5bc83ea900d40f9b5adbdee39e5 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 18 Jul 2019 13:26:31 +0800 Subject: [PATCH 3/6] Prevent self re-unsubscription --- libp2p/pubsub/pubsub.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 07008ca..f80ed97 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -238,9 +238,11 @@ class Pubsub(): :param topic_id: topic_id to unsubscribe from """ + # Return if we already unsubscribed from the topic + if topic_id not in self.my_topics: + return # Remove topic_id from map if present - if topic_id in self.my_topics: - del self.my_topics[topic_id] + del self.my_topics[topic_id] # Create unsubscribe message packet = rpc_pb2.RPC() From 183eee0e85506499329e5f19a5521bff97fc34f7 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 18 Jul 2019 13:39:22 +0800 Subject: [PATCH 4/6] Add self subscription tests --- tests/pubsub/test_subscription.py | 63 +++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 tests/pubsub/test_subscription.py diff --git a/tests/pubsub/test_subscription.py b/tests/pubsub/test_subscription.py new file mode 100644 index 0000000..c8f46f5 --- /dev/null +++ b/tests/pubsub/test_subscription.py @@ -0,0 +1,63 @@ +# pylint: disable=redefined-outer-name +import pytest + +from libp2p import new_node +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub + +SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +TESTING_TOPIC = "TEST_SUBSCRIBE" + + +class NoConnNode: + # pylint: disable=too-few-public-methods + + def __init__(self, host, pubsub): + self.host = host + self.pubsub = pubsub + + @classmethod + async def create(cls): + host = await new_node() + floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) + pubsub = Pubsub(host, floodsub, "test") + return cls(host, pubsub) + + +@pytest.fixture +async def node(): + return await NoConnNode.create() + + +@pytest.mark.asyncio +async def test_subscribe_unsubscribe(node): + await node.pubsub.subscribe(TESTING_TOPIC) + assert TESTING_TOPIC in node.pubsub.my_topics + + await node.pubsub.unsubscribe(TESTING_TOPIC) + assert TESTING_TOPIC not in node.pubsub.my_topics + + +@pytest.mark.asyncio +async def test_re_subscribe(node): + await node.pubsub.subscribe(TESTING_TOPIC) + assert TESTING_TOPIC in node.pubsub.my_topics + + await node.pubsub.subscribe(TESTING_TOPIC) + assert TESTING_TOPIC in node.pubsub.my_topics + + +@pytest.mark.asyncio +async def test_re_unsubscribe(node): + # Unsubscribe from topic we didn't even subscribe to + assert "NOT_MY_TOPIC" not in node.pubsub.my_topics + await node.pubsub.unsubscribe("NOT_MY_TOPIC") + + await node.pubsub.subscribe(TESTING_TOPIC) + assert TESTING_TOPIC in node.pubsub.my_topics + + await node.pubsub.unsubscribe(TESTING_TOPIC) + assert TESTING_TOPIC not in node.pubsub.my_topics + + await node.pubsub.unsubscribe(TESTING_TOPIC) + assert TESTING_TOPIC not in node.pubsub.my_topics From 36575e8c9be6b8d3daabc93102285f09b01abbfd Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 19 Jul 2019 19:56:25 +0800 Subject: [PATCH 5/6] Add check to prevent gossipsub re-join and re-leave --- libp2p/pubsub/gossipsub.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 8524ca6..b2e6f38 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -179,9 +179,10 @@ class GossipSub(IPubsubRouter): subscription announcement :param topic: topic to join """ + if topic in self.mesh: + return # Create mesh[topic] if it does not yet exist - if topic not in self.mesh: - self.mesh[topic] = [] + self.mesh[topic] = [] if topic in self.fanout and len(self.fanout[topic]) == self.degree: # If router already has D peers from the fanout peers of a topic @@ -228,6 +229,8 @@ class GossipSub(IPubsubRouter): It is invoked after the unsubscription announcement. :param topic: topic to leave """ + if topic not in self.mesh: + return # Notify the peers in mesh[topic] with a PRUNE(topic) message for peer in self.mesh[topic]: await self.emit_prune(topic, peer) From 29aae7dca442b157268ee803daa3453cbfb9a294 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Fri, 19 Jul 2019 20:16:53 +0800 Subject: [PATCH 6/6] Add gossipsub join/leave test --- tests/pubsub/test_gossipsub.py | 48 ++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 0c94fae..1f9b859 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -14,6 +14,54 @@ from tests.utils import cleanup SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] +@pytest.mark.asyncio +async def test_join(): + num_hosts = 1 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + gossipsub = gossipsubs[0] + topic = "test_join" + + assert topic not in gossipsub.mesh + await gossipsub.join(topic) + assert topic in gossipsub.mesh + + # Test re-join + await gossipsub.join(topic) + + await cleanup() + + +@pytest.mark.asyncio +async def test_leave(): + num_hosts = 1 + libp2p_hosts = await create_libp2p_hosts(num_hosts) + + # Create pubsub, gossipsub instances + _, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + SUPPORTED_PROTOCOLS, \ + 10, 9, 11, 30, 3, 5, 0.5) + + gossipsub = gossipsubs[0] + topic = "test_leave" + + await gossipsub.join(topic) + assert topic in gossipsub.mesh + + await gossipsub.leave(topic) + assert topic not in gossipsub.mesh + + # Test re-leave + await gossipsub.leave(topic) + + await cleanup() + + @pytest.mark.asyncio async def test_dense(): # Create libp2p hosts