Merge pull request #198 from mhchia/fix/clean-up-tests-with-fixture

Use factories and fixtures in pubsub tests
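The tests now obtain hosts, routers, and Pubsub instances from factory-boy factories (HostFactory, FloodsubFactory, GossipsubFactory, PubsubFactory) wired into parametrized pytest fixtures, instead of constructing them by hand in every test. A rough sketch of the resulting pattern (fixture and factory names are the ones added in this PR; the test name, topic, and payload are illustrative only):

import asyncio

import pytest

from tests.utils import connect


@pytest.mark.parametrize("num_hosts", (2,))
@pytest.mark.asyncio
async def test_publish_subscribe_sketch(pubsubs_fsub):
    # `pubsubs_fsub` is supplied by tests/pubsub/conftest.py below and is built
    # from HostFactory / FloodsubFactory / PubsubFactory.
    await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
    await asyncio.sleep(0.25)
    sub = await pubsubs_fsub[1].subscribe("my_topic")
    await asyncio.sleep(0.25)  # let node 0 learn of node 1's subscription
    await pubsubs_fsub[0].publish("my_topic", b"some data")
    msg = await sub.get()
    assert msg.data == b"some data"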
Kevin Mai-Husan Chia 2019-08-01 22:08:03 +08:00 committed by GitHub
commit f6e456c96e
15 changed files with 208 additions and 412 deletions

View File

@@ -8,6 +8,7 @@ classifiers = [f"Programming Language :: Python :: {version}" for version in ["3
 extras_require = {
     "test": [
         "codecov>=2.0.15,<3.0.0",
+        "factory-boy>=2.12.0,<3.0.0",
         "pytest>=4.6.3,<5.0.0",
         "pytest-cov>=2.7.1,<3.0.0",
         "pytest-asyncio>=0.10.0,<1.0.0",

tests/configs.py (new file, +3)
View File

@@ -0,0 +1,3 @@
+import multiaddr
+
+LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")

View File

@@ -1,7 +1,18 @@
-import multiaddr
+from typing import NamedTuple

 FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0"
 GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0"

-LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
+
+class GossipsubParams(NamedTuple):
+    degree: int = 10
+    degree_low: int = 9
+    degree_high: int = 11
+    time_to_live: int = 30
+    gossip_window: int = 3
+    gossip_history: int = 5
+    heartbeat_interval: float = 0.5
+
+
+GOSSIPSUB_PARAMS = GossipsubParams()
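Since GossipsubParams is a NamedTuple with defaults, a test can override only the fields it cares about when parametrizing the gossipsub_params fixture. A sketch under that assumption (the test name is hypothetical; the parametrization mirrors test_join further down in this diff):

import pytest

from tests.pubsub.configs import GossipsubParams


@pytest.mark.parametrize(
    "num_hosts, gossipsub_params",
    ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),),
)
@pytest.mark.asyncio
async def test_with_small_mesh(pubsubs_gsub):  # hypothetical test name
    ...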

View File

@@ -1,17 +1,11 @@
 import asyncio
-from typing import NamedTuple

 import pytest

-from multiaddr import Multiaddr
-from libp2p import new_node
-from libp2p.pubsub.floodsub import FloodSub
-from libp2p.pubsub.gossipsub import GossipSub
-from libp2p.pubsub.pubsub import Pubsub
-from .configs import FLOODSUB_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID, LISTEN_MADDR
+from tests.configs import LISTEN_MADDR
+
+from .configs import GOSSIPSUB_PARAMS
+from .factories import FloodsubFactory, GossipsubFactory, HostFactory, PubsubFactory

 # pylint: disable=redefined-outer-name
@@ -24,9 +18,7 @@ def num_hosts():
 @pytest.fixture
 async def hosts(num_hosts):
-    _hosts = await asyncio.gather(
-        *[new_node(transport_opt=[str(LISTEN_MADDR)]) for _ in range(num_hosts)]
-    )
+    _hosts = HostFactory.create_batch(num_hosts)
     await asyncio.gather(
         *[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts]
     )
@@ -42,54 +34,46 @@ async def hosts(num_hosts):
 @pytest.fixture
 def floodsubs(num_hosts):
-    return tuple(FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) for _ in range(num_hosts))
-
-
-class GossipsubParams(NamedTuple):
-    degree: int = 10
-    degree_low: int = 9
-    degree_high: int = 11
-    fanout_ttl: int = 30
-    gossip_window: int = 3
-    gossip_history: int = 5
-    heartbeat_interval: float = 0.5
+    return FloodsubFactory.create_batch(num_hosts)


 @pytest.fixture
 def gossipsub_params():
-    return GossipsubParams()
+    return GOSSIPSUB_PARAMS


 @pytest.fixture
 def gossipsubs(num_hosts, gossipsub_params):
-    yield tuple(
-        GossipSub(protocols=[GOSSIPSUB_PROTOCOL_ID], **gossipsub_params._asdict())
-        for _ in range(num_hosts)
-    )
+    yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict())


 # TODO: Clean up
-def _make_pubsubs(hosts, pubsub_routers):
+def _make_pubsubs(hosts, pubsub_routers, cache_size):
     if len(pubsub_routers) != len(hosts):
         raise ValueError(
             f"lenght of pubsub_routers={pubsub_routers} should be equaled to the "
             f"length of hosts={len(hosts)}"
         )
     return tuple(
-        Pubsub(host=host, router=router, my_id=host.get_id())
+        PubsubFactory(host=host, router=router, cache_size=cache_size)
         for host, router in zip(hosts, pubsub_routers)
     )


 @pytest.fixture
-def pubsubs_fsub(hosts, floodsubs):
-    _pubsubs_fsub = _make_pubsubs(hosts, floodsubs)
+def pubsub_cache_size():
+    return None  # default


+@pytest.fixture
+def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size):
+    _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size)
     yield _pubsubs_fsub
     # TODO: Clean up


 @pytest.fixture
-def pubsubs_gsub(hosts, gossipsubs):
-    _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs)
+def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size):
+    _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size)
     yield _pubsubs_gsub
     # TODO: Clean up
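The pubsub_cache_size fixture defaults to None, so Pubsub falls back to its own default LRU cache size; tests that need a specific size override the fixture via parametrize, as test_lru_cache_two_nodes does later in this diff. A sketch (the test name is hypothetical):

import pytest


@pytest.mark.parametrize("num_hosts, pubsub_cache_size", ((2, 4),))
@pytest.mark.asyncio
async def test_with_small_cache(pubsubs_fsub):  # hypothetical test name
    # Every Pubsub in `pubsubs_fsub` was constructed with cache_size=4.
    ...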

View File

@@ -1,18 +1,16 @@
 import asyncio
 import uuid

-import multiaddr
-
-from libp2p import new_node
 from libp2p.host.host_interface import IHost
 from libp2p.pubsub.floodsub import FloodSub
 from libp2p.pubsub.pubsub import Pubsub
+from tests.configs import LISTEN_MADDR

-from .configs import FLOODSUB_PROTOCOL_ID
+from .factories import FloodsubFactory, PubsubFactory
 from .utils import message_id_generator

-SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
 CRYPTO_TOPIC = "ethereum"

 # Message format:
@@ -52,14 +50,9 @@ class DummyAccountNode:
         to use async await, unlike the init function
         """
-        libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
-        await libp2p_node.get_network().listen(
-            multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
-        )
-
-        floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
-        pubsub = Pubsub(libp2p_node, floodsub, "a")
-        return cls(libp2p_node=libp2p_node, pubsub=pubsub, floodsub=floodsub)
+        pubsub = PubsubFactory(router=FloodsubFactory())
+        await pubsub.host.get_network().listen(LISTEN_MADDR)
+        return cls(libp2p_node=pubsub.host, pubsub=pubsub, floodsub=pubsub.router)

     async def handle_incoming_msgs(self):
         """

tests/pubsub/factories.py (new file, +55)
View File

@@ -0,0 +1,55 @@
+import functools
+
+import factory
+
+from libp2p import initialize_default_swarm
+from libp2p.host.basic_host import BasicHost
+from libp2p.pubsub.floodsub import FloodSub
+from libp2p.pubsub.gossipsub import GossipSub
+from libp2p.pubsub.pubsub import Pubsub
+from tests.configs import LISTEN_MADDR
+
+from .configs import FLOODSUB_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID, GOSSIPSUB_PARAMS
+
+
+def swarm_factory():
+    return initialize_default_swarm(transport_opt=[str(LISTEN_MADDR)])
+
+
+class HostFactory(factory.Factory):
+    class Meta:
+        model = BasicHost
+
+    network = factory.LazyFunction(swarm_factory)
+
+
+class FloodsubFactory(factory.Factory):
+    class Meta:
+        model = FloodSub
+
+    protocols = (FLOODSUB_PROTOCOL_ID,)
+
+
+class GossipsubFactory(factory.Factory):
+    class Meta:
+        model = GossipSub
+
+    protocols = (GOSSIPSUB_PROTOCOL_ID,)
+    degree = GOSSIPSUB_PARAMS.degree
+    degree_low = GOSSIPSUB_PARAMS.degree_low
+    degree_high = GOSSIPSUB_PARAMS.degree_high
+    time_to_live = GOSSIPSUB_PARAMS.time_to_live
+    gossip_window = GOSSIPSUB_PARAMS.gossip_window
+    gossip_history = GOSSIPSUB_PARAMS.gossip_history
+    heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
+
+
+class PubsubFactory(factory.Factory):
+    class Meta:
+        model = Pubsub
+
+    host = factory.SubFactory(HostFactory)
+    router = None
+    my_id = factory.LazyAttribute(lambda obj: obj.host.get_id())
+    cache_size = None
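For orientation, a sketch of how these factories are consumed elsewhere in this PR (conftest.py, dummy_account_node.py, and the integration-test helpers); the factories build hosts that are not yet listening, so callers still start the listener themselves:

# Sketch only; mirrors usage introduced in this PR.
floodsubs = FloodsubFactory.create_batch(3)      # three FloodSub routers
gossipsubs = GossipsubFactory.create_batch(3, degree=4, degree_low=3, degree_high=5)
pubsub = PubsubFactory(router=FloodsubFactory())  # builds its own BasicHost via SubFactory
# The host still has to be told to listen, as the fixtures and DummyAccountNode do:
#     await pubsub.host.get_network().listen(LISTEN_MADDR)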

View File

@@ -2,13 +2,11 @@ import asyncio

 import pytest

-from libp2p import new_node
-from libp2p.peer.id import ID
-from libp2p.pubsub.pubsub import Pubsub
+from tests.configs import LISTEN_MADDR
 from tests.utils import cleanup, connect

-from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR
+from .configs import FLOODSUB_PROTOCOL_ID
+from .factories import PubsubFactory

 SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
@@ -182,11 +180,10 @@ async def perform_test_from_obj(obj, router_factory):
     pubsub_map = {}

     async def add_node(node_id: str) -> None:
-        node = await new_node(transport_opt=[str(LISTEN_MADDR)])
-        await node.get_network().listen(LISTEN_MADDR)
-        node_map[node_id] = node
         pubsub_router = router_factory(protocols=obj["supported_protocols"])
-        pubsub = Pubsub(node, pubsub_router, ID(node_id.encode()))
+        pubsub = PubsubFactory(router=pubsub_router)
+        await pubsub.host.get_network().listen(LISTEN_MADDR)
+        node_map[node_id] = pubsub.host
         pubsub_map[node_id] = pubsub

     tasks_connect = []

View File

@@ -30,7 +30,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
     # Create nodes
     dummy_nodes = []
-    for i in range(num_nodes):
+    for _ in range(num_nodes):
         dummy_nodes.append(await DummyAccountNode.create())

     # Create network

View File

@@ -1,56 +1,41 @@
 import asyncio

-import multiaddr
 import pytest

-from libp2p import new_node
 from libp2p.peer.id import ID
-from libp2p.pubsub.floodsub import FloodSub
-from libp2p.pubsub.pubsub import Pubsub
 from tests.utils import cleanup, connect

-from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR
+from .factories import FloodsubFactory
 from .floodsub_integration_test_settings import (
     perform_test_from_obj,
     floodsub_protocol_pytest_params,
 )

-SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]

 # pylint: disable=too-many-locals


+@pytest.mark.parametrize("num_hosts", (2,))
 @pytest.mark.asyncio
-async def test_simple_two_nodes():
-    node_a = await new_node(transport_opt=[str(LISTEN_MADDR)])
-    node_b = await new_node(transport_opt=[str(LISTEN_MADDR)])
-
-    await node_a.get_network().listen(LISTEN_MADDR)
-    await node_b.get_network().listen(LISTEN_MADDR)
-
-    supported_protocols = [FLOODSUB_PROTOCOL_ID]
+async def test_simple_two_nodes(pubsubs_fsub):
     topic = "my_topic"
     data = b"some data"

-    floodsub_a = FloodSub(supported_protocols)
-    pubsub_a = Pubsub(node_a, floodsub_a, ID(b"\x12\x20" + b"a" * 32))
-    floodsub_b = FloodSub(supported_protocols)
-    pubsub_b = Pubsub(node_b, floodsub_b, ID(b"\x12\x20" + b"a" * 32))
-
-    await connect(node_a, node_b)
+    await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
     await asyncio.sleep(0.25)
-    sub_b = await pubsub_b.subscribe(topic)
+    sub_b = await pubsubs_fsub[1].subscribe(topic)
     # Sleep to let a know of b's subscription
     await asyncio.sleep(0.25)

-    await pubsub_a.publish(topic, data)
+    await pubsubs_fsub[0].publish(topic, data)

     res_b = await sub_b.get()

     # Check that the msg received by node_b is the same
     # as the message sent by node_a
-    assert ID(res_b.from_id) == node_a.get_id()
+    assert ID(res_b.from_id) == pubsubs_fsub[0].host.get_id()
     assert res_b.data == data
     assert res_b.topicIDs == [topic]
@@ -58,21 +43,16 @@ async def test_simple_two_nodes():
     await cleanup()


-# Initialize Pubsub with a cache_size of 4
+@pytest.mark.parametrize("num_hosts, pubsub_cache_size", ((2, 4),))
 @pytest.mark.asyncio
-async def test_lru_cache_two_nodes(monkeypatch):
+async def test_lru_cache_two_nodes(pubsubs_fsub, monkeypatch):
     # two nodes with cache_size of 4
     # `node_a` send the following messages to node_b
     message_indices = [1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
     # `node_b` should only receive the following
     expected_received_indices = [1, 2, 3, 4, 5, 1]

-    node_a = await new_node(transport_opt=[str(LISTEN_MADDR)])
-    node_b = await new_node(transport_opt=[str(LISTEN_MADDR)])
-    await node_a.get_network().listen(LISTEN_MADDR)
-    await node_b.get_network().listen(LISTEN_MADDR)
-
-    supported_protocols = SUPPORTED_PROTOCOLS
     topic = "my_topic"

     # Mock `get_msg_id` to make us easier to manipulate `msg_id` by `data`.
@@ -84,18 +64,10 @@ async def test_lru_cache_two_nodes(monkeypatch):
     monkeypatch.setattr(libp2p.pubsub.pubsub, "get_msg_id", get_msg_id)

-    # Initialize Pubsub with a cache_size of 4
-    cache_size = 4
-    floodsub_a = FloodSub(supported_protocols)
-    pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32), cache_size)
-    floodsub_b = FloodSub(supported_protocols)
-    pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32), cache_size)
-
-    await connect(node_a, node_b)
+    await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
     await asyncio.sleep(0.25)
-    sub_b = await pubsub_b.subscribe(topic)
+    sub_b = await pubsubs_fsub[1].subscribe(topic)
     await asyncio.sleep(0.25)

     def _make_testing_data(i: int) -> bytes:
@@ -105,7 +77,7 @@ async def test_lru_cache_two_nodes(monkeypatch):
         return b"data" + i.to_bytes(num_int_bytes, "big")

     for index in message_indices:
-        await pubsub_a.publish(topic, _make_testing_data(index))
+        await pubsubs_fsub[0].publish(topic, _make_testing_data(index))
         await asyncio.sleep(0.25)

     for index in expected_received_indices:
@@ -120,4 +92,4 @@ async def test_lru_cache_two_nodes(monkeypatch):
 @pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params)
 @pytest.mark.asyncio
 async def test_gossipsub_run_with_floodsub_tests(test_case_obj):
-    await perform_test_from_obj(test_case_obj, FloodSub)
+    await perform_test_from_obj(test_case_obj, FloodsubFactory)

View File

@@ -5,29 +5,17 @@ import pytest
 from tests.utils import cleanup, connect

-from .configs import GOSSIPSUB_PROTOCOL_ID
-from .utils import (
-    create_libp2p_hosts,
-    create_pubsub_and_gossipsub_instances,
-    dense_connect,
-    one_to_all_connect,
-)
+from .configs import GossipsubParams
+from .utils import dense_connect, one_to_all_connect

-SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID]

+@pytest.mark.parametrize(
+    "num_hosts, gossipsub_params",
+    ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),),
+)
 @pytest.mark.asyncio
-async def test_join():
-    # Create libp2p hosts
-    num_hosts = 4
+async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub):
     hosts_indices = list(range(num_hosts))
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 4, 3, 5, 30, 3, 5, 0.5
-    )

     topic = "test_join"
     central_node_index = 0
@@ -38,10 +26,10 @@ async def test_join():
     # All pubsub except the one of central node subscribe to topic
     for i in subscribed_peer_indices:
-        await pubsubs[i].subscribe(topic)
+        await pubsubs_gsub[i].subscribe(topic)

     # Connect central host to all other hosts
-    await one_to_all_connect(libp2p_hosts, central_node_index)
+    await one_to_all_connect(hosts, central_node_index)

     # Wait 2 seconds for heartbeat to allow mesh to connect
     await asyncio.sleep(2)
@@ -49,7 +37,7 @@ async def test_join():
     # Central node publish to the topic so that this topic
     # is added to central node's fanout
     # publish from the randomly chosen host
-    await pubsubs[central_node_index].publish(topic, b"data")
+    await pubsubs_gsub[central_node_index].publish(topic, b"data")

     # Check that the gossipsub of central node has fanout for the topic
     assert topic in gossipsubs[central_node_index].fanout
@@ -57,7 +45,7 @@ async def test_join():
     assert topic not in gossipsubs[central_node_index].mesh

     # Central node subscribes the topic
-    await pubsubs[central_node_index].subscribe(topic)
+    await pubsubs_gsub[central_node_index].subscribe(topic)

     await asyncio.sleep(2)
@@ -66,35 +54,21 @@ async def test_join():
     for i in hosts_indices:
         if i in subscribed_peer_indices:
-            assert (
-                str(libp2p_hosts[i].get_id())
-                in gossipsubs[central_node_index].mesh[topic]
-            )
-            assert (
-                str(libp2p_hosts[central_node_index].get_id())
-                in gossipsubs[i].mesh[topic]
-            )
+            assert str(hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic]
+            assert str(hosts[central_node_index].get_id()) in gossipsubs[i].mesh[topic]
         else:
             assert (
-                str(libp2p_hosts[i].get_id())
-                not in gossipsubs[central_node_index].mesh[topic]
+                str(hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic]
             )
             assert topic not in gossipsubs[i].mesh

     await cleanup()


+@pytest.mark.parametrize("num_hosts", (1,))
 @pytest.mark.asyncio
-async def test_leave():
-    num_hosts = 1
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    _, gossipsubs = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
-    )
-    gossipsub = gossipsubs[0]
+async def test_leave(pubsubs_gsub):
+    gossipsub = pubsubs_gsub[0].router
     topic = "test_leave"

     assert topic not in gossipsub.mesh
@@ -111,21 +85,14 @@ async def test_leave():
     await cleanup()


+@pytest.mark.parametrize("num_hosts", (2,))
 @pytest.mark.asyncio
-async def test_handle_graft(event_loop, monkeypatch):
-    num_hosts = 2
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    _, gossipsubs = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
-    )
-
+async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch):
     index_alice = 0
-    id_alice = str(libp2p_hosts[index_alice].get_id())
+    id_alice = str(hosts[index_alice].get_id())
     index_bob = 1
-    id_bob = str(libp2p_hosts[index_bob].get_id())
+    id_bob = str(hosts[index_bob].get_id())

-    await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
+    await connect(hosts[index_alice], hosts[index_bob])

     # Wait 2 seconds for heartbeat to allow mesh to connect
     await asyncio.sleep(2)
@@ -168,26 +135,21 @@ async def test_handle_graft(event_loop, monkeypatch):
     await cleanup()


+@pytest.mark.parametrize(
+    "num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),)
+)
 @pytest.mark.asyncio
-async def test_handle_prune():
-    num_hosts = 2
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 3
-    )
-
+async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs):
     index_alice = 0
-    id_alice = str(libp2p_hosts[index_alice].get_id())
+    id_alice = str(hosts[index_alice].get_id())
     index_bob = 1
-    id_bob = str(libp2p_hosts[index_bob].get_id())
+    id_bob = str(hosts[index_bob].get_id())

     topic = "test_handle_prune"
-    for pubsub in pubsubs:
+    for pubsub in pubsubs_gsub:
         await pubsub.subscribe(topic)

-    await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob])
+    await connect(hosts[index_alice], hosts[index_bob])

     # Wait 3 seconds for heartbeat to allow mesh to connect
     await asyncio.sleep(3)
@@ -212,28 +174,21 @@ async def test_handle_prune():
     await cleanup()


+@pytest.mark.parametrize("num_hosts", (10,))
 @pytest.mark.asyncio
-async def test_dense():
-    # Create libp2p hosts
-    num_hosts = 10
+async def test_dense(num_hosts, pubsubs_gsub, hosts):
     num_msgs = 5
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    pubsubs, _ = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
-    )

     # All pubsub subscribe to foobar
     queues = []
-    for pubsub in pubsubs:
+    for pubsub in pubsubs_gsub:
         q = await pubsub.subscribe("foobar")

         # Add each blocking queue to an array of blocking queues
         queues.append(q)

-    # Sparsely connect libp2p hosts in random way
-    await dense_connect(libp2p_hosts)
+    # Densely connect libp2p hosts in a random way
+    await dense_connect(hosts)

     # Wait 2 seconds for heartbeat to allow mesh to connect
     await asyncio.sleep(2)
@@ -245,7 +200,7 @@ async def test_dense():
         origin_idx = random.randint(0, num_hosts - 1)

         # publish from the randomly chosen host
-        await pubsubs[origin_idx].publish("foobar", msg_content)
+        await pubsubs_gsub[origin_idx].publish("foobar", msg_content)

         await asyncio.sleep(0.5)
         # Assert that all blocking queues receive the message
@@ -255,28 +210,21 @@ async def test_dense():
     await cleanup()


+@pytest.mark.parametrize("num_hosts", (10,))
 @pytest.mark.asyncio
-async def test_fanout():
-    # Create libp2p hosts
-    num_hosts = 10
+async def test_fanout(hosts, pubsubs_gsub):
     num_msgs = 5
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    pubsubs, _ = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
-    )

-    # All pubsub subscribe to foobar except for `pubsubs[0]`
+    # All pubsub subscribe to foobar except for `pubsubs_gsub[0]`
     queues = []
-    for i in range(1, len(pubsubs)):
-        q = await pubsubs[i].subscribe("foobar")
+    for i in range(1, len(pubsubs_gsub)):
+        q = await pubsubs_gsub[i].subscribe("foobar")

         # Add each blocking queue to an array of blocking queues
         queues.append(q)

     # Sparsely connect libp2p hosts in random way
-    await dense_connect(libp2p_hosts)
+    await dense_connect(hosts)

     # Wait 2 seconds for heartbeat to allow mesh to connect
     await asyncio.sleep(2)
@@ -290,7 +238,7 @@ async def test_fanout():
         origin_idx = 0

         # publish from the randomly chosen host
-        await pubsubs[origin_idx].publish(topic, msg_content)
+        await pubsubs_gsub[origin_idx].publish(topic, msg_content)

         await asyncio.sleep(0.5)
         # Assert that all blocking queues receive the message
@@ -299,7 +247,7 @@ async def test_fanout():
             assert msg.data == msg_content

     # Subscribe message origin
-    queues.insert(0, await pubsubs[0].subscribe(topic))
+    queues.insert(0, await pubsubs_gsub[0].subscribe(topic))

     # Send messages again
     for i in range(num_msgs):
@@ -309,7 +257,7 @@ async def test_fanout():
         origin_idx = 0

         # publish from the randomly chosen host
-        await pubsubs[origin_idx].publish(topic, msg_content)
+        await pubsubs_gsub[origin_idx].publish(topic, msg_content)

         await asyncio.sleep(0.5)
         # Assert that all blocking queues receive the message
@@ -320,29 +268,22 @@ async def test_fanout():
     await cleanup()


+@pytest.mark.parametrize("num_hosts", (10,))
 @pytest.mark.asyncio
-async def test_fanout_maintenance():
-    # Create libp2p hosts
-    num_hosts = 10
+async def test_fanout_maintenance(hosts, pubsubs_gsub):
     num_msgs = 5
-    libp2p_hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    pubsubs, _ = create_pubsub_and_gossipsub_instances(
-        libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5
-    )

     # All pubsub subscribe to foobar
     queues = []
     topic = "foobar"
-    for i in range(1, len(pubsubs)):
-        q = await pubsubs[i].subscribe(topic)
+    for i in range(1, len(pubsubs_gsub)):
+        q = await pubsubs_gsub[i].subscribe(topic)

         # Add each blocking queue to an array of blocking queues
         queues.append(q)

     # Sparsely connect libp2p hosts in random way
-    await dense_connect(libp2p_hosts)
+    await dense_connect(hosts)

     # Wait 2 seconds for heartbeat to allow mesh to connect
     await asyncio.sleep(2)
@@ -355,7 +296,7 @@ async def test_fanout_maintenance():
         origin_idx = 0

         # publish from the randomly chosen host
-        await pubsubs[origin_idx].publish(topic, msg_content)
+        await pubsubs_gsub[origin_idx].publish(topic, msg_content)

         await asyncio.sleep(0.5)
         # Assert that all blocking queues receive the message
@@ -363,7 +304,7 @@ async def test_fanout_maintenance():
         msg = await queue.get()
         assert msg.data == msg_content

-    for sub in pubsubs:
+    for sub in pubsubs_gsub:
         await sub.unsubscribe(topic)

     queues = []
@@ -371,8 +312,8 @@ async def test_fanout_maintenance():
     await asyncio.sleep(2)

     # Resub and repeat
-    for i in range(1, len(pubsubs)):
-        q = await pubsubs[i].subscribe(topic)
+    for i in range(1, len(pubsubs_gsub)):
+        q = await pubsubs_gsub[i].subscribe(topic)

         # Add each blocking queue to an array of blocking queues
         queues.append(q)
@@ -387,7 +328,7 @@ async def test_fanout_maintenance():
         origin_idx = 0

         # publish from the randomly chosen host
-        await pubsubs[origin_idx].publish(topic, msg_content)
+        await pubsubs_gsub[origin_idx].publish(topic, msg_content)

         await asyncio.sleep(0.5)
         # Assert that all blocking queues receive the message
@@ -398,28 +339,34 @@ async def test_fanout_maintenance():
     await cleanup()


+@pytest.mark.parametrize(
+    "num_hosts, gossipsub_params",
+    (
+        (
+            2,
+            GossipsubParams(
+                degree=1,
+                degree_low=0,
+                degree_high=2,
+                gossip_window=50,
+                gossip_history=100,
+            ),
+        ),
+    ),
+)
 @pytest.mark.asyncio
-async def test_gossip_propagation():
-    # Create libp2p hosts
-    num_hosts = 2
-    hosts = await create_libp2p_hosts(num_hosts)
-
-    # Create pubsub, gossipsub instances
-    pubsubs, _ = create_pubsub_and_gossipsub_instances(
-        hosts, SUPPORTED_PROTOCOLS, 1, 0, 2, 30, 50, 100, 0.5
-    )
-
+async def test_gossip_propagation(hosts, pubsubs_gsub):
     topic = "foo"
-    await pubsubs[0].subscribe(topic)
+    await pubsubs_gsub[0].subscribe(topic)

     # node 0 publish to topic
     msg_content = b"foo_msg"

     # publish from the randomly chosen host
-    await pubsubs[0].publish(topic, msg_content)
+    await pubsubs_gsub[0].publish(topic, msg_content)

     # now node 1 subscribes
-    queue_1 = await pubsubs[1].subscribe(topic)
+    queue_1 = await pubsubs_gsub[1].subscribe(topic)

     await connect(hosts[0], hosts[1])

View File

@@ -2,13 +2,8 @@ import functools

 import pytest

-from libp2p import new_node
-from libp2p.pubsub.gossipsub import GossipSub
-from libp2p.pubsub.pubsub import Pubsub
-from tests.utils import cleanup
-
-from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR
+from .configs import FLOODSUB_PROTOCOL_ID
+from .factories import GossipsubFactory
 from .floodsub_integration_test_settings import (
     perform_test_from_obj,
     floodsub_protocol_pytest_params,
@@ -18,17 +13,7 @@ from .floodsub_integration_test_settings import (
 # pylint: disable=too-many-locals
 @pytest.mark.asyncio
 async def test_gossipsub_initialize_with_floodsub_protocol():
-    node = await new_node(transport_opt=[str(LISTEN_MADDR)])
-    await node.get_network().listen(LISTEN_MADDR)
-
-    gossipsub = GossipSub([FLOODSUB_PROTOCOL_ID], 3, 2, 4, 30)
-    pubsub = Pubsub(node, gossipsub, "a")
-
-    # Did it work?
-    assert gossipsub and pubsub
-
-    await cleanup()
+    GossipsubFactory(protocols=[FLOODSUB_PROTOCOL_ID])


 @pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params)
@@ -37,6 +22,6 @@ async def test_gossipsub_run_with_floodsub_tests(test_case_obj):
     await perform_test_from_obj(
         test_case_obj,
         functools.partial(
-            GossipSub, degree=3, degree_low=2, degree_high=4, time_to_live=30
+            GossipsubFactory, degree=3, degree_low=2, degree_high=4, time_to_live=30
         ),
     )

View File

@ -1,66 +0,0 @@
# pylint: disable=redefined-outer-name
import pytest
from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from .configs import FLOODSUB_PROTOCOL_ID
SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
TESTING_TOPIC = "TEST_SUBSCRIBE"
class NoConnNode:
# pylint: disable=too-few-public-methods
def __init__(self, host, pubsub):
self.host = host
self.pubsub = pubsub
@classmethod
async def create(cls):
host = await new_node()
floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
pubsub = Pubsub(host, floodsub, "test")
return cls(host, pubsub)
@pytest.fixture
async def node():
return await NoConnNode.create()
@pytest.mark.asyncio
async def test_subscribe_unsubscribe(node):
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
await node.pubsub.unsubscribe(TESTING_TOPIC)
assert TESTING_TOPIC not in node.pubsub.my_topics
@pytest.mark.asyncio
async def test_re_subscribe(node):
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
@pytest.mark.asyncio
async def test_re_unsubscribe(node):
# Unsubscribe from topic we didn't even subscribe to
assert "NOT_MY_TOPIC" not in node.pubsub.my_topics
await node.pubsub.unsubscribe("NOT_MY_TOPIC")
await node.pubsub.subscribe(TESTING_TOPIC)
assert TESTING_TOPIC in node.pubsub.my_topics
await node.pubsub.unsubscribe(TESTING_TOPIC)
assert TESTING_TOPIC not in node.pubsub.my_topics
await node.pubsub.unsubscribe(TESTING_TOPIC)
assert TESTING_TOPIC not in node.pubsub.my_topics

View File

@@ -1,19 +1,11 @@
-import asyncio
 import struct
 from typing import Sequence

-import multiaddr
-
-from libp2p import new_node
 from libp2p.peer.id import ID
-from libp2p.pubsub.gossipsub import GossipSub
 from libp2p.pubsub.pb import rpc_pb2
-from libp2p.pubsub.pubsub import Pubsub
 from tests.utils import connect

-from .configs import LISTEN_MADDR


 def message_id_generator(start_val):
     """
@@ -44,80 +36,6 @@ def make_pubsub_msg(
     )


-def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
-    """
-    Generate RPC packet to send over wire
-    :param origin_id: peer id of the message origin
-    :param topics: list of topics
-    :param msg_content: string of content in data
-    :param msg_id: seqno for the message
-    """
-    packet = rpc_pb2.RPC()
-    message = rpc_pb2.Message(
-        from_id=origin_id.encode("utf-8"),
-        seqno=msg_id,
-        data=msg_content.encode("utf-8"),
-    )
-
-    for topic in topics:
-        message.topicIDs.extend([topic.encode("utf-8")])
-
-    packet.publish.extend([message])
-    return packet
-
-
-async def create_libp2p_hosts(num_hosts):
-    """
-    Create libp2p hosts
-    :param num_hosts: number of hosts to create
-    """
-    hosts = []
-    tasks_create = []
-    for i in range(0, num_hosts):
-        # Create node
-        tasks_create.append(new_node(transport_opt=[str(LISTEN_MADDR)]))
-    hosts = await asyncio.gather(*tasks_create)
-
-    tasks_listen = []
-    for node in hosts:
-        # Start listener
-        tasks_listen.append(node.get_network().listen(LISTEN_MADDR))
-    await asyncio.gather(*tasks_listen)
-
-    return hosts
-
-
-def create_pubsub_and_gossipsub_instances(
-    libp2p_hosts,
-    supported_protocols,
-    degree,
-    degree_low,
-    degree_high,
-    time_to_live,
-    gossip_window,
-    gossip_history,
-    heartbeat_interval,
-):
-    pubsubs = []
-    gossipsubs = []
-    for node in libp2p_hosts:
-        gossipsub = GossipSub(
-            supported_protocols,
-            degree,
-            degree_low,
-            degree_high,
-            time_to_live,
-            gossip_window,
-            gossip_history,
-            heartbeat_interval,
-        )
-        pubsub = Pubsub(node, gossipsub, node.get_id())
-        pubsubs.append(pubsub)
-        gossipsubs.append(gossipsub)
-
-    return pubsubs, gossipsubs
-
-
 # FIXME: There is no difference between `sparse_connect` and `dense_connect`,
 # before `connect_some` is fixed.
@@ -130,12 +48,17 @@ async def dense_connect(hosts):
     await connect_some(hosts, 10)


+async def connect_all(hosts):
+    for i, host in enumerate(hosts):
+        for host2 in hosts[i + 1 :]:
+            await connect(host, host2)
+
+
 # FIXME: `degree` is not used at all
 async def connect_some(hosts, degree):
     for i, host in enumerate(hosts):
-        for j, host2 in enumerate(hosts):
-            if i != j and i < j:
-                await connect(host, host2)
+        for host2 in hosts[i + 1 :]:
+            await connect(host, host2)

     # TODO: USE THE CODE BELOW
     # for i, host in enumerate(hosts):

View File

@@ -7,7 +7,9 @@ from libp2p.peer.peerinfo import info_from_p2p_addr
 from libp2p.protocol_muxer.multiselect_client import MultiselectClientError
 from libp2p.security.insecure_security import InsecureTransport
 from libp2p.security.simple_security import SimpleSecurityTransport
-from tests.utils import cleanup
+from tests.utils import cleanup, connect

 # TODO: Add tests for multiple streams being opened on different
 # protocols through the same connection
@@ -19,15 +21,6 @@ def peer_id_for_node(node):
     return info.peer_id


-async def connect(node1, node2):
-    """
-    Connect node1 to node2
-    """
-    addr = node2.get_addrs()[0]
-    info = info_from_p2p_addr(addr)
-    await node1.connect(info)
-
-
 async def perform_simple_test(
     assertion_func, transports_for_initiator, transports_for_noninitiator
 ):

View File

@@ -13,8 +13,6 @@ async def connect(node1, node2):
     addr = node2.get_addrs()[0]
     info = info_from_p2p_addr(addr)
     await node1.connect(info)
-    assert node1.get_id() in node2.get_network().connections
-    assert node2.get_id() in node1.get_network().connections


 async def cleanup():