diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 678e509..b7b7910 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -89,6 +89,8 @@ class FloodSub(IPubsubRouter): logger.debug("publishing message %s", pubsub_msg) for peer_id in peers_gen: + if peer_id not in self.pubsub.peers: + continue stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index bddafa0..a64b9ca 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -193,6 +193,8 @@ class GossipSub(IPubsubRouter): logger.debug("publishing message %s", pubsub_msg) for peer_id in peers_gen: + if peer_id not in self.pubsub.peers: + continue stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 @@ -604,6 +606,11 @@ class GossipSub(IPubsubRouter): rpc_msg: bytes = packet.SerializeToString() # 3) Get the stream to this peer + if sender_peer_id not in self.pubsub.peers: + logger.debug( + "Fail to responed to iwant request from %s: peer disconnected", + sender_peer_id, + ) peer_stream = self.pubsub.peers[sender_peer_id] # 4) And write the packet to the stream @@ -710,6 +717,8 @@ class GossipSub(IPubsubRouter): rpc_msg: bytes = packet.SerializeToString() # Get stream for peer from pubsub + if to_peer not in self.pubsub.peers: + logger.debug("Fail to emit control message to %s: peer disconnected", to_peer) peer_stream = self.pubsub.peers[to_peer] # Write rpc to stream