diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index 233b7cf..b17f867 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -96,7 +96,8 @@ class MessageCache: last_entries: List[CacheEntry] = self.history[len(self.history) - 1] for entry in last_entries: - del self.msgs[entry.mid] + if entry.mid in self.msgs: + del self.msgs[entry.mid] i: int = len(self.history) - 2 diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 2228360..1a43c7c 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -297,7 +297,8 @@ class Mplex(IMuxedConn): # the entry of this stream, to avoid others from accessing it. if is_local_closed: async with self.streams_lock: - del self.streams[stream_id] + if stream_id in self.streams: + del self.streams[stream_id] async def _handle_reset(self, stream_id: StreamID) -> None: async with self.streams_lock: @@ -315,7 +316,8 @@ class Mplex(IMuxedConn): if not stream.event_local_closed.is_set(): stream.event_local_closed.set() async with self.streams_lock: - del self.streams[stream_id] + if stream_id in self.streams: + del self.streams[stream_id] async def _cleanup(self) -> None: if not self.event_shutting_down.is_set(): diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 3f283c8..f080d3c 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -180,7 +180,8 @@ class MplexStream(IMuxedStream): if _is_remote_closed: # Both sides are closed, we can safely remove the buffer from the dict. async with self.muxed_conn.streams_lock: - del self.muxed_conn.streams[self.stream_id] + if self.stream_id in self.muxed_conn.streams: + del self.muxed_conn.streams[self.stream_id] async def reset(self) -> None: """closes both ends of the stream tells this remote side to hang up."""