From 19907e18ec3c8e9edada23fa85446f06e8ce78c3 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Wed, 20 Nov 2019 23:06:37 +0800 Subject: [PATCH 1/3] Replace (check and) del pattern with pop method --- libp2p/network/swarm.py | 6 ++---- libp2p/pubsub/gossipsub.py | 12 +++++------- libp2p/pubsub/mcache.py | 3 +-- libp2p/pubsub/pubsub.py | 11 +++++------ libp2p/security/security_multistream.py | 3 +-- libp2p/stream_muxer/mplex/mplex.py | 6 ++---- libp2p/stream_muxer/mplex/mplex_stream.py | 10 +++------- libp2p/stream_muxer/muxer_multistream.py | 3 +-- 8 files changed, 20 insertions(+), 34 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 7bb40ce..18676b3 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -248,7 +248,7 @@ class Swarm(INetwork): # TODO: Should be changed to close multisple connections, # if we have several connections per peer in the future. connection = self.connections[peer_id] - # NOTE: `connection.close` will perform `del self.connections[peer_id]` + # NOTE: `connection.close` will delete `peer_id` from `self.connections` # and `notify_disconnected` for us. await connection.close() @@ -270,11 +270,9 @@ class Swarm(INetwork): """Simply remove the connection from Swarm's records, without closing the connection.""" peer_id = swarm_conn.muxed_conn.peer_id - if peer_id not in self.connections: - return # TODO: Should be changed to remove the exact connection, # if we have several connections per peer in the future. - del self.connections[peer_id] + self.connections.pop(peer_id, None) # Notifee diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 93faebd..e9d0f35 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -144,8 +144,7 @@ class GossipSub(IPubsubRouter): elif peer_id in self.peers_floodsub: self.peers_floodsub.remove(peer_id) - if peer_id in self.peers_to_protocol: - del self.peers_to_protocol[peer_id] + self.peers_to_protocol.pop(peer_id, None) async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: """ @@ -274,8 +273,7 @@ class GossipSub(IPubsubRouter): self.mesh[topic].append(peer) await self.emit_graft(topic, peer) - if topic_in_fanout: - del self.fanout[topic] + self.fanout.pop(topic, None) async def leave(self, topic: str) -> None: # Note: the comments here are the near-exact algorithm description from the spec @@ -294,7 +292,7 @@ class GossipSub(IPubsubRouter): await self.emit_prune(topic, peer) # Forget mesh[topic] - del self.mesh[topic] + self.mesh.pop(topic, None) # Heartbeat async def heartbeat(self) -> None: @@ -355,8 +353,8 @@ class GossipSub(IPubsubRouter): # TODO: there's no way time_since_last_publish gets set anywhere yet if self.time_since_last_publish[topic] > self.time_to_live: # Remove topic from fanout - del self.fanout[topic] - del self.time_since_last_publish[topic] + self.fanout.pop(topic, None) + self.time_since_last_publish.pop(topic, None) else: num_fanout_peers_in_topic = len(self.fanout[topic]) diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index b17f867..c848912 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -96,8 +96,7 @@ class MessageCache: last_entries: List[CacheEntry] = self.history[len(self.history) - 1] for entry in last_entries: - if entry.mid in self.msgs: - del self.msgs[entry.mid] + self.msgs.pop(entry.mid) i: int = len(self.history) - 2 diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3834eb4..ba1994e 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -232,8 +232,7 @@ class Pubsub: :param topic: the topic to remove validator from """ - if topic in self.topic_validators: - del self.topic_validators[topic] + self.topic_validators.pop(topic, None) def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: """ @@ -283,7 +282,7 @@ class Pubsub: await stream.write(encode_varint_prefixed(hello.SerializeToString())) except StreamClosed: logger.debug("Fail to add new peer %s: stream closed", peer_id) - del self.peers[peer_id] + self.peers.pop(peer_id, None) return # TODO: Check EOF of this stream. # TODO: Check if the peer in black list. @@ -291,7 +290,7 @@ class Pubsub: self.router.add_peer(peer_id, stream.get_protocol()) except Exception as error: logger.debug("fail to add new peer %s, error %s", peer_id, error) - del self.peers[peer_id] + self.peers.pop(peer_id, None) return logger.debug("added new peer %s", peer_id) @@ -299,7 +298,7 @@ class Pubsub: def _handle_dead_peer(self, peer_id: ID) -> None: if peer_id not in self.peers: return - del self.peers[peer_id] + self.peers.pop(peer_id, None) for topic in self.peer_topics: if peer_id in self.peer_topics[topic]: @@ -411,7 +410,7 @@ class Pubsub: if topic_id not in self.my_topics: return # Remove topic_id from map if present - del self.my_topics[topic_id] + self.my_topics.pop(topic_id, None) # Create unsubscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC() diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 52c957c..0507a52 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -50,8 +50,7 @@ class SecurityMultistream(ABC): :param transport: the corresponding transportation to the ``protocol``. """ # If protocol is already added before, remove it and add it again. - if protocol in self.transports: - del self.transports[protocol] + self.transports.pop(protocol, None) self.transports[protocol] = transport # Note: None is added as the handler for the given protocol since # we only care about selecting the protocol, not any handler function diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 1a43c7c..f70cae2 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -297,8 +297,7 @@ class Mplex(IMuxedConn): # the entry of this stream, to avoid others from accessing it. if is_local_closed: async with self.streams_lock: - if stream_id in self.streams: - del self.streams[stream_id] + self.streams.pop(stream_id, None) async def _handle_reset(self, stream_id: StreamID) -> None: async with self.streams_lock: @@ -316,8 +315,7 @@ class Mplex(IMuxedConn): if not stream.event_local_closed.is_set(): stream.event_local_closed.set() async with self.streams_lock: - if stream_id in self.streams: - del self.streams[stream_id] + self.streams.pop(stream_id, None) async def _cleanup(self) -> None: if not self.event_shutting_down.is_set(): diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index f080d3c..7630c96 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -180,8 +180,7 @@ class MplexStream(IMuxedStream): if _is_remote_closed: # Both sides are closed, we can safely remove the buffer from the dict. async with self.muxed_conn.streams_lock: - if self.stream_id in self.muxed_conn.streams: - del self.muxed_conn.streams[self.stream_id] + self.muxed_conn.streams.pop(self.stream_id, None) async def reset(self) -> None: """closes both ends of the stream tells this remote side to hang up.""" @@ -208,11 +207,8 @@ class MplexStream(IMuxedStream): self.event_remote_closed.set() async with self.muxed_conn.streams_lock: - if ( - self.muxed_conn.streams is not None - and self.stream_id in self.muxed_conn.streams - ): - del self.muxed_conn.streams[self.stream_id] + if self.muxed_conn.streams is not None: + self.muxed_conn.streams.pop(self.stream_id, None) # TODO deadline not in use def set_deadline(self, ttl: int) -> bool: diff --git a/libp2p/stream_muxer/muxer_multistream.py b/libp2p/stream_muxer/muxer_multistream.py index f82cd19..d83869f 100644 --- a/libp2p/stream_muxer/muxer_multistream.py +++ b/libp2p/stream_muxer/muxer_multistream.py @@ -44,8 +44,7 @@ class MuxerMultistream: :param transport: the corresponding transportation to the ``protocol``. """ # If protocol is already added before, remove it and add it again. - if protocol in self.transports: - del self.transports[protocol] + self.transports.pop(protocol, None) self.transports[protocol] = transport self.multiselect.add_handler(protocol, None) From 501eef59deabaa155622648b41c994861ace8439 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Thu, 21 Nov 2019 14:48:03 +0800 Subject: [PATCH 2/3] Apply PR feedback: Only use pop method if graceful failure handling is desired --- libp2p/network/swarm.py | 4 +++- libp2p/pubsub/gossipsub.py | 4 ++-- libp2p/pubsub/pubsub.py | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 18676b3..de80dd8 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -270,9 +270,11 @@ class Swarm(INetwork): """Simply remove the connection from Swarm's records, without closing the connection.""" peer_id = swarm_conn.muxed_conn.peer_id + if peer_id not in self.connections: + return # TODO: Should be changed to remove the exact connection, # if we have several connections per peer in the future. - self.connections.pop(peer_id, None) + del self.connections[peer_id] # Notifee diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index e9d0f35..3bf5fa1 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -353,8 +353,8 @@ class GossipSub(IPubsubRouter): # TODO: there's no way time_since_last_publish gets set anywhere yet if self.time_since_last_publish[topic] > self.time_to_live: # Remove topic from fanout - self.fanout.pop(topic, None) - self.time_since_last_publish.pop(topic, None) + del self.fanout[topic] + del self.time_since_last_publish[topic] else: num_fanout_peers_in_topic = len(self.fanout[topic]) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index ba1994e..869c968 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -298,7 +298,7 @@ class Pubsub: def _handle_dead_peer(self, peer_id: ID) -> None: if peer_id not in self.peers: return - self.peers.pop(peer_id, None) + del self.peers[peer_id] for topic in self.peer_topics: if peer_id in self.peer_topics[topic]: From e355cb2600050abf9b8f8c587c0ca80d18834b03 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 23 Nov 2019 16:04:22 +0800 Subject: [PATCH 3/3] Apply PR feedback: Only use pop method if error handling is in place --- libp2p/pubsub/gossipsub.py | 2 +- libp2p/pubsub/pubsub.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 3bf5fa1..d09db76 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -292,7 +292,7 @@ class GossipSub(IPubsubRouter): await self.emit_prune(topic, peer) # Forget mesh[topic] - self.mesh.pop(topic, None) + del self.mesh[topic] # Heartbeat async def heartbeat(self) -> None: diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 869c968..493a303 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -282,7 +282,7 @@ class Pubsub: await stream.write(encode_varint_prefixed(hello.SerializeToString())) except StreamClosed: logger.debug("Fail to add new peer %s: stream closed", peer_id) - self.peers.pop(peer_id, None) + del self.peers[peer_id] return # TODO: Check EOF of this stream. # TODO: Check if the peer in black list. @@ -290,7 +290,7 @@ class Pubsub: self.router.add_peer(peer_id, stream.get_protocol()) except Exception as error: logger.debug("fail to add new peer %s, error %s", peer_id, error) - self.peers.pop(peer_id, None) + del self.peers[peer_id] return logger.debug("added new peer %s", peer_id) @@ -410,7 +410,7 @@ class Pubsub: if topic_id not in self.my_topics: return # Remove topic_id from map if present - self.my_topics.pop(topic_id, None) + del self.my_topics[topic_id] # Create unsubscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC()