diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index bac0bd7..678e509 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -77,10 +77,12 @@ class FloodSub(IPubsubRouter): :param pubsub_msg: pubsub message in protobuf. """ - peers_gen = self._get_peers_to_send( - pubsub_msg.topicIDs, - msg_forwarder=msg_forwarder, - origin=ID(pubsub_msg.from_id), + peers_gen = set( + self._get_peers_to_send( + pubsub_msg.topicIDs, + msg_forwarder=msg_forwarder, + origin=ID(pubsub_msg.from_id), + ) ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 045ef39..1969d4f 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -148,11 +148,9 @@ class GossipSub(IPubsubRouter): for topic in self.mesh: if peer_id in self.mesh[topic]: - # Delete the entry if no other peers left self.mesh[topic].remove(peer_id) for topic in self.fanout: if peer_id in self.fanout[topic]: - # Delete the entry if no other peers left self.fanout[topic].remove(peer_id) self.peers_to_protocol.pop(peer_id, None) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index a44aa05..6e9d948 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -289,24 +289,22 @@ class Pubsub: logger.debug("fail to add new peer %s, error %s", peer_id, error) return - self.peers[peer_id] = stream - # Send hello packet hello = self.get_hello_packet() try: await stream.write(encode_varint_prefixed(hello.SerializeToString())) except StreamClosed: logger.debug("Fail to add new peer %s: stream closed", peer_id) - del self.peers[peer_id] return # TODO: Check if the peer in black list. try: self.router.add_peer(peer_id, stream.get_protocol()) except Exception as error: logger.debug("fail to add new peer %s, error %s", peer_id, error) - del self.peers[peer_id] return + self.peers[peer_id] = stream + logger.debug("added new peer %s", peer_id) def _handle_dead_peer(self, peer_id: ID) -> None: @@ -316,7 +314,6 @@ class Pubsub: for topic in self.peer_topics: if peer_id in self.peer_topics[topic]: - # Delete the entry if no other peers left self.peer_topics[topic].remove(peer_id) self.router.remove_peer(peer_id) @@ -360,7 +357,6 @@ class Pubsub: else: if sub_message.topicid in self.peer_topics: if origin_id in self.peer_topics[sub_message.topicid]: - # Delete the entry if no other peers left self.peer_topics[sub_message.topicid].remove(origin_id) # FIXME(mhchia): Change the function name?