diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index d09db76..ea2594e 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -106,7 +106,8 @@ class GossipSub(IPubsubRouter): logger.debug("attached to pusub") # Start heartbeat now that we have a pubsub instance - # TODO: Start after delay + # Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501 + await asyncio.sleep(0.1) asyncio.ensure_future(self.heartbeat()) def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None: @@ -127,8 +128,7 @@ class GossipSub(IPubsubRouter): # instance in multistream-select, but it is not the protocol that gossipsub supports. # In this case, probably we registered gossipsub to a wrong `protocol_id` # in multistream-select, or wrong versions. - # TODO: Better handling - raise Exception(f"protocol is not supported: protocol_id={protocol_id}") + raise Exception(f"This should not happen. Protocol={protocol_id} is not supported.") self.peers_to_protocol[peer_id] = protocol_id def remove_peer(self, peer_id: ID) -> None: @@ -187,7 +187,6 @@ class GossipSub(IPubsubRouter): stream = self.pubsub.peers[peer_id] # FIXME: We should add a `WriteMsg` similar to write delimited messages. # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 - # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. try: await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) except StreamClosed: @@ -218,15 +217,9 @@ class GossipSub(IPubsubRouter): # gossipsub peers in_topic_gossipsub_peers: List[ID] = None - # TODO: Do we need to check `topic in self.pubsub.my_topics`? if topic in self.mesh: in_topic_gossipsub_peers = self.mesh[topic] else: - # TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not - # subscribed? - # I assume there could be short periods between heartbeats where topic may not - # be but we should check that this path gets hit appropriately - if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): # If no peers in fanout, choose some peers from gossipsub peers in topic. self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( @@ -373,7 +366,6 @@ class GossipSub(IPubsubRouter): for topic in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: - # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in a topic and only add them if they are gossipsub peers too if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] @@ -397,7 +389,6 @@ class GossipSub(IPubsubRouter): if topic not in self.mesh: msg_ids = self.mcache.window(topic) if msg_ids: - # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in topic and only add if they are gossipsub peers also if topic in self.pubsub.peer_topics: # Select D peers from peers.gossipsub[topic] @@ -409,7 +400,6 @@ class GossipSub(IPubsubRouter): peer not in self.mesh[topic] and peer not in self.fanout[topic] ): - msg_id_strs = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer)