py-libp2p/tests/pubsub/test_gossipsub.py

434 lines
12 KiB
Python
Raw Normal View History

import asyncio
import random
2019-07-26 18:35:25 +08:00
import pytest
2019-08-01 06:00:12 +08:00
from tests.utils import cleanup, connect
2019-07-26 18:35:25 +08:00
from .configs import GOSSIPSUB_PROTOCOL_ID
from .utils import (
create_libp2p_hosts,
create_pubsub_and_gossipsub_instances,
dense_connect,
one_to_all_connect,
)
SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID]
2019-07-19 20:16:53 +08:00
@pytest.mark.asyncio
async def test_join():
2019-07-19 12:56:15 +08:00
# Create libp2p hosts
num_hosts = 4
2019-07-19 12:56:15 +08:00
hosts_indices = list(range(num_hosts))
2019-07-19 20:16:53 +08:00
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 4, 3, 5, 30, 3, 5, 0.5
)
2019-07-19 20:16:53 +08:00
topic = "test_join"
2019-07-19 12:56:15 +08:00
central_node_index = 0
# Remove index of central host from the indices
hosts_indices.remove(central_node_index)
num_subscribed_peer = 2
2019-07-19 12:56:15 +08:00
subscribed_peer_indices = random.sample(hosts_indices, num_subscribed_peer)
# All pubsub except the one of central node subscribe to topic
for i in subscribed_peer_indices:
2019-07-19 19:43:12 +08:00
await pubsubs[i].subscribe(topic)
2019-07-19 12:56:15 +08:00
# Connect central host to all other hosts
await one_to_all_connect(libp2p_hosts, central_node_index)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
2019-07-19 20:16:53 +08:00
2019-07-22 19:28:12 +08:00
# Central node publish to the topic so that this topic
# is added to central node's fanout
# publish from the randomly chosen host
await pubsubs[central_node_index].publish(topic, b"data")
2019-07-22 19:28:12 +08:00
# Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout
# Check that the gossipsub of central node does not have a mesh for the topic
2019-07-19 12:56:15 +08:00
assert topic not in gossipsubs[central_node_index].mesh
2019-07-19 20:16:53 +08:00
# Central node subscribes the topic
2019-07-19 12:56:15 +08:00
await pubsubs[central_node_index].subscribe(topic)
2019-07-19 20:16:53 +08:00
await asyncio.sleep(2)
2019-07-22 19:28:12 +08:00
# Check that the gossipsub of central node no longer has fanout for the topic
assert topic not in gossipsubs[central_node_index].fanout
2019-07-19 12:56:15 +08:00
for i in hosts_indices:
if i in subscribed_peer_indices:
2019-08-01 06:00:12 +08:00
assert (
str(libp2p_hosts[i].get_id())
in gossipsubs[central_node_index].mesh[topic]
)
assert (
str(libp2p_hosts[central_node_index].get_id())
in gossipsubs[i].mesh[topic]
)
2019-07-19 12:56:15 +08:00
else:
2019-08-01 06:00:12 +08:00
assert (
str(libp2p_hosts[i].get_id())
not in gossipsubs[central_node_index].mesh[topic]
)
assert topic not in gossipsubs[i].mesh
2019-07-19 20:16:53 +08:00
2019-07-19 19:43:12 +08:00
await cleanup()
2019-07-19 20:16:53 +08:00
@pytest.mark.asyncio
async def test_leave():
num_hosts = 1
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
_, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
2019-07-19 20:16:53 +08:00
gossipsub = gossipsubs[0]
topic = "test_leave"
2019-07-26 18:35:25 +08:00
assert topic not in gossipsub.mesh
2019-07-19 20:16:53 +08:00
await gossipsub.join(topic)
assert topic in gossipsub.mesh
await gossipsub.leave(topic)
assert topic not in gossipsub.mesh
# Test re-leave
await gossipsub.leave(topic)
await cleanup()
2019-07-21 21:22:20 +08:00
@pytest.mark.asyncio
async def test_handle_graft(event_loop, monkeypatch):
num_hosts = 2
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
_, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
2019-07-21 21:22:20 +08:00
index_alice = 0
id_alice = str(libp2p_hosts[index_alice].get_id())
index_bob = 1
id_bob = str(libp2p_hosts[index_bob].get_id())
await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
topic = "test_handle_graft"
# Only lice subscribe to the topic
await gossipsubs[index_alice].join(topic)
# Monkey patch bob's `emit_prune` function so we can
# check if it is called in `handle_graft`
event_emit_prune = asyncio.Event()
2019-08-01 06:00:12 +08:00
2019-07-21 21:22:20 +08:00
async def emit_prune(topic, sender_peer_id):
event_emit_prune.set()
2019-08-01 06:00:12 +08:00
monkeypatch.setattr(gossipsubs[index_bob], "emit_prune", emit_prune)
2019-07-21 21:22:20 +08:00
# Check that alice is bob's peer but not his mesh peer
assert id_alice in gossipsubs[index_bob].peers_gossipsub
assert topic not in gossipsubs[index_bob].mesh
2019-07-21 22:28:43 +08:00
2019-07-21 21:22:20 +08:00
await gossipsubs[index_alice].emit_graft(topic, id_bob)
# Check that `emit_prune` is called
2019-08-01 06:00:12 +08:00
await asyncio.wait_for(event_emit_prune.wait(), timeout=1, loop=event_loop)
2019-07-21 21:22:20 +08:00
assert event_emit_prune.is_set()
# Check that bob is alice's peer but not her mesh peer
assert topic in gossipsubs[index_alice].mesh
assert id_bob not in gossipsubs[index_alice].mesh[topic]
assert id_bob in gossipsubs[index_alice].peers_gossipsub
2019-07-21 22:28:43 +08:00
2019-07-21 21:22:20 +08:00
await gossipsubs[index_bob].emit_graft(topic, id_alice)
await asyncio.sleep(1)
# Check that bob is now alice's mesh peer
assert id_bob in gossipsubs[index_alice].mesh[topic]
await cleanup()
2019-07-21 22:28:43 +08:00
@pytest.mark.asyncio
async def test_handle_prune():
num_hosts = 2
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 3
)
2019-07-21 22:28:43 +08:00
index_alice = 0
id_alice = str(libp2p_hosts[index_alice].get_id())
index_bob = 1
id_bob = str(libp2p_hosts[index_bob].get_id())
topic = "test_handle_prune"
2019-07-24 11:35:14 +08:00
for pubsub in pubsubs:
await pubsub.subscribe(topic)
2019-07-21 22:28:43 +08:00
await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
2019-07-24 11:35:14 +08:00
# Wait 3 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(3)
2019-07-21 22:28:43 +08:00
# Check that they are each other's mesh peer
assert id_alice in gossipsubs[index_bob].mesh[topic]
assert id_bob in gossipsubs[index_alice].mesh[topic]
# alice emit prune message to bob, alice should be removed
# from bob's mesh peer
await gossipsubs[index_alice].emit_prune(topic, id_bob)
# FIXME: This test currently works because the heartbeat interval
2019-07-24 11:35:14 +08:00
# is increased to 3 seconds, so alice won't get add back into
2019-07-21 22:28:43 +08:00
# bob's mesh peer during heartbeat.
await asyncio.sleep(1)
# Check that alice is no longer bob's mesh peer
assert id_alice not in gossipsubs[index_bob].mesh[topic]
assert id_bob in gossipsubs[index_alice].mesh[topic]
await cleanup()
@pytest.mark.asyncio
async def test_dense():
# Create libp2p hosts
num_hosts = 10
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
pubsubs, _ = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
# All pubsub subscribe to foobar
queues = []
for pubsub in pubsubs:
q = await pubsub.subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
for i in range(num_msgs):
2019-08-01 06:00:12 +08:00
msg_content = b"foo " + i.to_bytes(1, "big")
# randomly pick a message origin
origin_idx = random.randint(0, num_hosts - 1)
# publish from the randomly chosen host
2019-07-26 18:35:25 +08:00
await pubsubs[origin_idx].publish("foobar", msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
2019-07-26 18:35:25 +08:00
assert msg.data == msg_content
await cleanup()
2019-07-26 18:35:25 +08:00
@pytest.mark.asyncio
async def test_fanout():
# Create libp2p hosts
num_hosts = 10
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
pubsubs, _ = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
2019-07-26 18:35:25 +08:00
# All pubsub subscribe to foobar except for `pubsubs[0]`
queues = []
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
2019-07-26 18:35:25 +08:00
topic = "foobar"
# Send messages with origin not subscribed
for i in range(num_msgs):
2019-07-26 18:35:25 +08:00
msg_content = b"foo " + i.to_bytes(1, "big")
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
# publish from the randomly chosen host
2019-07-26 18:35:25 +08:00
await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
2019-07-26 18:35:25 +08:00
assert msg.data == msg_content
# Subscribe message origin
2019-07-26 18:35:25 +08:00
queues.insert(0, await pubsubs[0].subscribe(topic))
# Send messages again
for i in range(num_msgs):
2019-08-01 06:00:12 +08:00
msg_content = b"bar " + i.to_bytes(1, "big")
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
# publish from the randomly chosen host
2019-07-26 18:35:25 +08:00
await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
2019-07-26 18:35:25 +08:00
assert msg.data == msg_content
await cleanup()
2019-08-01 06:00:12 +08:00
@pytest.mark.asyncio
async def test_fanout_maintenance():
# Create libp2p hosts
num_hosts = 10
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-08-01 06:00:12 +08:00
pubsubs, _ = create_pubsub_and_gossipsub_instances(
libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
)
# All pubsub subscribe to foobar
queues = []
2019-07-26 18:35:25 +08:00
topic = "foobar"
for i in range(1, len(pubsubs)):
2019-07-26 18:35:25 +08:00
q = await pubsubs[i].subscribe(topic)
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
# Send messages with origin not subscribed
for i in range(num_msgs):
2019-08-01 06:00:12 +08:00
msg_content = b"foo " + i.to_bytes(1, "big")
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
# publish from the randomly chosen host
2019-07-26 18:35:25 +08:00
await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
2019-07-26 18:35:25 +08:00
assert msg.data == msg_content
for sub in pubsubs:
2019-07-26 18:35:25 +08:00
await sub.unsubscribe(topic)
queues = []
await asyncio.sleep(2)
# Resub and repeat
for i in range(1, len(pubsubs)):
2019-07-26 18:35:25 +08:00
q = await pubsubs[i].subscribe(topic)
# Add each blocking queue to an array of blocking queues
queues.append(q)
await asyncio.sleep(2)
# Check messages can still be sent
for i in range(num_msgs):
2019-08-01 06:00:12 +08:00
msg_content = b"bar " + i.to_bytes(1, "big")
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
# publish from the randomly chosen host
2019-07-26 18:35:25 +08:00
await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
2019-07-26 18:35:25 +08:00
assert msg.data == msg_content
await cleanup()
2019-07-26 18:35:25 +08:00
@pytest.mark.asyncio
async def test_gossip_propagation():
# Create libp2p hosts
num_hosts = 2
2019-07-26 18:35:25 +08:00
hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
2019-07-26 18:35:25 +08:00
pubsubs, _ = create_pubsub_and_gossipsub_instances(
2019-08-01 06:00:12 +08:00
hosts, SUPPORTED_PROTOCOLS, 1, 0, 2, 30, 50, 100, 0.5
2019-07-26 18:35:25 +08:00
)
2019-07-26 18:35:25 +08:00
topic = "foo"
await pubsubs[0].subscribe(topic)
2019-07-26 18:35:25 +08:00
# node 0 publish to topic
2019-08-01 06:00:12 +08:00
msg_content = b"foo_msg"
# publish from the randomly chosen host
2019-07-26 18:35:25 +08:00
await pubsubs[0].publish(topic, msg_content)
2019-07-26 18:35:25 +08:00
# now node 1 subscribes
queue_1 = await pubsubs[1].subscribe(topic)
2019-07-26 18:35:25 +08:00
await connect(hosts[0], hosts[1])
# wait for gossip heartbeat
await asyncio.sleep(2)
# should be able to read message
2019-07-26 18:35:25 +08:00
msg = await queue_1.get()
assert msg.data == msg_content
await cleanup()