Cleanup outdated TODOs in gossipsub
This commit is contained in:
parent
0a52a05375
commit
50fd0acf41
|
@ -106,7 +106,8 @@ class GossipSub(IPubsubRouter):
|
||||||
logger.debug("attached to pusub")
|
logger.debug("attached to pusub")
|
||||||
|
|
||||||
# Start heartbeat now that we have a pubsub instance
|
# Start heartbeat now that we have a pubsub instance
|
||||||
# TODO: Start after delay
|
# Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
asyncio.ensure_future(self.heartbeat())
|
asyncio.ensure_future(self.heartbeat())
|
||||||
|
|
||||||
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
|
def add_peer(self, peer_id: ID, protocol_id: TProtocol) -> None:
|
||||||
|
@ -127,8 +128,7 @@ class GossipSub(IPubsubRouter):
|
||||||
# instance in multistream-select, but it is not the protocol that gossipsub supports.
|
# instance in multistream-select, but it is not the protocol that gossipsub supports.
|
||||||
# In this case, probably we registered gossipsub to a wrong `protocol_id`
|
# In this case, probably we registered gossipsub to a wrong `protocol_id`
|
||||||
# in multistream-select, or wrong versions.
|
# in multistream-select, or wrong versions.
|
||||||
# TODO: Better handling
|
raise Exception(f"This should not happen. Protocol={protocol_id} is not supported.")
|
||||||
raise Exception(f"protocol is not supported: protocol_id={protocol_id}")
|
|
||||||
self.peers_to_protocol[peer_id] = protocol_id
|
self.peers_to_protocol[peer_id] = protocol_id
|
||||||
|
|
||||||
def remove_peer(self, peer_id: ID) -> None:
|
def remove_peer(self, peer_id: ID) -> None:
|
||||||
|
@ -187,7 +187,6 @@ class GossipSub(IPubsubRouter):
|
||||||
stream = self.pubsub.peers[peer_id]
|
stream = self.pubsub.peers[peer_id]
|
||||||
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
|
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
|
||||||
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
|
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
|
||||||
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
|
|
||||||
try:
|
try:
|
||||||
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
|
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
|
||||||
except StreamClosed:
|
except StreamClosed:
|
||||||
|
@ -218,15 +217,9 @@ class GossipSub(IPubsubRouter):
|
||||||
|
|
||||||
# gossipsub peers
|
# gossipsub peers
|
||||||
in_topic_gossipsub_peers: List[ID] = None
|
in_topic_gossipsub_peers: List[ID] = None
|
||||||
# TODO: Do we need to check `topic in self.pubsub.my_topics`?
|
|
||||||
if topic in self.mesh:
|
if topic in self.mesh:
|
||||||
in_topic_gossipsub_peers = self.mesh[topic]
|
in_topic_gossipsub_peers = self.mesh[topic]
|
||||||
else:
|
else:
|
||||||
# TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not
|
|
||||||
# subscribed?
|
|
||||||
# I assume there could be short periods between heartbeats where topic may not
|
|
||||||
# be but we should check that this path gets hit appropriately
|
|
||||||
|
|
||||||
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
|
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
|
||||||
# If no peers in fanout, choose some peers from gossipsub peers in topic.
|
# If no peers in fanout, choose some peers from gossipsub peers in topic.
|
||||||
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
|
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
|
||||||
|
@ -373,7 +366,6 @@ class GossipSub(IPubsubRouter):
|
||||||
for topic in self.mesh:
|
for topic in self.mesh:
|
||||||
msg_ids = self.mcache.window(topic)
|
msg_ids = self.mcache.window(topic)
|
||||||
if msg_ids:
|
if msg_ids:
|
||||||
# TODO: Make more efficient, possibly using a generator?
|
|
||||||
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too
|
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too
|
||||||
if topic in self.pubsub.peer_topics:
|
if topic in self.pubsub.peer_topics:
|
||||||
# Select D peers from peers.gossipsub[topic]
|
# Select D peers from peers.gossipsub[topic]
|
||||||
|
@ -397,7 +389,6 @@ class GossipSub(IPubsubRouter):
|
||||||
if topic not in self.mesh:
|
if topic not in self.mesh:
|
||||||
msg_ids = self.mcache.window(topic)
|
msg_ids = self.mcache.window(topic)
|
||||||
if msg_ids:
|
if msg_ids:
|
||||||
# TODO: Make more efficient, possibly using a generator?
|
|
||||||
# Get all pubsub peers in topic and only add if they are gossipsub peers also
|
# Get all pubsub peers in topic and only add if they are gossipsub peers also
|
||||||
if topic in self.pubsub.peer_topics:
|
if topic in self.pubsub.peer_topics:
|
||||||
# Select D peers from peers.gossipsub[topic]
|
# Select D peers from peers.gossipsub[topic]
|
||||||
|
@ -409,7 +400,6 @@ class GossipSub(IPubsubRouter):
|
||||||
peer not in self.mesh[topic]
|
peer not in self.mesh[topic]
|
||||||
and peer not in self.fanout[topic]
|
and peer not in self.fanout[topic]
|
||||||
):
|
):
|
||||||
|
|
||||||
msg_id_strs = [str(msg) for msg in msg_ids]
|
msg_id_strs = [str(msg) for msg in msg_ids]
|
||||||
await self.emit_ihave(topic, msg_id_strs, peer)
|
await self.emit_ihave(topic, msg_id_strs, peer)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user