Group messages for peer in heartbeat
This commit is contained in:
parent
ab1500c708
commit
5efdf4c703
|
@ -2,7 +2,7 @@ from ast import literal_eval
|
|||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
from typing import Any, Dict, Iterable, List, Sequence, Set
|
||||
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple
|
||||
|
||||
from libp2p.network.stream.exceptions import StreamClosed
|
||||
from libp2p.peer.id import ID
|
||||
|
@ -305,6 +305,75 @@ class GossipSub(IPubsubRouter):
|
|||
# Forget mesh[topic]
|
||||
del self.mesh[topic]
|
||||
|
||||
async def _emit_control_msgs(
|
||||
self,
|
||||
peers_to_graft: Dict[ID, List[str]],
|
||||
peers_to_prune: Dict[ID, List[str]],
|
||||
peers_to_gossip: Dict[ID, Dict[str, List[str]]],
|
||||
) -> None:
|
||||
# Starting with GRAFT messages
|
||||
for peer, topics in peers_to_graft.items():
|
||||
graft_msgs: List[rpc_pb2.ControlGraft] = []
|
||||
for topic in topics:
|
||||
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft()
|
||||
graft_msg.topicID = topic
|
||||
graft_msgs.append(graft_msg)
|
||||
|
||||
# If there are also PRUNE messages to send to this peer
|
||||
if peer in peers_to_prune:
|
||||
prune_msgs: List[rpc_pb2.ControlPrune] = []
|
||||
for topic in peers_to_prune[peer]:
|
||||
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
|
||||
prune_msg.topicID = topic
|
||||
prune_msgs.append(prune_msg)
|
||||
del peers_to_prune[peer]
|
||||
|
||||
# If there are also IHAVE messages to send to this peer
|
||||
if peer in peers_to_gossip:
|
||||
ihave_msgs: List[rpc_pb2.ControlIHave] = []
|
||||
for topic in peers_to_gossip[peer]:
|
||||
ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave()
|
||||
ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic])
|
||||
ihave_msg.topicID = topic
|
||||
ihave_msgs.append(ihave_msg)
|
||||
del peers_to_gossip[peer]
|
||||
|
||||
control_msg = self.pack_control_msgs(ihave_msgs, graft_msgs, prune_msgs)
|
||||
await self.emit_control_message(control_msg, peer)
|
||||
|
||||
# Next with PRUNE messages
|
||||
for peer, topics in peers_to_prune.items():
|
||||
prune_msgs = []
|
||||
for topic in topics:
|
||||
prune_msg = rpc_pb2.ControlPrune()
|
||||
prune_msg.topicID = topic
|
||||
prune_msgs.append(prune_msg)
|
||||
|
||||
# If there are also IHAVE messages to send to this peer
|
||||
if peer in peers_to_gossip:
|
||||
ihave_msgs = []
|
||||
for topic in peers_to_gossip[peer]:
|
||||
ihave_msg = rpc_pb2.ControlIHave()
|
||||
ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic])
|
||||
ihave_msg.topicID = topic
|
||||
ihave_msgs.append(ihave_msg)
|
||||
del peers_to_gossip[peer]
|
||||
|
||||
control_msg = self.pack_control_msgs(ihave_msgs, None, prune_msgs)
|
||||
await self.emit_control_message(control_msg, peer)
|
||||
|
||||
# Fianlly IHAVE messages
|
||||
for peer in peers_to_gossip:
|
||||
ihave_msgs = []
|
||||
for topic in peers_to_gossip[peer]:
|
||||
ihave_msg = rpc_pb2.ControlIHave()
|
||||
ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic])
|
||||
ihave_msg.topicID = topic
|
||||
ihave_msgs.append(ihave_msg)
|
||||
|
||||
control_msg = self.pack_control_msgs(ihave_msgs, None, None)
|
||||
await self.emit_control_message(control_msg, peer)
|
||||
|
||||
# Heartbeat
|
||||
async def heartbeat(self) -> None:
|
||||
"""
|
||||
|
@ -316,15 +385,24 @@ class GossipSub(IPubsubRouter):
|
|||
# Start after a delay. Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L410 # Noqa: E501
|
||||
await asyncio.sleep(self.heartbeat_initial_delay)
|
||||
while True:
|
||||
# Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
|
||||
peers_to_graft, peers_to_prune = self.mesh_heartbeat()
|
||||
# Maintain fanout
|
||||
self.fanout_heartbeat()
|
||||
# Get the peers to send IHAVE to
|
||||
peers_to_gossip = self.gossip_heartbeat()
|
||||
# Pack GRAFT, PRUNE and IHAVE for the same peer into one control message and send it
|
||||
await self._emit_control_msgs(
|
||||
peers_to_graft, peers_to_prune, peers_to_gossip
|
||||
)
|
||||
|
||||
await self.mesh_heartbeat()
|
||||
await self.fanout_heartbeat()
|
||||
await self.gossip_heartbeat()
|
||||
self.mcache.shift()
|
||||
|
||||
await asyncio.sleep(self.heartbeat_interval)
|
||||
|
||||
async def mesh_heartbeat(self) -> None:
|
||||
# Note: the comments here are the exact pseudocode from the spec
|
||||
def mesh_heartbeat(self) -> Tuple[Dict[ID, List[str]], Dict[ID, List[str]]]:
|
||||
peers_to_graft: Dict[ID, List[str]] = {}
|
||||
peers_to_prune: Dict[ID, List[str]] = {}
|
||||
for topic in self.mesh:
|
||||
# Skip if no peers have subscribed to the topic
|
||||
if topic not in self.pubsub.peer_topics:
|
||||
|
@ -342,7 +420,10 @@ class GossipSub(IPubsubRouter):
|
|||
self.mesh[topic].append(peer)
|
||||
|
||||
# Emit GRAFT(topic) control message to peer
|
||||
await self.emit_graft(topic, peer)
|
||||
if peer not in peers_to_graft:
|
||||
peers_to_graft[peer] = [topic]
|
||||
else:
|
||||
peers_to_graft[peer].append(topic)
|
||||
|
||||
if num_mesh_peers_in_topic > self.degree_high:
|
||||
# Select |mesh[topic]| - D peers from mesh[topic]
|
||||
|
@ -354,9 +435,13 @@ class GossipSub(IPubsubRouter):
|
|||
self.mesh[topic].remove(peer)
|
||||
|
||||
# Emit PRUNE(topic) control message to peer
|
||||
await self.emit_prune(topic, peer)
|
||||
if peer not in peers_to_prune:
|
||||
peers_to_prune[peer] = [topic]
|
||||
else:
|
||||
peers_to_prune[peer].append(topic)
|
||||
return peers_to_graft, peers_to_prune
|
||||
|
||||
async def fanout_heartbeat(self) -> None:
|
||||
def fanout_heartbeat(self) -> None:
|
||||
# Note: the comments here are the exact pseudocode from the spec
|
||||
for topic in self.fanout:
|
||||
# Delete topic entry if it's not in `pubsub.peer_topics`
|
||||
|
@ -388,7 +473,8 @@ class GossipSub(IPubsubRouter):
|
|||
# Add the peers to fanout[topic]
|
||||
self.fanout[topic].extend(selected_peers)
|
||||
|
||||
async def gossip_heartbeat(self) -> None:
|
||||
def gossip_heartbeat(self) -> Dict[ID, Dict[str, List[str]]]:
|
||||
peers_to_gossip: Dict[ID, Dict[str, List[str]]] = {}
|
||||
for topic in self.mesh:
|
||||
msg_ids = self.mcache.window(topic)
|
||||
if msg_ids:
|
||||
|
@ -401,7 +487,10 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
|
||||
for peer in peers_to_emit_ihave_to:
|
||||
await self.emit_ihave(topic, msg_id_strs, peer)
|
||||
if peer not in peers_to_gossip:
|
||||
peers_to_gossip[peer] = {topic: msg_id_strs}
|
||||
else:
|
||||
peers_to_gossip[peer][topic] = msg_id_strs
|
||||
|
||||
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
|
||||
# Do the same for fanout, for all topics not already hit in mesh
|
||||
|
@ -417,9 +506,11 @@ class GossipSub(IPubsubRouter):
|
|||
)
|
||||
msg_id_strs = [str(msg) for msg in msg_ids]
|
||||
for peer in peers_to_emit_ihave_to:
|
||||
await self.emit_ihave(topic, msg_id_strs, peer)
|
||||
|
||||
self.mcache.shift()
|
||||
if peer not in peers_to_gossip:
|
||||
peers_to_gossip[peer] = {topic: msg_id_strs}
|
||||
else:
|
||||
peers_to_gossip[peer][topic] = msg_id_strs
|
||||
return peers_to_gossip
|
||||
|
||||
@staticmethod
|
||||
def select_from_minus(
|
||||
|
@ -554,6 +645,21 @@ class GossipSub(IPubsubRouter):
|
|||
|
||||
# RPC emitters
|
||||
|
||||
def pack_control_msgs(
|
||||
self,
|
||||
ihave_msgs: List[rpc_pb2.ControlIHave],
|
||||
graft_msgs: List[rpc_pb2.ControlGraft],
|
||||
prune_msgs: List[rpc_pb2.ControlPrune],
|
||||
) -> rpc_pb2.ControlMessage:
|
||||
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
|
||||
if ihave_msgs:
|
||||
control_msg.ihave.extend(ihave_msgs)
|
||||
if graft_msgs:
|
||||
control_msg.graft.extend(graft_msgs)
|
||||
if prune_msgs:
|
||||
control_msg.prune.extend(prune_msgs)
|
||||
return control_msg
|
||||
|
||||
async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None:
|
||||
"""Emit ihave message, sent to to_peer, for topic and msg_ids."""
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user