Merge pull request #181 from NIC619/fix_handle_unsubscribe

Fix: handle unsubscribe message
This commit is contained in:
NIC Lin 2019-07-21 20:09:51 +08:00 committed by GitHub
commit 1e78c21eca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 125 additions and 6 deletions

View File

@ -179,9 +179,10 @@ class GossipSub(IPubsubRouter):
subscription announcement subscription announcement
:param topic: topic to join :param topic: topic to join
""" """
if topic in self.mesh:
return
# Create mesh[topic] if it does not yet exist # Create mesh[topic] if it does not yet exist
if topic not in self.mesh: self.mesh[topic] = []
self.mesh[topic] = []
if topic in self.fanout and len(self.fanout[topic]) == self.degree: if topic in self.fanout and len(self.fanout[topic]) == self.degree:
# If router already has D peers from the fanout peers of a topic # If router already has D peers from the fanout peers of a topic
@ -228,6 +229,8 @@ class GossipSub(IPubsubRouter):
It is invoked after the unsubscription announcement. It is invoked after the unsubscription announcement.
:param topic: topic to leave :param topic: topic to leave
""" """
if topic not in self.mesh:
return
# Notify the peers in mesh[topic] with a PRUNE(topic) message # Notify the peers in mesh[topic] with a PRUNE(topic) message
for peer in self.mesh[topic]: for peer in self.mesh[topic]:
await self.emit_prune(topic, peer) await self.emit_prune(topic, peer)

View File

@ -107,8 +107,7 @@ class Pubsub():
# to know that it is subscribed to the topic (doesn't # to know that it is subscribed to the topic (doesn't
# need everyone to know) # need everyone to know)
for message in rpc_incoming.subscriptions: for message in rpc_incoming.subscriptions:
if message.subscribe: self.handle_subscription(peer_id, message)
self.handle_subscription(peer_id, message)
if should_publish: if should_publish:
# relay message to peers with router # relay message to peers with router
@ -210,6 +209,10 @@ class Pubsub():
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
""" """
# Already subscribed
if topic_id in self.my_topics:
return self.my_topics[topic_id]
# Map topic_id to blocking queue # Map topic_id to blocking queue
self.my_topics[topic_id] = asyncio.Queue() self.my_topics[topic_id] = asyncio.Queue()
@ -235,9 +238,11 @@ class Pubsub():
:param topic_id: topic_id to unsubscribe from :param topic_id: topic_id to unsubscribe from
""" """
# Return if we already unsubscribed from the topic
if topic_id not in self.my_topics:
return
# Remove topic_id from map if present # Remove topic_id from map if present
if topic_id in self.my_topics: del self.my_topics[topic_id]
del self.my_topics[topic_id]
# Create unsubscribe message # Create unsubscribe message
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()

View File

@ -14,6 +14,54 @@ from tests.utils import cleanup
SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"]
@pytest.mark.asyncio
async def test_join():
num_hosts = 1
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
_, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5)
gossipsub = gossipsubs[0]
topic = "test_join"
assert topic not in gossipsub.mesh
await gossipsub.join(topic)
assert topic in gossipsub.mesh
# Test re-join
await gossipsub.join(topic)
await cleanup()
@pytest.mark.asyncio
async def test_leave():
num_hosts = 1
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
_, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5)
gossipsub = gossipsubs[0]
topic = "test_leave"
await gossipsub.join(topic)
assert topic in gossipsub.mesh
await gossipsub.leave(topic)
assert topic not in gossipsub.mesh
# Test re-leave
await gossipsub.leave(topic)
await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_dense(): async def test_dense():
# Create libp2p hosts # Create libp2p hosts

View File

@ -0,0 +1,63 @@
# pylint: disable=redefined-outer-name
import pytest
from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
TESTING_TOPIC = "TEST_SUBSCRIBE"
class NoConnNode:
# pylint: disable=too-few-public-methods
def __init__(self, host, pubsub):
self.host = host
self.pubsub = pubsub
@classmethod
async def create(cls):
host = await new_node()
floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
pubsub = Pubsub(host, floodsub, "test")
return cls(host, pubsub)
@pytest.fixture
async def node():
return await NoConnNode.create()
@pytest.mark.asyncio
async def test_subscribe_unsubscribe(node):
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
await node.pubsub.unsubscribe(TESTING_TOPIC)
assert TESTING_TOPIC not in node.pubsub.my_topics
@pytest.mark.asyncio
async def test_re_subscribe(node):
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
@pytest.mark.asyncio
async def test_re_unsubscribe(node):
# Unsubscribe from topic we didn't even subscribe to
assert "NOT_MY_TOPIC" not in node.pubsub.my_topics
await node.pubsub.unsubscribe("NOT_MY_TOPIC")
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
await node.pubsub.unsubscribe(TESTING_TOPIC)
assert TESTING_TOPIC not in node.pubsub.my_topics
await node.pubsub.unsubscribe(TESTING_TOPIC)
assert TESTING_TOPIC not in node.pubsub.my_topics