diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 5668405..3c7898a 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -178,107 +178,101 @@ class Mplex(IMuxedConn): """ Read a message off of the secured connection and add it to the corresponding message buffer """ - # TODO Deal with other types of messages using flag (currently _) while True: try: channel_id, flag, message = await self._wait_until_shutting_down_or_closed( self.read_message() ) - except ( - MplexUnavailable, - ConnectionResetError, - IncompleteReadError, - ) as error: + except MplexUnavailable as error: print(f"!@# handle_incoming: read_message: exception={error}") break - if channel_id is not None and flag is not None and message is not None: - stream_id = StreamID(channel_id=channel_id, is_initiator=bool(flag & 1)) - is_stream_id_seen: bool - stream: MplexStream - async with self.streams_lock: - is_stream_id_seen = stream_id in self.streams - if is_stream_id_seen: - stream = self.streams[stream_id] - # Other consequent stream message should wait until the stream get accepted - # TODO: Handle more tags, and refactor `HeaderTags` - if flag == HeaderTags.NewStream.value: - if is_stream_id_seen: - # `NewStream` for the same id is received twice... - # TODO: Shutdown - pass - mplex_stream = await self._initialize_stream( - stream_id, message.decode() + stream_id = StreamID(channel_id=channel_id, is_initiator=bool(flag & 1)) + is_stream_id_seen: bool + stream: MplexStream + async with self.streams_lock: + is_stream_id_seen = stream_id in self.streams + if is_stream_id_seen: + stream = self.streams[stream_id] + if flag == HeaderTags.NewStream.value: + if is_stream_id_seen: + # `NewStream` for the same id is received twice... + # TODO: Shutdown + pass + mplex_stream = await self._initialize_stream( + stream_id, message.decode() + ) + try: + await self._wait_until_shutting_down_or_closed( + self.new_stream_queue.put(mplex_stream) ) - try: - await self._wait_until_shutting_down_or_closed( - self.new_stream_queue.put(mplex_stream) - ) - except MplexUnavailable: - break - elif flag in ( - HeaderTags.MessageInitiator.value, - HeaderTags.MessageReceiver.value, - ): - if not is_stream_id_seen: - # We receive a message of the stream `stream_id` which is not accepted - # before. It is abnormal. Possibly disconnect? - # TODO: Warn and emit logs about this. + except MplexUnavailable: + break + elif flag in ( + HeaderTags.MessageInitiator.value, + HeaderTags.MessageReceiver.value, + ): + if not is_stream_id_seen: + # We receive a message of the stream `stream_id` which is not accepted + # before. It is abnormal. Possibly disconnect? + # TODO: Warn and emit logs about this. + continue + async with stream.close_lock: + if stream.event_remote_closed.is_set(): + # TODO: Warn "Received data from remote after stream was closed by them. (len = %d)" # noqa: E501 continue - async with stream.close_lock: - if stream.event_remote_closed.is_set(): - # TODO: Warn "Received data from remote after stream was closed by them. (len = %d)" # noqa: E501 - continue - try: - await self._wait_until_shutting_down_or_closed( - stream.incoming_data.put(message) - ) - except MplexUnavailable: - break - elif flag in ( - HeaderTags.CloseInitiator.value, - HeaderTags.CloseReceiver.value, - ): - if not is_stream_id_seen: + try: + await self._wait_until_shutting_down_or_closed( + stream.incoming_data.put(message) + ) + except MplexUnavailable: + break + elif flag in ( + HeaderTags.CloseInitiator.value, + HeaderTags.CloseReceiver.value, + ): + if not is_stream_id_seen: + continue + # NOTE: If remote is already closed, then return: Technically a bug + # on the other side. We should consider killing the connection. + async with stream.close_lock: + if stream.event_remote_closed.is_set(): continue - # NOTE: If remote is already closed, then return: Technically a bug - # on the other side. We should consider killing the connection. - async with stream.close_lock: - if stream.event_remote_closed.is_set(): - continue - is_local_closed: bool - async with stream.close_lock: - stream.event_remote_closed.set() - is_local_closed = stream.event_local_closed.is_set() - # If local is also closed, both sides are closed. Then, we should clean up - # the entry of this stream, to avoid others from accessing it. - if is_local_closed: - async with self.streams_lock: - del self.streams[stream_id] - elif flag in ( - HeaderTags.ResetInitiator.value, - HeaderTags.ResetReceiver.value, - ): - if not is_stream_id_seen: - # This is *ok*. We forget the stream on reset. - continue - async with stream.close_lock: - if not stream.event_remote_closed.is_set(): - stream.event_reset.set() - - stream.event_remote_closed.set() - # If local is not closed, we should close it. - if not stream.event_local_closed.is_set(): - stream.event_local_closed.set() + is_local_closed: bool + async with stream.close_lock: + stream.event_remote_closed.set() + is_local_closed = stream.event_local_closed.is_set() + # If local is also closed, both sides are closed. Then, we should clean up + # the entry of this stream, to avoid others from accessing it. + if is_local_closed: async with self.streams_lock: del self.streams[stream_id] - else: - # TODO: logging - if is_stream_id_seen: - await stream.reset() + elif flag in ( + HeaderTags.ResetInitiator.value, + HeaderTags.ResetReceiver.value, + ): + if not is_stream_id_seen: + # This is *ok*. We forget the stream on reset. + continue + async with stream.close_lock: + if not stream.event_remote_closed.is_set(): + stream.event_reset.set() + + stream.event_remote_closed.set() + # If local is not closed, we should close it. + if not stream.event_local_closed.is_set(): + stream.event_local_closed.set() + async with self.streams_lock: + del self.streams[stream_id] + else: + # TODO: logging + if is_stream_id_seen: + await stream.reset() # Force context switch await asyncio.sleep(0) + # If we enter here, it means this connection is shutting down. + # We should clean the things up. await self._cleanup() async def read_message(self) -> Tuple[int, int, bytes]: @@ -290,15 +284,19 @@ class Mplex(IMuxedConn): # FIXME: No timeout is used in Go implementation. # Timeout is set to a relatively small value to alleviate wait time to exit # loop in handle_incoming - header = await decode_uvarint_from_stream(self.secured_conn) - # TODO: Handle the case of EOF and other exceptions? try: + header = await decode_uvarint_from_stream(self.secured_conn) message = await asyncio.wait_for( read_varint_prefixed_bytes(self.secured_conn), timeout=5 ) - except asyncio.TimeoutError: - # TODO: Investigate what we should do if time is out. - return None, None, None + except (ConnectionResetError, IncompleteReadError) as error: + raise MplexUnavailable( + "failed to read messages correctly from the underlying connection" + ) from error + except asyncio.TimeoutError as error: + raise MplexUnavailable( + "failed to read more message body within the timeout" + ) from error flag = header & 0x07 channel_id = header >> 3