diff --git a/setup.py b/setup.py index 2ca4b58..b24d0cf 100644 --- a/setup.py +++ b/setup.py @@ -8,6 +8,7 @@ classifiers = [f"Programming Language :: Python :: {version}" for version in ["3 extras_require = { "test": [ "codecov>=2.0.15,<3.0.0", + "factory-boy>=2.12.0,<3.0.0", "pytest>=4.6.3,<5.0.0", "pytest-cov>=2.7.1,<3.0.0", "pytest-asyncio>=0.10.0,<1.0.0", diff --git a/tests/configs.py b/tests/configs.py new file mode 100644 index 0000000..e02cd64 --- /dev/null +++ b/tests/configs.py @@ -0,0 +1,3 @@ +import multiaddr + +LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py index 99295e4..31f50bb 100644 --- a/tests/pubsub/configs.py +++ b/tests/pubsub/configs.py @@ -1,7 +1,18 @@ -import multiaddr +from typing import NamedTuple FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0" -LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + +class GossipsubParams(NamedTuple): + degree: int = 10 + degree_low: int = 9 + degree_high: int = 11 + time_to_live: int = 30 + gossip_window: int = 3 + gossip_history: int = 5 + heartbeat_interval: float = 0.5 + + +GOSSIPSUB_PARAMS = GossipsubParams() diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index 93689ae..a531175 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -1,17 +1,11 @@ import asyncio -from typing import NamedTuple import pytest -from multiaddr import Multiaddr +from tests.configs import LISTEN_MADDR -from libp2p import new_node - -from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.gossipsub import GossipSub -from libp2p.pubsub.pubsub import Pubsub - -from .configs import FLOODSUB_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID, LISTEN_MADDR +from .configs import GOSSIPSUB_PARAMS +from .factories import FloodsubFactory, GossipsubFactory, HostFactory, PubsubFactory # pylint: disable=redefined-outer-name @@ -24,9 +18,7 @@ def num_hosts(): @pytest.fixture async def hosts(num_hosts): - _hosts = await asyncio.gather( - *[new_node(transport_opt=[str(LISTEN_MADDR)]) for _ in range(num_hosts)] - ) + _hosts = HostFactory.create_batch(num_hosts) await asyncio.gather( *[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts] ) @@ -42,54 +34,46 @@ async def hosts(num_hosts): @pytest.fixture def floodsubs(num_hosts): - return tuple(FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) for _ in range(num_hosts)) - - -class GossipsubParams(NamedTuple): - degree: int = 10 - degree_low: int = 9 - degree_high: int = 11 - fanout_ttl: int = 30 - gossip_window: int = 3 - gossip_history: int = 5 - heartbeat_interval: float = 0.5 + return FloodsubFactory.create_batch(num_hosts) @pytest.fixture def gossipsub_params(): - return GossipsubParams() + return GOSSIPSUB_PARAMS @pytest.fixture def gossipsubs(num_hosts, gossipsub_params): - yield tuple( - GossipSub(protocols=[GOSSIPSUB_PROTOCOL_ID], **gossipsub_params._asdict()) - for _ in range(num_hosts) - ) + yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) # TODO: Clean up -def _make_pubsubs(hosts, pubsub_routers): +def _make_pubsubs(hosts, pubsub_routers, cache_size): if len(pubsub_routers) != len(hosts): raise ValueError( f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " f"length of hosts={len(hosts)}" ) return tuple( - Pubsub(host=host, router=router, my_id=host.get_id()) + PubsubFactory(host=host, router=router, cache_size=cache_size) for host, router in zip(hosts, pubsub_routers) ) @pytest.fixture -def pubsubs_fsub(hosts, floodsubs): - _pubsubs_fsub = _make_pubsubs(hosts, floodsubs) +def pubsub_cache_size(): + return None # default + + +@pytest.fixture +def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size): + _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) yield _pubsubs_fsub # TODO: Clean up @pytest.fixture -def pubsubs_gsub(hosts, gossipsubs): - _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs) +def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size): + _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) yield _pubsubs_gsub # TODO: Clean up diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 186b06c..083e10e 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,18 +1,16 @@ import asyncio import uuid -import multiaddr - -from libp2p import new_node from libp2p.host.host_interface import IHost from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.pubsub import Pubsub -from .configs import FLOODSUB_PROTOCOL_ID +from tests.configs import LISTEN_MADDR + +from .factories import FloodsubFactory, PubsubFactory from .utils import message_id_generator -SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] CRYPTO_TOPIC = "ethereum" # Message format: @@ -52,14 +50,9 @@ class DummyAccountNode: to use async await, unlike the init function """ - libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await libp2p_node.get_network().listen( - multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") - ) - - floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) - pubsub = Pubsub(libp2p_node, floodsub, "a") - return cls(libp2p_node=libp2p_node, pubsub=pubsub, floodsub=floodsub) + pubsub = PubsubFactory(router=FloodsubFactory()) + await pubsub.host.get_network().listen(LISTEN_MADDR) + return cls(libp2p_node=pubsub.host, pubsub=pubsub, floodsub=pubsub.router) async def handle_incoming_msgs(self): """ diff --git a/tests/pubsub/factories.py b/tests/pubsub/factories.py new file mode 100644 index 0000000..d48ffa1 --- /dev/null +++ b/tests/pubsub/factories.py @@ -0,0 +1,55 @@ +import functools + +import factory + +from libp2p import initialize_default_swarm +from libp2p.host.basic_host import BasicHost +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub + +from tests.configs import LISTEN_MADDR + +from .configs import FLOODSUB_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID, GOSSIPSUB_PARAMS + + +def swarm_factory(): + return initialize_default_swarm(transport_opt=[str(LISTEN_MADDR)]) + + +class HostFactory(factory.Factory): + class Meta: + model = BasicHost + + network = factory.LazyFunction(swarm_factory) + + +class FloodsubFactory(factory.Factory): + class Meta: + model = FloodSub + + protocols = (FLOODSUB_PROTOCOL_ID,) + + +class GossipsubFactory(factory.Factory): + class Meta: + model = GossipSub + + protocols = (GOSSIPSUB_PROTOCOL_ID,) + degree = GOSSIPSUB_PARAMS.degree + degree_low = GOSSIPSUB_PARAMS.degree_low + degree_high = GOSSIPSUB_PARAMS.degree_high + time_to_live = GOSSIPSUB_PARAMS.time_to_live + gossip_window = GOSSIPSUB_PARAMS.gossip_window + gossip_history = GOSSIPSUB_PARAMS.gossip_history + heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval + + +class PubsubFactory(factory.Factory): + class Meta: + model = Pubsub + + host = factory.SubFactory(HostFactory) + router = None + my_id = factory.LazyAttribute(lambda obj: obj.host.get_id()) + cache_size = None diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py index 4d9f543..950d3c2 100644 --- a/tests/pubsub/floodsub_integration_test_settings.py +++ b/tests/pubsub/floodsub_integration_test_settings.py @@ -2,13 +2,11 @@ import asyncio import pytest -from libp2p import new_node -from libp2p.peer.id import ID -from libp2p.pubsub.pubsub import Pubsub - +from tests.configs import LISTEN_MADDR from tests.utils import cleanup, connect -from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR +from .configs import FLOODSUB_PROTOCOL_ID +from .factories import PubsubFactory SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] @@ -182,11 +180,10 @@ async def perform_test_from_obj(obj, router_factory): pubsub_map = {} async def add_node(node_id: str) -> None: - node = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node.get_network().listen(LISTEN_MADDR) - node_map[node_id] = node pubsub_router = router_factory(protocols=obj["supported_protocols"]) - pubsub = Pubsub(node, pubsub_router, ID(node_id.encode())) + pubsub = PubsubFactory(router=pubsub_router) + await pubsub.host.get_network().listen(LISTEN_MADDR) + node_map[node_id] = pubsub.host pubsub_map[node_id] = pubsub tasks_connect = [] diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index c2f2261..e92bd38 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -30,7 +30,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): # Create nodes dummy_nodes = [] - for i in range(num_nodes): + for _ in range(num_nodes): dummy_nodes.append(await DummyAccountNode.create()) # Create network diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index b15a7c7..3dc6645 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -1,56 +1,41 @@ import asyncio -import multiaddr + import pytest -from libp2p import new_node from libp2p.peer.id import ID -from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.pubsub import Pubsub from tests.utils import cleanup, connect -from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR +from .factories import FloodsubFactory from .floodsub_integration_test_settings import ( perform_test_from_obj, floodsub_protocol_pytest_params, ) -SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] # pylint: disable=too-many-locals +@pytest.mark.parametrize("num_hosts", (2,)) @pytest.mark.asyncio -async def test_simple_two_nodes(): - node_a = await new_node(transport_opt=[str(LISTEN_MADDR)]) - node_b = await new_node(transport_opt=[str(LISTEN_MADDR)]) - - await node_a.get_network().listen(LISTEN_MADDR) - await node_b.get_network().listen(LISTEN_MADDR) - - supported_protocols = [FLOODSUB_PROTOCOL_ID] +async def test_simple_two_nodes(pubsubs_fsub): topic = "my_topic" data = b"some data" - floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, ID(b"\x12\x20" + b"a" * 32)) - floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, ID(b"\x12\x20" + b"a" * 32)) - - await connect(node_a, node_b) + await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host) await asyncio.sleep(0.25) - sub_b = await pubsub_b.subscribe(topic) + sub_b = await pubsubs_fsub[1].subscribe(topic) # Sleep to let a know of b's subscription await asyncio.sleep(0.25) - await pubsub_a.publish(topic, data) + await pubsubs_fsub[0].publish(topic, data) res_b = await sub_b.get() # Check that the msg received by node_b is the same # as the message sent by node_a - assert ID(res_b.from_id) == node_a.get_id() + assert ID(res_b.from_id) == pubsubs_fsub[0].host.get_id() assert res_b.data == data assert res_b.topicIDs == [topic] @@ -58,21 +43,16 @@ async def test_simple_two_nodes(): await cleanup() +# Initialize Pubsub with a cache_size of 4 +@pytest.mark.parametrize("num_hosts, pubsub_cache_size", ((2, 4),)) @pytest.mark.asyncio -async def test_lru_cache_two_nodes(monkeypatch): +async def test_lru_cache_two_nodes(pubsubs_fsub, monkeypatch): # two nodes with cache_size of 4 # `node_a` send the following messages to node_b message_indices = [1, 1, 2, 1, 3, 1, 4, 1, 5, 1] # `node_b` should only receive the following expected_received_indices = [1, 2, 3, 4, 5, 1] - node_a = await new_node(transport_opt=[str(LISTEN_MADDR)]) - node_b = await new_node(transport_opt=[str(LISTEN_MADDR)]) - - await node_a.get_network().listen(LISTEN_MADDR) - await node_b.get_network().listen(LISTEN_MADDR) - - supported_protocols = SUPPORTED_PROTOCOLS topic = "my_topic" # Mock `get_msg_id` to make us easier to manipulate `msg_id` by `data`. @@ -84,18 +64,10 @@ async def test_lru_cache_two_nodes(monkeypatch): monkeypatch.setattr(libp2p.pubsub.pubsub, "get_msg_id", get_msg_id) - # Initialize Pubsub with a cache_size of 4 - cache_size = 4 - floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32), cache_size) - - floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32), cache_size) - - await connect(node_a, node_b) + await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host) await asyncio.sleep(0.25) - sub_b = await pubsub_b.subscribe(topic) + sub_b = await pubsubs_fsub[1].subscribe(topic) await asyncio.sleep(0.25) def _make_testing_data(i: int) -> bytes: @@ -105,7 +77,7 @@ async def test_lru_cache_two_nodes(monkeypatch): return b"data" + i.to_bytes(num_int_bytes, "big") for index in message_indices: - await pubsub_a.publish(topic, _make_testing_data(index)) + await pubsubs_fsub[0].publish(topic, _make_testing_data(index)) await asyncio.sleep(0.25) for index in expected_received_indices: @@ -120,4 +92,4 @@ async def test_lru_cache_two_nodes(monkeypatch): @pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params) @pytest.mark.asyncio async def test_gossipsub_run_with_floodsub_tests(test_case_obj): - await perform_test_from_obj(test_case_obj, FloodSub) + await perform_test_from_obj(test_case_obj, FloodsubFactory) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 58ed0e1..5dc72af 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -5,29 +5,17 @@ import pytest from tests.utils import cleanup, connect -from .configs import GOSSIPSUB_PROTOCOL_ID -from .utils import ( - create_libp2p_hosts, - create_pubsub_and_gossipsub_instances, - dense_connect, - one_to_all_connect, +from .configs import GossipsubParams +from .utils import dense_connect, one_to_all_connect + + +@pytest.mark.parametrize( + "num_hosts, gossipsub_params", + ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),), ) - - -SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID] - - @pytest.mark.asyncio -async def test_join(): - # Create libp2p hosts - num_hosts = 4 +async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub): hosts_indices = list(range(num_hosts)) - libp2p_hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 4, 3, 5, 30, 3, 5, 0.5 - ) topic = "test_join" central_node_index = 0 @@ -38,10 +26,10 @@ async def test_join(): # All pubsub except the one of central node subscribe to topic for i in subscribed_peer_indices: - await pubsubs[i].subscribe(topic) + await pubsubs_gsub[i].subscribe(topic) # Connect central host to all other hosts - await one_to_all_connect(libp2p_hosts, central_node_index) + await one_to_all_connect(hosts, central_node_index) # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) @@ -49,7 +37,7 @@ async def test_join(): # Central node publish to the topic so that this topic # is added to central node's fanout # publish from the randomly chosen host - await pubsubs[central_node_index].publish(topic, b"data") + await pubsubs_gsub[central_node_index].publish(topic, b"data") # Check that the gossipsub of central node has fanout for the topic assert topic in gossipsubs[central_node_index].fanout @@ -57,7 +45,7 @@ async def test_join(): assert topic not in gossipsubs[central_node_index].mesh # Central node subscribes the topic - await pubsubs[central_node_index].subscribe(topic) + await pubsubs_gsub[central_node_index].subscribe(topic) await asyncio.sleep(2) @@ -66,35 +54,21 @@ async def test_join(): for i in hosts_indices: if i in subscribed_peer_indices: - assert ( - str(libp2p_hosts[i].get_id()) - in gossipsubs[central_node_index].mesh[topic] - ) - assert ( - str(libp2p_hosts[central_node_index].get_id()) - in gossipsubs[i].mesh[topic] - ) + assert str(hosts[i].get_id()) in gossipsubs[central_node_index].mesh[topic] + assert str(hosts[central_node_index].get_id()) in gossipsubs[i].mesh[topic] else: assert ( - str(libp2p_hosts[i].get_id()) - not in gossipsubs[central_node_index].mesh[topic] + str(hosts[i].get_id()) not in gossipsubs[central_node_index].mesh[topic] ) assert topic not in gossipsubs[i].mesh await cleanup() +@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_leave(): - num_hosts = 1 - libp2p_hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 - ) - - gossipsub = gossipsubs[0] +async def test_leave(pubsubs_gsub): + gossipsub = pubsubs_gsub[0].router topic = "test_leave" assert topic not in gossipsub.mesh @@ -111,21 +85,14 @@ async def test_leave(): await cleanup() +@pytest.mark.parametrize("num_hosts", (2,)) @pytest.mark.asyncio -async def test_handle_graft(event_loop, monkeypatch): - num_hosts = 2 - libp2p_hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - _, gossipsubs = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 - ) - +async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch): index_alice = 0 - id_alice = str(libp2p_hosts[index_alice].get_id()) + id_alice = str(hosts[index_alice].get_id()) index_bob = 1 - id_bob = str(libp2p_hosts[index_bob].get_id()) - await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) + id_bob = str(hosts[index_bob].get_id()) + await connect(hosts[index_alice], hosts[index_bob]) # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) @@ -168,26 +135,21 @@ async def test_handle_graft(event_loop, monkeypatch): await cleanup() +@pytest.mark.parametrize( + "num_hosts, gossipsub_params", ((2, GossipsubParams(heartbeat_interval=3)),) +) @pytest.mark.asyncio -async def test_handle_prune(): - num_hosts = 2 - libp2p_hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 3 - ) - +async def test_handle_prune(pubsubs_gsub, hosts, gossipsubs): index_alice = 0 - id_alice = str(libp2p_hosts[index_alice].get_id()) + id_alice = str(hosts[index_alice].get_id()) index_bob = 1 - id_bob = str(libp2p_hosts[index_bob].get_id()) + id_bob = str(hosts[index_bob].get_id()) topic = "test_handle_prune" - for pubsub in pubsubs: + for pubsub in pubsubs_gsub: await pubsub.subscribe(topic) - await connect(libp2p_hosts[index_alice], libp2p_hosts[index_bob]) + await connect(hosts[index_alice], hosts[index_bob]) # Wait 3 seconds for heartbeat to allow mesh to connect await asyncio.sleep(3) @@ -212,28 +174,21 @@ async def test_handle_prune(): await cleanup() +@pytest.mark.parametrize("num_hosts", (10,)) @pytest.mark.asyncio -async def test_dense(): - # Create libp2p hosts - num_hosts = 10 +async def test_dense(num_hosts, pubsubs_gsub, hosts): num_msgs = 5 - libp2p_hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 - ) # All pubsub subscribe to foobar queues = [] - for pubsub in pubsubs: + for pubsub in pubsubs_gsub: q = await pubsub.subscribe("foobar") # Add each blocking queue to an array of blocking queues queues.append(q) - # Sparsely connect libp2p hosts in random way - await dense_connect(libp2p_hosts) + # Densely connect libp2p hosts in a random way + await dense_connect(hosts) # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) @@ -245,7 +200,7 @@ async def test_dense(): origin_idx = random.randint(0, num_hosts - 1) # publish from the randomly chosen host - await pubsubs[origin_idx].publish("foobar", msg_content) + await pubsubs_gsub[origin_idx].publish("foobar", msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message @@ -255,28 +210,21 @@ async def test_dense(): await cleanup() +@pytest.mark.parametrize("num_hosts", (10,)) @pytest.mark.asyncio -async def test_fanout(): - # Create libp2p hosts - num_hosts = 10 +async def test_fanout(hosts, pubsubs_gsub): num_msgs = 5 - libp2p_hosts = await create_libp2p_hosts(num_hosts) - # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 - ) - - # All pubsub subscribe to foobar except for `pubsubs[0]` + # All pubsub subscribe to foobar except for `pubsubs_gsub[0]` queues = [] - for i in range(1, len(pubsubs)): - q = await pubsubs[i].subscribe("foobar") + for i in range(1, len(pubsubs_gsub)): + q = await pubsubs_gsub[i].subscribe("foobar") # Add each blocking queue to an array of blocking queues queues.append(q) # Sparsely connect libp2p hosts in random way - await dense_connect(libp2p_hosts) + await dense_connect(hosts) # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) @@ -290,7 +238,7 @@ async def test_fanout(): origin_idx = 0 # publish from the randomly chosen host - await pubsubs[origin_idx].publish(topic, msg_content) + await pubsubs_gsub[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message @@ -299,7 +247,7 @@ async def test_fanout(): assert msg.data == msg_content # Subscribe message origin - queues.insert(0, await pubsubs[0].subscribe(topic)) + queues.insert(0, await pubsubs_gsub[0].subscribe(topic)) # Send messages again for i in range(num_msgs): @@ -309,7 +257,7 @@ async def test_fanout(): origin_idx = 0 # publish from the randomly chosen host - await pubsubs[origin_idx].publish(topic, msg_content) + await pubsubs_gsub[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message @@ -320,29 +268,22 @@ async def test_fanout(): await cleanup() +@pytest.mark.parametrize("num_hosts", (10,)) @pytest.mark.asyncio -async def test_fanout_maintenance(): - # Create libp2p hosts - num_hosts = 10 +async def test_fanout_maintenance(hosts, pubsubs_gsub): num_msgs = 5 - libp2p_hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances( - libp2p_hosts, SUPPORTED_PROTOCOLS, 10, 9, 11, 30, 3, 5, 0.5 - ) # All pubsub subscribe to foobar queues = [] topic = "foobar" - for i in range(1, len(pubsubs)): - q = await pubsubs[i].subscribe(topic) + for i in range(1, len(pubsubs_gsub)): + q = await pubsubs_gsub[i].subscribe(topic) # Add each blocking queue to an array of blocking queues queues.append(q) # Sparsely connect libp2p hosts in random way - await dense_connect(libp2p_hosts) + await dense_connect(hosts) # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) @@ -355,7 +296,7 @@ async def test_fanout_maintenance(): origin_idx = 0 # publish from the randomly chosen host - await pubsubs[origin_idx].publish(topic, msg_content) + await pubsubs_gsub[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message @@ -363,7 +304,7 @@ async def test_fanout_maintenance(): msg = await queue.get() assert msg.data == msg_content - for sub in pubsubs: + for sub in pubsubs_gsub: await sub.unsubscribe(topic) queues = [] @@ -371,8 +312,8 @@ async def test_fanout_maintenance(): await asyncio.sleep(2) # Resub and repeat - for i in range(1, len(pubsubs)): - q = await pubsubs[i].subscribe(topic) + for i in range(1, len(pubsubs_gsub)): + q = await pubsubs_gsub[i].subscribe(topic) # Add each blocking queue to an array of blocking queues queues.append(q) @@ -387,7 +328,7 @@ async def test_fanout_maintenance(): origin_idx = 0 # publish from the randomly chosen host - await pubsubs[origin_idx].publish(topic, msg_content) + await pubsubs_gsub[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message @@ -398,28 +339,34 @@ async def test_fanout_maintenance(): await cleanup() +@pytest.mark.parametrize( + "num_hosts, gossipsub_params", + ( + ( + 2, + GossipsubParams( + degree=1, + degree_low=0, + degree_high=2, + gossip_window=50, + gossip_history=100, + ), + ), + ), +) @pytest.mark.asyncio -async def test_gossip_propagation(): - # Create libp2p hosts - num_hosts = 2 - hosts = await create_libp2p_hosts(num_hosts) - - # Create pubsub, gossipsub instances - pubsubs, _ = create_pubsub_and_gossipsub_instances( - hosts, SUPPORTED_PROTOCOLS, 1, 0, 2, 30, 50, 100, 0.5 - ) - +async def test_gossip_propagation(hosts, pubsubs_gsub): topic = "foo" - await pubsubs[0].subscribe(topic) + await pubsubs_gsub[0].subscribe(topic) # node 0 publish to topic msg_content = b"foo_msg" # publish from the randomly chosen host - await pubsubs[0].publish(topic, msg_content) + await pubsubs_gsub[0].publish(topic, msg_content) # now node 1 subscribes - queue_1 = await pubsubs[1].subscribe(topic) + queue_1 = await pubsubs_gsub[1].subscribe(topic) await connect(hosts[0], hosts[1]) diff --git a/tests/pubsub/test_gossipsub_backward_compatibility.py b/tests/pubsub/test_gossipsub_backward_compatibility.py index 855a0cc..8f7469c 100644 --- a/tests/pubsub/test_gossipsub_backward_compatibility.py +++ b/tests/pubsub/test_gossipsub_backward_compatibility.py @@ -2,13 +2,8 @@ import functools import pytest -from libp2p import new_node -from libp2p.pubsub.gossipsub import GossipSub -from libp2p.pubsub.pubsub import Pubsub - -from tests.utils import cleanup - -from .configs import FLOODSUB_PROTOCOL_ID, LISTEN_MADDR +from .configs import FLOODSUB_PROTOCOL_ID +from .factories import GossipsubFactory from .floodsub_integration_test_settings import ( perform_test_from_obj, floodsub_protocol_pytest_params, @@ -18,17 +13,7 @@ from .floodsub_integration_test_settings import ( # pylint: disable=too-many-locals @pytest.mark.asyncio async def test_gossipsub_initialize_with_floodsub_protocol(): - node = await new_node(transport_opt=[str(LISTEN_MADDR)]) - - await node.get_network().listen(LISTEN_MADDR) - - gossipsub = GossipSub([FLOODSUB_PROTOCOL_ID], 3, 2, 4, 30) - pubsub = Pubsub(node, gossipsub, "a") - - # Did it work? - assert gossipsub and pubsub - - await cleanup() + GossipsubFactory(protocols=[FLOODSUB_PROTOCOL_ID]) @pytest.mark.parametrize("test_case_obj", floodsub_protocol_pytest_params) @@ -37,6 +22,6 @@ async def test_gossipsub_run_with_floodsub_tests(test_case_obj): await perform_test_from_obj( test_case_obj, functools.partial( - GossipSub, degree=3, degree_low=2, degree_high=4, time_to_live=30 + GossipsubFactory, degree=3, degree_low=2, degree_high=4, time_to_live=30 ), ) diff --git a/tests/pubsub/test_subscription.py b/tests/pubsub/test_subscription.py deleted file mode 100644 index 0dfdc4d..0000000 --- a/tests/pubsub/test_subscription.py +++ /dev/null @@ -1,66 +0,0 @@ -# pylint: disable=redefined-outer-name -import pytest - -from libp2p import new_node -from libp2p.pubsub.pubsub import Pubsub -from libp2p.pubsub.floodsub import FloodSub - -from .configs import FLOODSUB_PROTOCOL_ID - - -SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] -TESTING_TOPIC = "TEST_SUBSCRIBE" - - -class NoConnNode: - # pylint: disable=too-few-public-methods - - def __init__(self, host, pubsub): - self.host = host - self.pubsub = pubsub - - @classmethod - async def create(cls): - host = await new_node() - floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) - pubsub = Pubsub(host, floodsub, "test") - return cls(host, pubsub) - - -@pytest.fixture -async def node(): - return await NoConnNode.create() - - -@pytest.mark.asyncio -async def test_subscribe_unsubscribe(node): - await node.pubsub.subscribe(TESTING_TOPIC) - assert TESTING_TOPIC in node.pubsub.my_topics - - await node.pubsub.unsubscribe(TESTING_TOPIC) - assert TESTING_TOPIC not in node.pubsub.my_topics - - -@pytest.mark.asyncio -async def test_re_subscribe(node): - await node.pubsub.subscribe(TESTING_TOPIC) - assert TESTING_TOPIC in node.pubsub.my_topics - - await node.pubsub.subscribe(TESTING_TOPIC) - assert TESTING_TOPIC in node.pubsub.my_topics - - -@pytest.mark.asyncio -async def test_re_unsubscribe(node): - # Unsubscribe from topic we didn't even subscribe to - assert "NOT_MY_TOPIC" not in node.pubsub.my_topics - await node.pubsub.unsubscribe("NOT_MY_TOPIC") - - await node.pubsub.subscribe(TESTING_TOPIC) - assert TESTING_TOPIC in node.pubsub.my_topics - - await node.pubsub.unsubscribe(TESTING_TOPIC) - assert TESTING_TOPIC not in node.pubsub.my_topics - - await node.pubsub.unsubscribe(TESTING_TOPIC) - assert TESTING_TOPIC not in node.pubsub.my_topics diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index c74d733..0314bc1 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,19 +1,11 @@ -import asyncio import struct from typing import Sequence -import multiaddr - -from libp2p import new_node from libp2p.peer.id import ID -from libp2p.pubsub.gossipsub import GossipSub from libp2p.pubsub.pb import rpc_pb2 -from libp2p.pubsub.pubsub import Pubsub from tests.utils import connect -from .configs import LISTEN_MADDR - def message_id_generator(start_val): """ @@ -44,80 +36,6 @@ def make_pubsub_msg( ) -def generate_RPC_packet(origin_id, topics, msg_content, msg_id): - """ - Generate RPC packet to send over wire - :param origin_id: peer id of the message origin - :param topics: list of topics - :param msg_content: string of content in data - :param msg_id: seqno for the message - """ - packet = rpc_pb2.RPC() - message = rpc_pb2.Message( - from_id=origin_id.encode("utf-8"), - seqno=msg_id, - data=msg_content.encode("utf-8"), - ) - - for topic in topics: - message.topicIDs.extend([topic.encode("utf-8")]) - - packet.publish.extend([message]) - return packet - - -async def create_libp2p_hosts(num_hosts): - """ - Create libp2p hosts - :param num_hosts: number of hosts to create - """ - hosts = [] - tasks_create = [] - for i in range(0, num_hosts): - # Create node - tasks_create.append(new_node(transport_opt=[str(LISTEN_MADDR)])) - hosts = await asyncio.gather(*tasks_create) - - tasks_listen = [] - for node in hosts: - # Start listener - tasks_listen.append(node.get_network().listen(LISTEN_MADDR)) - await asyncio.gather(*tasks_listen) - - return hosts - - -def create_pubsub_and_gossipsub_instances( - libp2p_hosts, - supported_protocols, - degree, - degree_low, - degree_high, - time_to_live, - gossip_window, - gossip_history, - heartbeat_interval, -): - pubsubs = [] - gossipsubs = [] - for node in libp2p_hosts: - gossipsub = GossipSub( - supported_protocols, - degree, - degree_low, - degree_high, - time_to_live, - gossip_window, - gossip_history, - heartbeat_interval, - ) - pubsub = Pubsub(node, gossipsub, node.get_id()) - pubsubs.append(pubsub) - gossipsubs.append(gossipsub) - - return pubsubs, gossipsubs - - # FIXME: There is no difference between `sparse_connect` and `dense_connect`, # before `connect_some` is fixed. @@ -130,12 +48,17 @@ async def dense_connect(hosts): await connect_some(hosts, 10) +async def connect_all(hosts): + for i, host in enumerate(hosts): + for host2 in hosts[i + 1 :]: + await connect(host, host2) + + # FIXME: `degree` is not used at all async def connect_some(hosts, degree): for i, host in enumerate(hosts): - for j, host2 in enumerate(hosts): - if i != j and i < j: - await connect(host, host2) + for host2 in hosts[i + 1 :]: + await connect(host, host2) # TODO: USE THE CODE BELOW # for i, host in enumerate(hosts): diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index fe9409d..674ff72 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -7,7 +7,9 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.protocol_muxer.multiselect_client import MultiselectClientError from libp2p.security.insecure_security import InsecureTransport from libp2p.security.simple_security import SimpleSecurityTransport -from tests.utils import cleanup + +from tests.utils import cleanup, connect + # TODO: Add tests for multiple streams being opened on different # protocols through the same connection @@ -19,15 +21,6 @@ def peer_id_for_node(node): return info.peer_id -async def connect(node1, node2): - """ - Connect node1 to node2 - """ - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) - - async def perform_simple_test( assertion_func, transports_for_initiator, transports_for_noninitiator ): diff --git a/tests/utils.py b/tests/utils.py index cc914c0..36a4d9b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -13,8 +13,6 @@ async def connect(node1, node2): addr = node2.get_addrs()[0] info = info_from_p2p_addr(addr) await node1.connect(info) - assert node1.get_id() in node2.get_network().connections - assert node2.get_id() in node1.get_network().connections async def cleanup():