Add is_gossipsub fixture in interop test

To use the same code to test against both routers: floodsub and
gossipsub.
This commit is contained in:
mhchia 2019-09-03 16:49:00 +08:00
parent 33dae87c35
commit 7385a7a677
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
7 changed files with 86 additions and 60 deletions

View File

@ -4,6 +4,7 @@ from typing import Callable, Dict, List, Sequence
from multiaddr import Multiaddr from multiaddr import Multiaddr
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerstore import PeerStoreError
from libp2p.peer.peerstore_interface import IPeerStore from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_client import MultiselectClient
@ -15,7 +16,6 @@ from libp2p.transport.listener_interface import IListener
from libp2p.transport.transport_interface import ITransport from libp2p.transport.transport_interface import ITransport
from libp2p.transport.upgrader import TransportUpgrader from libp2p.transport.upgrader import TransportUpgrader
from libp2p.typing import StreamHandlerFn, TProtocol from libp2p.typing import StreamHandlerFn, TProtocol
from libp2p.peer.peerstore import PeerStoreError
from .connection.raw_connection import RawConnection from .connection.raw_connection import RawConnection
from .exceptions import SwarmException from .exceptions import SwarmException

View File

@ -2,6 +2,7 @@ from typing import Iterable, List, Sequence
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub import Pubsub from .pubsub import Pubsub
@ -78,7 +79,7 @@ class FloodSub(IPubsubRouter):
stream = self.pubsub.peers[peer_id] stream = self.pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages. # FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
await stream.write(rpc_msg.SerializeToString()) await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
async def join(self, topic: str) -> None: async def join(self, topic: str) -> None:
""" """

View File

@ -3,8 +3,7 @@ import asyncio
import pytest import pytest
from .configs import LISTEN_MADDR from .configs import LISTEN_MADDR
from .factories import FloodsubFactory, GossipsubFactory, HostFactory, PubsubFactory from .factories import HostFactory
from .pubsub.configs import GOSSIPSUB_PARAMS
@pytest.fixture @pytest.fixture
@ -32,50 +31,3 @@ async def hosts(num_hosts, is_host_secure):
await asyncio.gather( await asyncio.gather(
*[_host.close() for _host in _hosts], return_exceptions=True *[_host.close() for _host in _hosts], return_exceptions=True
) )
@pytest.fixture
def floodsubs(num_hosts):
return FloodsubFactory.create_batch(num_hosts)
@pytest.fixture
def gossipsub_params():
return GOSSIPSUB_PARAMS
@pytest.fixture
def gossipsubs(num_hosts, gossipsub_params):
yield GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict())
# TODO: Clean up
def _make_pubsubs(hosts, pubsub_routers, cache_size):
if len(pubsub_routers) != len(hosts):
raise ValueError(
f"lenght of pubsub_routers={pubsub_routers} should be equaled to the "
f"length of hosts={len(hosts)}"
)
return tuple(
PubsubFactory(host=host, router=router, cache_size=cache_size)
for host, router in zip(hosts, pubsub_routers)
)
@pytest.fixture
def pubsub_cache_size():
return None # default
@pytest.fixture
def pubsubs_fsub(hosts, floodsubs, pubsub_cache_size):
_pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size)
yield _pubsubs_fsub
# TODO: Clean up
@pytest.fixture
def pubsubs_gsub(hosts, gossipsubs, pubsub_cache_size):
_pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size)
yield _pubsubs_gsub
# TODO: Clean up

View File

@ -4,6 +4,9 @@ import sys
import pexpect import pexpect
import pytest import pytest
from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory
from tests.pubsub.configs import GOSSIPSUB_PARAMS
from .daemon import make_p2pd from .daemon import make_p2pd
@ -33,11 +36,35 @@ def num_p2pds():
@pytest.fixture @pytest.fixture
async def p2pds(num_p2pds, is_host_secure, unused_tcp_port_factory): def is_gossipsub():
return True
@pytest.fixture
async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory):
p2pds = await asyncio.gather( p2pds = await asyncio.gather(
*[make_p2pd(unused_tcp_port_factory, is_host_secure) for _ in range(num_p2pds)] *[
make_p2pd(
unused_tcp_port_factory, is_host_secure, is_gossipsub=is_gossipsub
)
for _ in range(num_p2pds)
]
) )
try: try:
yield p2pds yield p2pds
finally: finally:
await asyncio.gather(*[p2pd.close() for p2pd in p2pds]) await asyncio.gather(*[p2pd.close() for p2pd in p2pds])
@pytest.fixture
def pubsubs(num_hosts, hosts, is_gossipsub):
routers = None
if is_gossipsub:
routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict())
else:
routers = FloodsubFactory.create_batch(num_hosts)
_pubsubs = tuple(
PubsubFactory(host=host, router=router) for host, router in zip(hosts, routers)
)
yield _pubsubs
# TODO: Clean up

