Rewrite _wait_for_data, to handle task precisely

Make the futures first, and then we can compare them with the return
value from `asyncio.wait`.
This commit is contained in:
mhchia 2019-09-10 23:38:45 +08:00
parent df87f5adb9
commit 31fb4e0b69
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A

View File

@ -55,43 +55,45 @@ class MplexStream(IMuxedStream):
return self.stream_id.is_initiator return self.stream_id.is_initiator
async def _wait_for_data(self) -> None: async def _wait_for_data(self) -> None:
task_event_reset = asyncio.ensure_future(self.event_reset.wait())
task_incoming_data_get = asyncio.ensure_future(self.incoming_data.get())
task_event_remote_closed = asyncio.ensure_future(
self.event_remote_closed.wait()
)
done, pending = await asyncio.wait( # type: ignore done, pending = await asyncio.wait( # type: ignore
[ [task_event_reset, task_incoming_data_get, task_event_remote_closed],
self.event_reset.wait(),
self.incoming_data.get(),
self.event_remote_closed.wait(),
],
return_when=asyncio.FIRST_COMPLETED, return_when=asyncio.FIRST_COMPLETED,
) )
for fut in pending: for fut in pending:
fut.cancel() fut.cancel()
if task_event_reset in done:
if self.event_reset.is_set(): if self.event_reset.is_set():
raise MplexStreamReset raise MplexStreamReset
else:
# However, it is abnormal that `Event.wait` is unblocked without any of the flag
# is set. The task is probably cancelled.
raise Exception(
"Should not enter here. "
f"It is probably because {task_event_remote_closed} is cancelled."
)
if len(done) != 1: if task_incoming_data_get in done:
raise Exception(f"Should be exactly 1 job in {done}.") data = task_incoming_data_get.result()
done_task = tuple(done)[0]
# NOTE: Ignore type check because the typeshed for `asyncio.Task` does not
# have the field `_coro`.
coro_qualname = done_task._coro.__qualname__ # type: ignore
# If `qualname == "Queue.get"` then there is incoming data. We can add it to the buffer.
if coro_qualname == "Queue.get":
data = done_task.result()
self._buf.extend(data) self._buf.extend(data)
return return
if task_event_remote_closed in done:
if self.event_remote_closed.is_set(): if self.event_remote_closed.is_set():
raise MplexStreamEOF raise MplexStreamEOF
else:
# If the task is not `Queue.get`, then it must be `Event.wait`. # However, it is abnormal that `Event.wait` is unblocked without any of the flag
# However, it is abnormal that `Event.wait` is unblocked without any of the event # is set. The task is probably cancelled.
# (remote_closed and reset) is set. Then it is highly possible that the task
# is cancelled.
raise Exception( raise Exception(
"Should not enter here. " "Should not enter here. "
f"It is highly possible that `done_task` is cancelled. `done_task`={done_task}" f"It is probably because {task_event_remote_closed} is cancelled."
) )
# TODO: Handle timeout when deadline is used. # TODO: Handle timeout when deadline is used.
async def _read_until_eof(self) -> bytes: async def _read_until_eof(self) -> bytes: