diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 895ceb5..bccfdac 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -4,6 +4,7 @@ from typing import Callable, Dict, List, Sequence from multiaddr import Multiaddr from libp2p.peer.id import ID +from libp2p.peer.peerstore import PeerStoreError from libp2p.peer.peerstore_interface import IPeerStore from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect_client import MultiselectClient @@ -15,7 +16,6 @@ from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import StreamHandlerFn, TProtocol -from libp2p.peer.peerstore import PeerStoreError from .connection.raw_connection import RawConnection from .exceptions import SwarmException diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 3fc713b..3ded0fe 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -2,6 +2,7 @@ from typing import Iterable, List, Sequence from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed from .pb import rpc_pb2 from .pubsub import Pubsub @@ -78,7 +79,7 @@ class FloodSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - await stream.write(rpc_msg.SerializeToString()) + await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) async def join(self, topic: str) -> None: """ diff --git a/tests/conftest.py b/tests/conftest.py index 188c655..fd753be 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,7 @@ import asyncio import pytest from .configs import LISTEN_MADDR -from .factories import FloodsubFactory, GossipsubFactory, HostFactory, PubsubFactory -from .pubsub.configs import GOSSIPSUB_PARAMS +from .factories import HostFactory @pytest.fixture @@ -32,50 +31,3 @@ async def hosts(num_hosts, is_host_secure): await asyncio.gather( *[_host.close() for _host in _hosts], return_exceptions=True ) - - -@pytest.fixture -def floodsubs(num_hosts): - return FloodsubFactory.create_batch(num_hosts) - - -@pytest.fixture -def gossipsub_params(): - return GOSSIPSUB_PARAMS - - -@pytest.fixture -def gossipsubs(num_hosts, gossipsub_params): - yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) - # TODO: Clean up - - -def _make_pubsubs(hosts, pubsub_routers, cache_size): - if len(pubsub_routers) != len(hosts): - raise ValueError( - f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " - f"length of hosts={len(hosts)}" - ) - return tuple( - PubsubFactory(host=host, router=router, cache_size=cache_size) - for host, router in zip(hosts, pubsub_routers) - ) - - -@pytest.fixture -def pubsub_cache_size(): - return None # default - - -@pytest.fixture -def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size): - _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) - yield _pubsubs_fsub - # TODO: Clean up - - -@pytest.fixture -def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size): - _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) - yield _pubsubs_gsub - # TODO: Clean up diff --git a/tests/interop/conftest.py b/tests/interop/conftest.py index ef9de3f..06dfeec 100644 --- a/tests/interop/conftest.py +++ b/tests/interop/conftest.py @@ -4,6 +4,9 @@ import sys import pexpect import pytest +from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory +from tests.pubsub.configs import GOSSIPSUB_PARAMS + from .daemon import make_p2pd @@ -33,11 +36,35 @@ def num_p2pds(): @pytest.fixture -async def p2pds(num_p2pds, is_host_secure, unused_tcp_port_factory): +def is_gossipsub(): + return True + + +@pytest.fixture +async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory): p2pds = await asyncio.gather( - *[make_p2pd(unused_tcp_port_factory, is_host_secure) for _ in range(num_p2pds)] + *[ + make_p2pd( + unused_tcp_port_factory, is_host_secure, is_gossipsub=is_gossipsub + ) + for _ in range(num_p2pds) + ] ) try: yield p2pds finally: await asyncio.gather(*[p2pd.close() for p2pd in p2pds]) + + +@pytest.fixture +def pubsubs(num_hosts, hosts, is_gossipsub): + routers = None + if is_gossipsub: + routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict()) + else: + routers = FloodsubFactory.create_batch(num_hosts) + _pubsubs = tuple( + PubsubFactory(host=host, router=router) for host, router in zip(hosts, routers) + ) + yield _pubsubs + # TODO: Clean up diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py index 53aee07..bb37e35 100644 --- a/tests/interop/test_pubsub.py +++ b/tests/interop/test_pubsub.py @@ -1,13 +1,12 @@ import asyncio import functools +from p2pclient.pb import p2pd_pb2 import pytest from libp2p.peer.id import ID -from libp2p.utils import read_varint_prefixed_bytes from libp2p.pubsub.pb import rpc_pb2 - -from p2pclient.pb import p2pd_pb2 +from libp2p.utils import read_varint_prefixed_bytes from .utils import connect @@ -57,13 +56,14 @@ def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) -> assert msg.data == data and msg.from_id == from_peer_id +@pytest.mark.parametrize("is_gossipsub", (True, False)) @pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) @pytest.mark.asyncio -async def test_pubsub(pubsubs_gsub, p2pds): +async def test_pubsub(pubsubs, p2pds): # # Test: Recognize pubsub peers on connection. # - py_pubsub = pubsubs_gsub[0] + py_pubsub = pubsubs[0] # go0 <-> py <-> go1 await connect(p2pds[0], py_pubsub.host) await connect(py_pubsub.host, p2pds[1]) diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index e69de29..246ca15 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -0,0 +1,43 @@ +import pytest + +from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory + +from .configs import GOSSIPSUB_PARAMS + + +def _make_pubsubs(hosts, pubsub_routers, cache_size): + if len(pubsub_routers) != len(hosts): + raise ValueError( + f"lenght of pubsub_routers={pubsub_routers} should be equaled to the " + f"length of hosts={len(hosts)}" + ) + return tuple( + PubsubFactory(host=host, router=router, cache_size=cache_size) + for host, router in zip(hosts, pubsub_routers) + ) + + +@pytest.fixture +def pubsub_cache_size(): + return None # default + + +@pytest.fixture +def gossipsub_params(): + return GOSSIPSUB_PARAMS + + +@pytest.fixture +def pubsubs_fsub(num_hosts, hosts, pubsub_cache_size): + floodsubs = FloodsubFactory.create_batch(num_hosts) + _pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size) + yield _pubsubs_fsub + # TODO: Clean up + + +@pytest.fixture +def pubsubs_gsub(num_hosts, hosts, pubsub_cache_size, gossipsub_params): + gossipsubs = GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict()) + _pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size) + yield _pubsubs_gsub + # TODO: Clean up diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index e091f66..a0a8a28 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -14,7 +14,8 @@ from .utils import dense_connect, one_to_all_connect ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),), ) @pytest.mark.asyncio -async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub): +async def test_join(num_hosts, hosts, pubsubs_gsub): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) hosts_indices = list(range(num_hosts)) topic = "test_join" @@ -85,7 +86,9 @@ async def test_leave(pubsubs_gsub): @pytest.mark.parametrize("num_hosts", (2,)) @pytest.mark.asyncio -async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch): +async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch): + gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub) + index_alice = 0 id_alice = hosts[index_alice].get_id() index_bob = 1