View File

@ -1,13 +1,12 @@
import asyncio import asyncio
import functools import functools
from p2pclient.pb import p2pd_pb2
import pytest import pytest
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.utils import read_varint_prefixed_bytes
from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pb import rpc_pb2
from libp2p.utils import read_varint_prefixed_bytes
from p2pclient.pb import p2pd_pb2
from .utils import connect from .utils import connect
@ -57,13 +56,14 @@ def validate_pubsub_msg(msg: rpc_pb2.Message, data: bytes, from_peer_id: ID) ->
assert msg.data == data and msg.from_id == from_peer_id assert msg.data == data and msg.from_id == from_peer_id
@pytest.mark.parametrize("is_gossipsub", (True, False))
@pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),)) @pytest.mark.parametrize("num_hosts, num_p2pds", ((1, 2),))
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pubsub(pubsubs_gsub, p2pds): async def test_pubsub(pubsubs, p2pds):
# #
# Test: Recognize pubsub peers on connection. # Test: Recognize pubsub peers on connection.
# #
py_pubsub = pubsubs_gsub[0] py_pubsub = pubsubs[0]
# go0 <-> py <-> go1 # go0 <-> py <-> go1
await connect(p2pds[0], py_pubsub.host) await connect(p2pds[0], py_pubsub.host)
await connect(py_pubsub.host, p2pds[1]) await connect(py_pubsub.host, p2pds[1])

View File

@ -0,0 +1,43 @@
import pytest
from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory
from .configs import GOSSIPSUB_PARAMS
def _make_pubsubs(hosts, pubsub_routers, cache_size):
if len(pubsub_routers) != len(hosts):
raise ValueError(
f"lenght of pubsub_routers={pubsub_routers} should be equaled to the "
f"length of hosts={len(hosts)}"
)
return tuple(
PubsubFactory(host=host, router=router, cache_size=cache_size)
for host, router in zip(hosts, pubsub_routers)
)
@pytest.fixture
def pubsub_cache_size():
return None # default
@pytest.fixture
def gossipsub_params():
return GOSSIPSUB_PARAMS
@pytest.fixture
def pubsubs_fsub(num_hosts, hosts, pubsub_cache_size):
floodsubs = FloodsubFactory.create_batch(num_hosts)
_pubsubs_fsub = _make_pubsubs(hosts, floodsubs, pubsub_cache_size)
yield _pubsubs_fsub
# TODO: Clean up
@pytest.fixture
def pubsubs_gsub(num_hosts, hosts, pubsub_cache_size, gossipsub_params):
gossipsubs = GossipsubFactory.create_batch(num_hosts, **gossipsub_params._asdict())
_pubsubs_gsub = _make_pubsubs(hosts, gossipsubs, pubsub_cache_size)
yield _pubsubs_gsub
# TODO: Clean up

View File

@ -14,7 +14,8 @@ from .utils import dense_connect, one_to_all_connect
((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),), ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),),
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub): async def test_join(num_hosts, hosts, pubsubs_gsub):
gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub)
hosts_indices = list(range(num_hosts)) hosts_indices = list(range(num_hosts))
topic = "test_join" topic = "test_join"
@ -85,7 +86,9 @@ async def test_leave(pubsubs_gsub):
@pytest.mark.parametrize("num_hosts", (2,)) @pytest.mark.parametrize("num_hosts", (2,))
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_handle_graft(pubsubs_gsub, hosts, gossipsubs, event_loop, monkeypatch): async def test_handle_graft(pubsubs_gsub, hosts, event_loop, monkeypatch):
gossipsubs = tuple(pubsub.router for pubsub in pubsubs_gsub)
index_alice = 0 index_alice = 0
id_alice = hosts[index_alice].get_id() id_alice = hosts[index_alice].get_id()
index_bob = 1 index_bob = 1