py-libp2p/tests/interop/conftest.py

152 lines
4.6 KiB
Python

import asyncio
import sys
from typing import Union
from p2pclient.datastructures import StreamInfo
import pexpect
import pytest
from libp2p.io.abc import ReadWriteCloser
from tests.factories import FloodsubFactory, GossipsubFactory, PubsubFactory
from tests.pubsub.configs import GOSSIPSUB_PARAMS
from .daemon import Daemon, make_p2pd
from .utils import connect
@pytest.fixture
def proc_factory():
procs = []
def call_proc(cmd, args, logfile=None, encoding=None):
if logfile is None:
logfile = sys.stdout
if encoding is None:
encoding = "utf-8"
proc = pexpect.spawn(cmd, args, logfile=logfile, encoding=encoding)
procs.append(proc)
return proc
try:
yield call_proc
finally:
for proc in procs:
proc.close()
@pytest.fixture
def num_p2pds():
return 1
@pytest.fixture
def is_gossipsub():
return True
@pytest.fixture
async def p2pds(num_p2pds, is_host_secure, is_gossipsub, unused_tcp_port_factory):
p2pds: Union[Daemon, Exception] = await asyncio.gather(
*[
make_p2pd(
unused_tcp_port_factory(),
unused_tcp_port_factory(),
is_host_secure,
is_gossipsub=is_gossipsub,
)
for _ in range(num_p2pds)
],
return_exceptions=True,
)
p2pds_succeeded = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Daemon))
if len(p2pds_succeeded) != len(p2pds):
# Not all succeeded. Close the succeeded ones and print the failed ones(exceptions).
await asyncio.gather(*[p2pd.close() for p2pd in p2pds_succeeded])
exceptions = tuple(p2pd for p2pd in p2pds if isinstance(p2pd, Exception))
raise Exception(f"not all p2pds succeed: first exception={exceptions[0]}")
try:
yield p2pds
finally:
await asyncio.gather(*[p2pd.close() for p2pd in p2pds])
@pytest.fixture
def pubsubs(num_hosts, hosts, is_gossipsub):
if is_gossipsub:
routers = GossipsubFactory.create_batch(num_hosts, **GOSSIPSUB_PARAMS._asdict())
else:
routers = FloodsubFactory.create_batch(num_hosts)
_pubsubs = tuple(
PubsubFactory(host=host, router=router) for host, router in zip(hosts, routers)
)
yield _pubsubs
# TODO: Clean up
class DaemonStream(ReadWriteCloser):
stream_info: StreamInfo
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
def __init__(
self,
stream_info: StreamInfo,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
self.stream_info = stream_info
self.reader = reader
self.writer = writer
async def close(self) -> None:
self.writer.close()
await self.writer.wait_closed()
async def read(self, n: int = -1) -> bytes:
return await self.reader.read(n)
async def write(self, data: bytes) -> int:
return self.writer.write(data)
@pytest.fixture
async def is_to_fail_daemon_stream():
return False
@pytest.fixture
async def py_to_daemon_stream_pair(hosts, p2pds, is_to_fail_daemon_stream):
assert len(hosts) >= 1
assert len(p2pds) >= 1
host = hosts[0]
p2pd = p2pds[0]
protocol_id = "/protocol/id/123"
stream_py = None
stream_daemon = None
event_stream_handled = asyncio.Event()
await connect(host, p2pd)
async def daemon_stream_handler(stream_info, reader, writer):
nonlocal stream_daemon
stream_daemon = DaemonStream(stream_info, reader, writer)
event_stream_handled.set()
await p2pd.control.stream_handler(protocol_id, daemon_stream_handler)
if is_to_fail_daemon_stream:
# FIXME: This is a workaround to make daemon reset the stream.
# We intentionally close the listener on the python side, it makes the connection from
# daemon to us fail, and therefore the daemon resets the opened stream on their side.
# Reference: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/stream.go#L47-L50 # noqa: E501
# We need it because we want to test against `stream_py` after the remote side(daemon)
# is reset. This should be removed after the API `stream.reset` is exposed in daemon
# some day.
listener = p2pds[0].control.control.listener
listener.close()
await listener.wait_closed()
stream_py = await host.new_stream(p2pd.peer_id, [protocol_id])
if not is_to_fail_daemon_stream:
await event_stream_handled.wait()
# NOTE: If `is_to_fail_daemon_stream == True`, `stream_daemon == None`.
yield stream_py, stream_daemon