Apply PR feedback:

use defaultdict and init control message
This commit is contained in:
NIC619 2019-12-06 23:42:31 +08:00
parent db0017ddbb
commit 2d3bfc8184
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17

View File

@ -1,8 +1,9 @@
from ast import literal_eval
import asyncio
from collections import defaultdict
import logging
import random
from typing import Any, Dict, Iterable, List, Sequence, Tuple
from typing import Any, DefaultDict, Dict, Iterable, List, Sequence, Tuple
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID
@ -319,24 +320,24 @@ class GossipSub(IPubsubRouter):
# Starting with GRAFT messages
for peer, topics in peers_to_graft.items():
for topic in topics:
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft()
graft_msg.topicID = topic
graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft(topicID=topic)
graft_msgs.append(graft_msg)
# If there are also PRUNE messages to send to this peer
if peer in peers_to_prune:
for topic in peers_to_prune[peer]:
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
prune_msg.topicID = topic
prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune(
topicID=topic
)
prune_msgs.append(prune_msg)
del peers_to_prune[peer]
# If there are also IHAVE messages to send to this peer
if peer in peers_to_gossip:
for topic in peers_to_gossip[peer]:
ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic])
ihave_msg.topicID = topic
ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave(
messageIDs=peers_to_gossip[peer][topic], topicID=topic
)
ihave_msgs.append(ihave_msg)
del peers_to_gossip[peer]
@ -347,17 +348,16 @@ class GossipSub(IPubsubRouter):
for peer, topics in peers_to_prune.items():
prune_msgs = []
for topic in topics:
prune_msg = rpc_pb2.ControlPrune()
prune_msg.topicID = topic
prune_msg = rpc_pb2.ControlPrune(topicID=topic)
prune_msgs.append(prune_msg)
# If there are also IHAVE messages to send to this peer
if peer in peers_to_gossip:
ihave_msgs = []
for topic in peers_to_gossip[peer]:
ihave_msg = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic])
ihave_msg.topicID = topic
ihave_msg = rpc_pb2.ControlIHave(
messageIDs=peers_to_gossip[peer][topic], topicID=topic
)
ihave_msgs.append(ihave_msg)
del peers_to_gossip[peer]
@ -368,9 +368,9 @@ class GossipSub(IPubsubRouter):
for peer in peers_to_gossip:
ihave_msgs = []
for topic in peers_to_gossip[peer]:
ihave_msg = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(peers_to_gossip[peer][topic])
ihave_msg.topicID = topic
ihave_msg = rpc_pb2.ControlIHave(
messageIDs=peers_to_gossip[peer][topic], topicID=topic
)
ihave_msgs.append(ihave_msg)
control_msg = self.pack_control_msgs(ihave_msgs, None, None)
@ -402,9 +402,11 @@ class GossipSub(IPubsubRouter):
await asyncio.sleep(self.heartbeat_interval)
def mesh_heartbeat(self) -> Tuple[Dict[ID, List[str]], Dict[ID, List[str]]]:
peers_to_graft: Dict[ID, List[str]] = {}
peers_to_prune: Dict[ID, List[str]] = {}
def mesh_heartbeat(
self
) -> Tuple[DefaultDict[ID, List[str]], DefaultDict[ID, List[str]]]:
peers_to_graft: DefaultDict[ID, List[str]] = defaultdict(list)
peers_to_prune: DefaultDict[ID, List[str]] = defaultdict(list)
for topic in self.mesh:
# Skip if no peers have subscribed to the topic
if topic not in self.pubsub.peer_topics:
@ -422,10 +424,7 @@ class GossipSub(IPubsubRouter):
self.mesh[topic].append(peer)
# Emit GRAFT(topic) control message to peer
if peer not in peers_to_graft:
peers_to_graft[peer] = [topic]
else:
peers_to_graft[peer].append(topic)
peers_to_graft[peer].append(topic)
if num_mesh_peers_in_topic > self.degree_high:
# Select |mesh[topic]| - D peers from mesh[topic]
@ -437,10 +436,7 @@ class GossipSub(IPubsubRouter):
self.mesh[topic].remove(peer)
# Emit PRUNE(topic) control message to peer
if peer not in peers_to_prune:
peers_to_prune[peer] = [topic]
else:
peers_to_prune[peer].append(topic)
peers_to_prune[peer].append(topic)
return peers_to_graft, peers_to_prune
def fanout_heartbeat(self) -> None:
@ -478,8 +474,8 @@ class GossipSub(IPubsubRouter):
# Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers)
def gossip_heartbeat(self) -> Dict[ID, Dict[str, List[str]]]:
peers_to_gossip: Dict[ID, Dict[str, List[str]]] = {}
def gossip_heartbeat(self) -> DefaultDict[ID, Dict[str, List[str]]]:
peers_to_gossip: DefaultDict[ID, Dict[str, List[str]]] = defaultdict(dict)
for topic in self.mesh:
msg_ids = self.mcache.window(topic)
if msg_ids:
@ -492,10 +488,7 @@ class GossipSub(IPubsubRouter):
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
for peer in peers_to_emit_ihave_to:
if peer not in peers_to_gossip:
peers_to_gossip[peer] = {topic: msg_id_strs}
else:
peers_to_gossip[peer][topic] = msg_id_strs
peers_to_gossip[peer][topic] = msg_id_strs
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
# Do the same for fanout, for all topics not already hit in mesh
@ -510,10 +503,7 @@ class GossipSub(IPubsubRouter):
)
msg_id_strs = [str(msg) for msg in msg_ids]
for peer in peers_to_emit_ihave_to:
if peer not in peers_to_gossip:
peers_to_gossip[peer] = {topic: msg_id_strs}
else:
peers_to_gossip[peer][topic] = msg_id_strs
peers_to_gossip[peer][topic] = msg_id_strs
return peers_to_gossip
@staticmethod