diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 744c37c..9534a95 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -340,11 +340,11 @@ class Pubsub: :param msg_forwarder: the peer who forward us the message. :param msg: the message we are going to push out. """ - # TODO: - Check if the `source` is in the blacklist. If yes, reject. + # TODO: Check if the `source` is in the blacklist. If yes, reject. - # TODO: - Check if the `from` is in the blacklist. If yes, reject. + # TODO: Check if the `from` is in the blacklist. If yes, reject. - # TODO: - Check if signing is required and if so signature should be attached. + # TODO: Check if signing is required and if so signature should be attached. if self._is_msg_seen(msg): return diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py new file mode 100644 index 0000000..a2a3419 --- /dev/null +++ b/tests/pubsub/conftest.py @@ -0,0 +1,109 @@ +import asyncio +from typing import NamedTuple + +import pytest + +from multiaddr import Multiaddr + +from libp2p import new_node + +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub + +from .configs import ( + FLOODSUB_PROTOCOL_ID, + GOSSIPSUB_PROTOCOL_ID, + LISTEN_MADDR, +) + + +# pylint: disable=redefined-outer-name + +@pytest.fixture +def num_hosts(): + return 3 + + +@pytest.fixture +async def hosts(num_hosts): + new_node_coros = tuple( + new_node(transport_opt=[str(LISTEN_MADDR)]) + for _ in range(num_hosts) + ) + _hosts = await asyncio.gather(*new_node_coros) + await asyncio.gather(*[ + _host.get_network().listen(LISTEN_MADDR) + for _host in _hosts + ]) + yield _hosts + # Clean up + for _host in _hosts: + for listener in _host.get_network().listeners.values(): + listener.server.close() + await listener.server.wait_closed() + + +@pytest.fixture +def floodsubs(num_hosts): + return tuple( + FloodSub(protocols=[FLOODSUB_PROTOCOL_ID]) + for _ in range(num_hosts) + ) + + +class GossipsubParams(NamedTuple): + degree: int = 10 + degree_low: int = 9 + degree_high: int = 11 + fanout_ttl: int = 30 + gossip_window: int = 3 + gossip_history: int = 5 + heartbeat_interval: float = 0.5 + + +@pytest.fixture +def gossipsub_params(): + return GossipsubParams() + + +@pytest.fixture +def gossipsubs(num_hosts, gossipsub_params): + yield tuple( + GossipSub( + protocols=[GOSSIPSUB_PROTOCOL_ID], + **gossipsub_params._asdict(), + ) + for _ in range(num_hosts) + ) + # TODO: Clean up + + +def _make_pubsubs(hosts, pubsub_routers): + if len(pubsub_routers) != len(hosts): + raise ValueError( + f"lenght of pubsub_routers={pubsub_routers} should be equaled to " + f"hosts={hosts}" + ) + return tuple( + Pubsub( + host=host, + router=router, + my_id=host.get_id(), + ) + for host, router in zip(hosts, pubsub_routers) + ) + + +@pytest.fixture +def pubsubs_fsub(hosts, floodsubs): + _pubsubs_gsub = _make_pubsubs(hosts, floodsubs) + yield _pubsubs_gsub + # TODO: Clean up + + +@pytest.fixture +def pubsubs_gsub(hosts, gossipsubs): + _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs) + yield _pubsubs_gsub + # TODO: Clean up diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index c3d3e24..a5876b8 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -39,9 +39,9 @@ async def test_simple_two_nodes(): data = b"some data" floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32)) + pubsub_a = Pubsub(node_a, floodsub_a, ID(b"\x12\x20" + b"a" * 32)) floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32)) + pubsub_b = Pubsub(node_b, floodsub_b, ID(b"\x12\x20" + b"a" * 32)) await connect(node_a, node_b) await asyncio.sleep(0.25) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py new file mode 100644 index 0000000..fd0515a --- /dev/null +++ b/tests/pubsub/test_pubsub.py @@ -0,0 +1,11 @@ +import pytest + + +@pytest.mark.asyncio +async def test_test(pubsubs_fsub): + topic = "topic" + data = b"data" + sub = await pubsubs_fsub[0].subscribe(topic) + await pubsubs_fsub[0].publish(topic, data) + msg = await sub.get() + assert msg.data == data diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index 93fc235..a0cc027 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -80,7 +80,7 @@ async def create_libp2p_hosts(num_hosts): tasks_create = [] for i in range(0, num_hosts): # Create node - tasks_create.append(asyncio.ensure_future(new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]))) + tasks_create.append(new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])) hosts = await asyncio.gather(*tasks_create) tasks_listen = []