py-libp2p/tests/transport/test_tcp.py
aratz-lasa b53675e288 Changed 'nursery' for 'task_status'
This way it is possible to implement an I/O-agnostic Listener. It would still have the 'task_status' default argument, but there would be no need to receive it. This way there is also no need for explicit nursery passing.
2020-02-26 22:56:45 +01:00

54 lines
1.6 KiB
Python

from multiaddr import Multiaddr
import pytest
import trio
from libp2p.network.connection.raw_connection import RawConnection
from libp2p.tools.constants import LISTEN_MADDR
from libp2p.transport.exceptions import OpenConnectionError
from libp2p.transport.tcp.tcp import TCP
@pytest.mark.trio
async def test_tcp_listener(nursery):
    """Check that every ``listen`` call adds one address to the listener.

    The listener is created with a no-op connection handler, starts out with
    no addresses, and is expected to report exactly one more address after
    each ``listen`` task started through ``nursery``.
    """

    async def noop_handler(tcp_stream):
        """Ignore the incoming stream; no connection is dialed in this test."""

    tcp_listener = TCP().create_listener(noop_handler)

    # Nothing has been bound yet.
    assert len(tcp_listener.get_addrs()) == 0

    # Listen twice on the same multiaddr; the address count must grow by one
    # after each call.
    for expected_count in (1, 2):
        await nursery.start(tcp_listener.listen, LISTEN_MADDR)
        assert len(tcp_listener.get_addrs()) == expected_count
@pytest.mark.trio
async def test_tcp_dial(nursery):
    """Dial a TCP listener and check that data flows from listener to dialer.

    First verifies that dialing ``/ip4/127.0.0.1/tcp/1`` raises
    ``OpenConnectionError``. Then starts a listener, dials its address, and
    asserts that bytes written on the listener-side connection are read back
    unchanged on the dialing side.
    """
    tcp = TCP()
    accepted_conn = None
    accepted = trio.Event()

    async def on_connection(tcp_stream):
        """Wrap the accepted stream, signal the test, then stay alive."""
        nonlocal accepted_conn
        accepted_conn = RawConnection(tcp_stream, False)
        accepted.set()
        # Keep the handler from returning while the test exchanges data.
        await trio.sleep_forever()

    # Test: `OpenConnectionError` is raised when trying to dial a port which
    # no one is listening on.
    unused_addr = Multiaddr("/ip4/127.0.0.1/tcp/1")
    with pytest.raises(OpenConnectionError):
        await tcp.dial(unused_addr)

    tcp_listener = tcp.create_listener(on_connection)
    await nursery.start(tcp_listener.listen, LISTEN_MADDR)
    bound_addrs = tcp_listener.get_addrs()
    assert len(bound_addrs) == 1

    dialer_conn = await tcp.dial(bound_addrs[0])
    # Wait until the handler has stored the listener-side connection.
    await accepted.wait()

    payload = b"123"
    await accepted_conn.write(payload)
    received = await dialer_conn.read(len(payload))
    assert received == payload