py-libp2p/libp2p/io/trio.py

56 lines
2.0 KiB
Python
Raw Normal View History

import logging
2019-11-19 14:01:12 +08:00
import trio
2019-11-19 14:01:12 +08:00
from libp2p.io.abc import ReadWriteCloser
from libp2p.io.exceptions import IOException
# Module-level logger, registered under the "libp2p.io.trio" name.
logger = logging.getLogger("libp2p.io.trio")
2019-12-06 17:06:37 +08:00
class TrioTCPStream(ReadWriteCloser):
    """Adapt a ``trio.SocketStream`` to the ``ReadWriteCloser`` interface.

    Reads and writes are each serialized through their own lock so that
    concurrent callers queue up instead of triggering
    ``trio.BusyResourceError`` on the underlying stream.
    """

    stream: trio.SocketStream
    # NOTE: Add both read and write lock to avoid `trio.BusyResourceError`
    read_lock: trio.Lock
    write_lock: trio.Lock

    def __init__(self, stream: trio.SocketStream) -> None:
        """Wrap ``stream`` and create the read and write locks."""
        self.stream = stream
        self.read_lock = trio.Lock()
        self.write_lock = trio.Lock()

    async def write(self, data: bytes) -> None:
        """Send all of ``data`` over the underlying stream.

        :param data: the bytes to send.
        :raise IOException: if the underlying stream is closed or broken.
        """
        async with self.write_lock:
            try:
                await self.stream.send_all(data)
            except (trio.ClosedResourceError, trio.BrokenResourceError) as error:
                raise IOException from error
            except trio.BusyResourceError as error:
                # This should never happen, since we already access streams
                # with read/write locks.
                raise Exception(
                    "this should never happen "
                    "since we already access streams with read/write locks."
                ) from error

    async def read(self, n: int = -1) -> bytes:
        """Receive bytes from the underlying stream.

        :param n: upper bound on the number of bytes to receive. ``-1``
            (the default) passes ``None`` to ``receive_some`` so no explicit
            limit is imposed here; ``0`` returns ``b""`` after a checkpoint
            without touching the stream.
        :return: whatever ``receive_some`` returned, or ``b""`` when
            ``n == 0``.
        :raise IOException: if the underlying stream is closed or broken.
        """
        async with self.read_lock:
            if n == 0:
                # Checkpoint, so that a zero-length read still yields to the
                # scheduler. `trio.hazmat` was renamed to `trio.lowlevel`
                # (trio 0.15) and later removed; prefer the new name and only
                # fall back to the old one on releases that predate it.
                lowlevel = getattr(trio, "lowlevel", None)
                if lowlevel is None:
                    lowlevel = trio.hazmat
                await lowlevel.checkpoint()
                return b""
            max_bytes = n if n != -1 else None
            try:
                return await self.stream.receive_some(max_bytes)
            except (trio.ClosedResourceError, trio.BrokenResourceError) as error:
                raise IOException from error
            except trio.BusyResourceError as error:
                # This should never happen, since we already access streams
                # with read/write locks.
                raise Exception(
                    "this should never happen "
                    "since we already access streams with read/write locks."
                ) from error

    async def close(self) -> None:
        """Close the underlying stream."""
        await self.stream.aclose()