diff --git a/tests/network/test_net_stream.py b/tests/network/test_net_stream.py index d0fea93..2c2772a 100644 --- a/tests/network/test_net_stream.py +++ b/tests/network/test_net_stream.py @@ -1,4 +1,4 @@ -import asyncio +import trio import pytest @@ -8,7 +8,7 @@ from libp2p.tools.constants import MAX_READ_LEN DATA = b"data_123" -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_write(net_stream_pair): stream_0, stream_1 = net_stream_pair assert ( @@ -19,7 +19,7 @@ async def test_net_stream_read_write(net_stream_pair): assert (await stream_1.read(MAX_READ_LEN)) == DATA -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_until_eof(net_stream_pair): read_bytes = bytearray() stream_0, stream_1 = net_stream_pair @@ -27,41 +27,39 @@ async def test_net_stream_read_until_eof(net_stream_pair): async def read_until_eof(): read_bytes.extend(await stream_1.read()) - task = asyncio.ensure_future(read_until_eof()) + async with trio.open_nursery() as nursery: + nursery.start_soon(read_until_eof) + expected_data = bytearray() - expected_data = bytearray() + # Test: `read` doesn't return before `close` is called. + await stream_0.write(DATA) + expected_data.extend(DATA) + await trio.sleep(0.01) + assert len(read_bytes) == 0 + # Test: `read` doesn't return before `close` is called. + await stream_0.write(DATA) + expected_data.extend(DATA) + await trio.sleep(0.01) + assert len(read_bytes) == 0 - # Test: `read` doesn't return before `close` is called. - await stream_0.write(DATA) - expected_data.extend(DATA) - await asyncio.sleep(0.01) - assert len(read_bytes) == 0 - # Test: `read` doesn't return before `close` is called. - await stream_0.write(DATA) - expected_data.extend(DATA) - await asyncio.sleep(0.01) - assert len(read_bytes) == 0 - - # Test: Close the stream, `read` returns, and receive previous sent data. - await stream_0.close() - await asyncio.sleep(0.01) - assert read_bytes == expected_data - - task.cancel() + # Test: Close the stream, `read` returns, and receive previous sent data. + await stream_0.close() + await trio.sleep(0.01) + assert read_bytes == expected_data -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_remote_closed(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_0.write(DATA) await stream_0.close() - await asyncio.sleep(0.01) + await trio.sleep(0.01) assert (await stream_1.read(MAX_READ_LEN)) == DATA with pytest.raises(StreamEOF): await stream_1.read(MAX_READ_LEN) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_local_reset(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_0.reset() @@ -69,29 +67,29 @@ async def test_net_stream_read_after_local_reset(net_stream_pair): await stream_0.read(MAX_READ_LEN) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_remote_reset(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_0.write(DATA) await stream_0.reset() # Sleep to let `stream_1` receive the message. - await asyncio.sleep(0.01) + await trio.sleep(0.01) with pytest.raises(StreamReset): await stream_1.read(MAX_READ_LEN) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_read_after_remote_closed_and_reset(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_0.write(DATA) await stream_0.close() await stream_0.reset() # Sleep to let `stream_1` receive the message. - await asyncio.sleep(0.01) + await trio.sleep(0.01) assert (await stream_1.read(MAX_READ_LEN)) == DATA -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_write_after_local_closed(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_0.write(DATA) @@ -100,7 +98,7 @@ async def test_net_stream_write_after_local_closed(net_stream_pair): await stream_0.write(DATA) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_write_after_local_reset(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_0.reset() @@ -108,10 +106,10 @@ async def test_net_stream_write_after_local_reset(net_stream_pair): await stream_0.write(DATA) -@pytest.mark.asyncio +@pytest.mark.trio async def test_net_stream_write_after_remote_reset(net_stream_pair): stream_0, stream_1 = net_stream_pair await stream_1.reset() - await asyncio.sleep(0.01) + await trio.sleep(0.01) with pytest.raises(StreamClosed): await stream_0.write(DATA) diff --git a/tests/network/test_notify.py b/tests/network/test_notify.py index f8187b1..1d9982e 100644 --- a/tests/network/test_notify.py +++ b/tests/network/test_notify.py @@ -8,12 +8,13 @@ into network after network has already started listening TODO: Add tests for closed_stream, listen_close when those features are implemented in swarm """ - -import asyncio +import trio import enum import pytest +from async_service import background_trio_service + from libp2p.network.notifee_interface import INotifee from libp2p.tools.constants import LISTEN_MADDR from libp2p.tools.factories import SwarmFactory @@ -54,59 +55,63 @@ class MyNotifee(INotifee): pass -@pytest.mark.asyncio +@pytest.mark.trio async def test_notify(is_host_secure): swarms = [SwarmFactory(is_secure=is_host_secure) for _ in range(2)] events_0_0 = [] events_1_0 = [] events_0_without_listen = [] - swarms[0].register_notifee(MyNotifee(events_0_0)) - swarms[1].register_notifee(MyNotifee(events_1_0)) - # Listen - await asyncio.gather(*[swarm.listen(LISTEN_MADDR) for swarm in swarms]) + # Run swarms. + async with background_trio_service(swarms[0]), background_trio_service(swarms[1]): + # Register events before listening, to allow `MyNotifee` is notified with the event + # `listen`. + swarms[0].register_notifee(MyNotifee(events_0_0)) + swarms[1].register_notifee(MyNotifee(events_1_0)) - swarms[0].register_notifee(MyNotifee(events_0_without_listen)) + # Listen + async with trio.open_nursery() as nursery: + nursery.start_soon(swarms[0].listen, LISTEN_MADDR) + nursery.start_soon(swarms[1].listen, LISTEN_MADDR) - # Connected - await connect_swarm(swarms[0], swarms[1]) - # OpenedStream: first - await swarms[0].new_stream(swarms[1].get_peer_id()) - # OpenedStream: second - await swarms[0].new_stream(swarms[1].get_peer_id()) - # OpenedStream: third, but different direction. - await swarms[1].new_stream(swarms[0].get_peer_id()) + swarms[0].register_notifee(MyNotifee(events_0_without_listen)) - await asyncio.sleep(0.01) + # Connected + await connect_swarm(swarms[0], swarms[1]) + # OpenedStream: first + await swarms[0].new_stream(swarms[1].get_peer_id()) + # OpenedStream: second + await swarms[0].new_stream(swarms[1].get_peer_id()) + # OpenedStream: third, but different direction. + await swarms[1].new_stream(swarms[0].get_peer_id()) - # TODO: Check `ClosedStream` and `ListenClose` events after they are ready. + await trio.sleep(0.01) - # Disconnected - await swarms[0].close_peer(swarms[1].get_peer_id()) - await asyncio.sleep(0.01) + # TODO: Check `ClosedStream` and `ListenClose` events after they are ready. - # Connected again, but different direction. - await connect_swarm(swarms[1], swarms[0]) - await asyncio.sleep(0.01) + # Disconnected + await swarms[0].close_peer(swarms[1].get_peer_id()) + await trio.sleep(0.01) - # Disconnected again, but different direction. - await swarms[1].close_peer(swarms[0].get_peer_id()) - await asyncio.sleep(0.01) + # Connected again, but different direction. + await connect_swarm(swarms[1], swarms[0]) + await trio.sleep(0.01) - expected_events_without_listen = [ - Event.Connected, - Event.OpenedStream, - Event.OpenedStream, - Event.OpenedStream, - Event.Disconnected, - Event.Connected, - Event.Disconnected, - ] - expected_events = [Event.Listen] + expected_events_without_listen + # Disconnected again, but different direction. + await swarms[1].close_peer(swarms[0].get_peer_id()) + await trio.sleep(0.01) - assert events_0_0 == expected_events - assert events_1_0 == expected_events - assert events_0_without_listen == expected_events_without_listen + expected_events_without_listen = [ + Event.Connected, + Event.OpenedStream, + Event.OpenedStream, + Event.OpenedStream, + Event.Disconnected, + Event.Connected, + Event.Disconnected, + ] + expected_events = [Event.Listen] + expected_events_without_listen - # Clean up - await asyncio.gather(*[swarm.close() for swarm in swarms]) + assert events_0_0 == expected_events + assert events_1_0 == expected_events + assert events_0_without_listen == expected_events_without_listen diff --git a/tests/network/test_swarm_conn.py b/tests/network/test_swarm_conn.py index 2abc7d0..0b93808 100644 --- a/tests/network/test_swarm_conn.py +++ b/tests/network/test_swarm_conn.py @@ -1,9 +1,9 @@ -import asyncio +import trio import pytest -@pytest.mark.asyncio +@pytest.mark.trio async def test_swarm_conn_close(swarm_conn_pair): conn_0, conn_1 = swarm_conn_pair @@ -12,7 +12,7 @@ async def test_swarm_conn_close(swarm_conn_pair): await conn_0.close() - await asyncio.sleep(0.01) + await trio.sleep(0.01) assert conn_0.event_closed.is_set() assert conn_1.event_closed.is_set() @@ -20,7 +20,7 @@ async def test_swarm_conn_close(swarm_conn_pair): assert conn_1 not in conn_1.swarm.connections.values() -@pytest.mark.asyncio +@pytest.mark.trio async def test_swarm_conn_streams(swarm_conn_pair): conn_0, conn_1 = swarm_conn_pair @@ -28,12 +28,12 @@ async def test_swarm_conn_streams(swarm_conn_pair): assert len(await conn_1.get_streams()) == 0 stream_0_0 = await conn_0.new_stream() - await asyncio.sleep(0.01) + await trio.sleep(0.01) assert len(await conn_0.get_streams()) == 1 assert len(await conn_1.get_streams()) == 1 stream_0_1 = await conn_0.new_stream() - await asyncio.sleep(0.01) + await trio.sleep(0.01) assert len(await conn_0.get_streams()) == 2 assert len(await conn_1.get_streams()) == 2