diff --git a/libp2p/utils.py b/libp2p/utils.py index 3d0794a..bce9e58 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -6,6 +6,9 @@ from libp2p.io.abc import Reader from .io.utils import read_exactly +from typing import Generic, TypeVar +import trio + # Unsigned LEB128(varint codec) # Reference: https://github.com/ethereum/py-wasm/blob/master/wasm/parsers/leb128.py @@ -95,3 +98,25 @@ async def read_fixedint_prefixed(reader: Reader) -> bytes: len_bytes = await reader.read(SIZE_LEN_BYTES) len_int = int.from_bytes(len_bytes, "big") return await reader.read(len_int) + + +TItem = TypeVar("TItem") + + +class IQueue(Generic[TItem]): + async def put(self, item: TItem): + ... + + async def get(self) -> TItem: + ... + + +class TrioQueue(IQueue): + def __init__(self): + self.send_channel, self.receive_channel = trio.open_memory_channel(0) + + async def put(self, item: TItem): + await self.send_channel.send(item) + + async def get(self) -> TItem: + return await self.receive_channel.receive() diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..a628437 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,19 @@ +import trio +import pytest +from libp2p.utils import TrioQueue + + +@pytest.mark.trio +async def test_trio_queue(): + queue = TrioQueue() + + async def queue_get(task_status=None): + result = await queue.get() + task_status.started(result) + + async with trio.open_nursery() as nursery: + nursery.start_soon(queue.put, 123) + result = await nursery.start(queue_get) + + assert result == 123 +