Fix stream read

This commit is contained in:
mhchia 2019-09-09 22:48:49 +08:00
parent df312f3e57
commit e5eb01d22b
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A

View File

@ -1,5 +1,5 @@
import asyncio import asyncio
from typing import TYPE_CHECKING, cast from typing import TYPE_CHECKING
from libp2p.stream_muxer.abc import IMuxedStream from libp2p.stream_muxer.abc import IMuxedStream
@ -8,7 +8,6 @@ from .datastructures import StreamID
from .exceptions import MplexStreamClosed, MplexStreamEOF, MplexStreamReset from .exceptions import MplexStreamClosed, MplexStreamEOF, MplexStreamReset
if TYPE_CHECKING: if TYPE_CHECKING:
from typing import Any # noqa: F401
from libp2p.stream_muxer.mplex.mplex import Mplex from libp2p.stream_muxer.mplex.mplex import Mplex
@ -66,16 +65,33 @@ class MplexStream(IMuxedStream):
) )
for fut in pending: for fut in pending:
fut.cancel() fut.cancel()
if self.event_reset.is_set(): if self.event_reset.is_set():
raise MplexStreamReset raise MplexStreamReset
done_task = cast("asyncio.Task[Any]", tuple(done)[0])
# TODO: `_coro` is not in `asyncio.Task`'s typeshed. if len(done) != 1:
if done_task._coro.__qualname__ == "Queue.get": # type: ignore raise Exception(f"Should be exactly 1 job in {done}.")
done_task = tuple(done)[0]
# NOTE: Ignore type check because the typeshed for `asyncio.Task` does not
# have the field `_coro`.
coro_qualname = done_task._coro.__qualname__ # type: ignore
# If `qualname == "Queue.get"` then there is incoming data. We can add it to the buffer.
if coro_qualname == "Queue.get":
data = done_task.result() data = done_task.result()
self._buf.extend(data) self._buf.extend(data)
return return
if self.event_remote_closed.is_set(): if self.event_remote_closed.is_set():
raise MplexStreamEOF raise MplexStreamEOF
# If the task is not `Queue.get`, then it must be `Event.wait`.
# However, it is abnormal that `Event.wait` is unblocked without any of the event
# (remote_closed and reset) is set. Then it is highly possible that the task
# is cancelled.
raise Exception(
"Should not enter here. "
f"It is highly possible that `done_task` is cancelled. `done_task`={done_task}"
)
# TODO: Handle timeout when deadline is used. # TODO: Handle timeout when deadline is used.
async def _read_until_eof(self) -> bytes: async def _read_until_eof(self) -> bytes:
@ -107,7 +123,7 @@ class MplexStream(IMuxedStream):
return await self._read_until_eof() return await self._read_until_eof()
if len(self._buf) == 0 and self.incoming_data.empty(): if len(self._buf) == 0 and self.incoming_data.empty():
await self._wait_for_data() await self._wait_for_data()
# Either `buf` is not empty or `incoming_data` is not empty now. # Now we are sure we have something to read.
# Try to put enough incoming data into `self._buf`. # Try to put enough incoming data into `self._buf`.
while len(self._buf) < n: while len(self._buf) < n:
try: try: