The Gossipsub PR (#162)

* Add handle_rpc call to pubsub

* Scaffold gossipsub functions

* Add timer

* Implement most of mesh construction

* Implement emit and handle

* Implement fanout heartbeat

* Refactor emit

* some gossipsub cleanup and test

* minor lint stuff, more to come

* Implement publish

* Fix comment

* Modify pubsub/gossipsub so that floodsub tests pass using gossipsub router

* Add floodsub tests to gossipsub

* Handle case where select_from_minus, num_to_select > size(pool-minus)

* Add topic membership

* Implement handle ihave

* Implement most of iwant

* Add mcache.add and comments

* Refactor handle_ihave

* Implement stream write in handle_iwant

* Implement gossip heartbeat

* unresolved vars

* initial mcache code

* documenting mcache

* writing test/debugging mcache

* finished mcache test and debugged

* Make gossipsub backward compatibility its own file

* remove mcache prints

* DEBUGGING

* Add sender_peer_id to handle_rpc to get gossip test passing

* Modify gossipsub to make fanout work

* fanout maintenance test

* debugging gsub GOSSIP

* DEBUGGING

* debugged sender seen cachce

* adding lru, removing prints

* pylint cleanup

* Fix github comments in PR

* minor floodsub possible bugfix
This commit is contained in:
Robert Zajac 2019-05-06 23:44:13 -04:00 committed by GitHub
parent eea6a9fda7
commit 9052e8f8bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1663 additions and 13 deletions

View File

@ -35,7 +35,7 @@ class FloodSub(IPubsubRouter):
:param peer_id: id of peer to remove :param peer_id: id of peer to remove
""" """
def handle_rpc(self, rpc): async def handle_rpc(self, rpc, sender_peer_id):
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed
@ -64,6 +64,11 @@ class FloodSub(IPubsubRouter):
for message in packet.publish: for message in packet.publish:
decoded_from_id = message.from_id.decode('utf-8') decoded_from_id = message.from_id.decode('utf-8')
if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()):
id_in_seen_msgs = (message.seqno, message.from_id)
if id_in_seen_msgs not in self.pubsub.seen_messages:
self.pubsub.seen_messages[id_in_seen_msgs] = 1
await self.pubsub.handle_talk(message) await self.pubsub.handle_talk(message)
# Deliver to self and peers # Deliver to self and peers
@ -81,7 +86,7 @@ class FloodSub(IPubsubRouter):
# Publish the packet # Publish the packet
await stream.write(new_packet.SerializeToString()) await stream.write(new_packet.SerializeToString())
def join(self, topic): async def join(self, topic):
""" """
Join notifies the router that we want to receive and Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the forward messages in a topic. It is invoked after the
@ -89,7 +94,7 @@ class FloodSub(IPubsubRouter):
:param topic: topic to join :param topic: topic to join
""" """
def leave(self, topic): async def leave(self, topic):
""" """
Leave notifies the router that we are no longer interested in a topic. Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement. It is invoked after the unsubscription announcement.

543
libp2p/pubsub/gossipsub.py Normal file
View File

@ -0,0 +1,543 @@
import random
import asyncio
from ast import literal_eval
from .pb import rpc_pb2
from .pubsub_router_interface import IPubsubRouter
from .mcache import MessageCache
class GossipSub(IPubsubRouter):
# pylint: disable=no-member
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
def __init__(self, protocols, degree, degree_low, degree_high, time_to_live, gossip_window=3,
gossip_history=5, heartbeat_interval=120):
# pylint: disable=too-many-arguments
self.protocols = protocols
self.pubsub = None
# Store target degree, upper degree bound, and lower degree bound
self.degree = degree
self.degree_high = degree_high
self.degree_low = degree_low
# Store time to live (for topics in fanout)
self.time_to_live = time_to_live
# Create topic --> list of peers mappings
self.mesh = {}
self.fanout = {}
# Create topic --> time since last publish map
self.time_since_last_publish = {}
self.peers_gossipsub = []
self.peers_floodsub = []
# Create message cache
self.mcache = MessageCache(gossip_window, gossip_history)
# Create heartbeat timer
self.heartbeat_interval = heartbeat_interval
# Interface functions
def get_protocols(self):
"""
:return: the list of protocols supported by the router
"""
return self.protocols
def attach(self, pubsub):
"""
Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance.
:param pubsub: pubsub instance to attach to
"""
self.pubsub = pubsub
# Start heartbeat now that we have a pubsub instance
# TODO: Start after delay
asyncio.ensure_future(self.heartbeat())
def add_peer(self, peer_id, protocol_id):
"""
Notifies the router that a new peer has been connected
:param peer_id: id of peer to add
"""
# Add peer to the correct peer list
peer_type = GossipSub.get_peer_type(protocol_id)
peer_id_str = str(peer_id)
if peer_type == "gossip":
self.peers_gossipsub.append(peer_id_str)
elif peer_type == "flood":
self.peers_floodsub.append(peer_id_str)
def remove_peer(self, peer_id):
"""
Notifies the router that a peer has been disconnected
:param peer_id: id of peer to remove
"""
peer_id_str = str(peer_id)
self.peers_to_protocol.remove(peer_id_str)
async def handle_rpc(self, rpc, sender_peer_id):
"""
Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed
:param rpc: rpc message
"""
control_message = rpc.control
# Relay each rpc control to the appropriate handler
if control_message.ihave:
for ihave in control_message.ihave:
await self.handle_ihave(ihave, sender_peer_id)
if control_message.iwant:
for iwant in control_message.iwant:
await self.handle_iwant(iwant, sender_peer_id)
if control_message.graft:
for graft in control_message.graft:
await self.handle_graft(graft, sender_peer_id)
if control_message.prune:
for prune in control_message.prune:
await self.handle_prune(prune, sender_peer_id)
async def publish(self, sender_peer_id, rpc_message):
# pylint: disable=too-many-locals
"""
Invoked to forward a new message that has been validated.
"""
packet = rpc_pb2.RPC()
packet.ParseFromString(rpc_message)
msg_sender = str(sender_peer_id)
# Deliver to self if self was origin
# Note: handle_talk checks if self is subscribed to topics in message
for message in packet.publish:
# Add RPC message to cache
self.mcache.put(message)
decoded_from_id = message.from_id.decode('utf-8')
new_packet = rpc_pb2.RPC()
new_packet.publish.extend([message])
new_packet_serialized = new_packet.SerializeToString()
# Deliver to self if needed
if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()):
id_in_seen_msgs = (message.seqno, message.from_id)
if id_in_seen_msgs not in self.pubsub.seen_messages:
self.pubsub.seen_messages[id_in_seen_msgs] = 1
await self.pubsub.handle_talk(message)
# Deliver to peers
for topic in message.topicIDs:
# If topic has floodsub peers, deliver to floodsub peers
# TODO: This can be done more efficiently. Do it more efficiently.
floodsub_peers_in_topic = []
if topic in self.pubsub.peer_topics:
for peer in self.pubsub.peer_topics[topic]:
if str(peer) in self.peers_floodsub:
floodsub_peers_in_topic.append(peer)
await self.deliver_messages_to_peers(floodsub_peers_in_topic, msg_sender,
decoded_from_id, new_packet_serialized)
# If you are subscribed to topic, send to mesh, otherwise send to fanout
if topic in self.pubsub.my_topics and topic in self.mesh:
await self.deliver_messages_to_peers(self.mesh[topic], msg_sender,
decoded_from_id, new_packet_serialized)
else:
# Send to fanout peers
if topic not in self.fanout:
# If no peers in fanout, choose some peers from gossipsub peers in topic
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
selected = \
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
self.fanout[topic] = selected
# TODO: Is topic DEFINITELY supposed to be in fanout if we are not subscribed?
# I assume there could be short periods between heartbeats where topic may not
# be but we should check that this path gets hit appropriately
await self.deliver_messages_to_peers(self.fanout[topic], msg_sender,
decoded_from_id, new_packet_serialized)
async def join(self, topic):
# Note: the comments here are the near-exact algorithm description from the spec
"""
Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the
subscription announcement
:param topic: topic to join
"""
# Create mesh[topic] if it does not yet exist
if topic not in self.mesh:
self.mesh[topic] = []
if topic in self.fanout and len(self.fanout[topic]) == self.degree:
# If router already has D peers from the fanout peers of a topic
# TODO: Do we remove all peers from fanout[topic]?
# Add them to mesh[topic], and notifies them with a
# GRAFT(topic) control message.
for peer in self.fanout[topic]:
self.mesh[topic].append(peer)
await self.emit_graft(topic, peer)
else:
# Otherwise, if there are less than D peers
# (let this number be x) in the fanout for a topic (or the topic is not in the fanout),
fanout_size = 0
if topic in self.fanout:
fanout_size = len(self.fanout[topic])
# then it still adds them as above (if there are any)
for peer in self.fanout[topic]:
self.mesh[topic].append(peer)
await self.emit_graft(topic, peer)
if topic in self.peers_gossipsub:
# TODO: Should we have self.fanout[topic] here or [] (as the minus variable)?
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic]
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
selected_peers = \
GossipSub.select_from_minus(self.degree - fanout_size,
gossipsub_peers_in_topic,
self.fanout[topic] if topic in self.fanout else [])
# And likewise adds them to mesh[topic] and notifies them with a
# GRAFT(topic) control message.
for peer in selected_peers:
self.mesh[topic].append(peer)
await self.emit_graft(topic, peer)
# TODO: Do we remove all peers from fanout[topic]?
async def leave(self, topic):
# Note: the comments here are the near-exact algorithm description from the spec
"""
Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement.
:param topic: topic to leave
"""
# Notify the peers in mesh[topic] with a PRUNE(topic) message
for peer in self.mesh[topic]:
await self.emit_prune(topic, peer)
# Forget mesh[topic]
self.mesh.pop(topic, None)
# Interface Helper Functions
@staticmethod
def get_peer_type(protocol_id):
# TODO: Do this in a better, more efficient way
if "gossipsub" in protocol_id:
return "gossip"
if "floodsub" in protocol_id:
return "flood"
return "unknown"
async def deliver_messages_to_peers(self, peers, msg_sender, origin_id, serialized_packet):
for peer_id_in_topic in peers:
# Forward to all peers that are not the
# message sender and are not the message origin
if peer_id_in_topic not in (msg_sender, origin_id):
stream = self.pubsub.peers[peer_id_in_topic]
# Publish the packet
await stream.write(serialized_packet)
# Heartbeat
async def heartbeat(self):
"""
Call individual heartbeats.
Note: the heartbeats are called with awaits because each heartbeat depends on the
state changes in the preceding heartbeat
"""
while True:
await self.mesh_heartbeat()
await self.fanout_heartbeat()
await self.gossip_heartbeat()
await asyncio.sleep(self.heartbeat_interval)
async def mesh_heartbeat(self):
# Note: the comments here are the exact pseudocode from the spec
for topic in self.mesh:
num_mesh_peers_in_topic = len(self.mesh[topic])
if num_mesh_peers_in_topic < self.degree_low:
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
selected_peers = GossipSub.select_from_minus(self.degree - num_mesh_peers_in_topic,
self.peers_gossipsub, self.mesh[topic])
for peer in selected_peers:
# Add peer to mesh[topic]
self.mesh[topic].append(peer)
# Emit GRAFT(topic) control message to peer
await self.emit_graft(topic, peer)
if num_mesh_peers_in_topic > self.degree_high:
# Select |mesh[topic]| - D peers from mesh[topic]
selected_peers = GossipSub.select_from_minus(num_mesh_peers_in_topic - self.degree,
self.mesh[topic], [])
for peer in selected_peers:
# Remove peer from mesh[topic]
self.mesh[topic].remove(peer)
# Emit PRUNE(topic) control message to peer
await self.emit_prune(topic, peer)
async def fanout_heartbeat(self):
# Note: the comments here are the exact pseudocode from the spec
for topic in self.fanout:
# If time since last published > ttl
# TODO: there's no way time_since_last_publish gets set anywhere yet
if self.time_since_last_publish[topic] > self.time_to_live:
# Remove topic from fanout
self.fanout.remove(topic)
self.time_since_last_publish.remove(topic)
else:
num_fanout_peers_in_topic = len(self.fanout[topic])
# If |fanout[topic]| < D
if num_fanout_peers_in_topic < self.degree:
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
selected_peers = \
GossipSub.select_from_minus(self.degree - num_fanout_peers_in_topic,
gossipsub_peers_in_topic, self.fanout[topic])
# Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers)
async def gossip_heartbeat(self):
# pylint: disable=too-many-nested-blocks
for topic in self.mesh:
msg_ids = self.mcache.window(topic)
if msg_ids:
# TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too
if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = \
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
for peer in peers_to_emit_ihave_to:
# TODO: this line is a monster, can hopefully be simplified
if (topic not in self.mesh or (peer not in self.mesh[topic]))\
and (topic not in self.fanout or (peer not in self.fanout[topic])):
msg_ids = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_ids, peer)
# Do the same for fanout, for all topics not already hit in mesh
for topic in self.fanout:
if topic not in self.mesh:
msg_ids = self.mcache.window(topic)
if msg_ids:
# TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in topic and only add if they are gossipsub peers also
if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = \
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
for peer in peers_to_emit_ihave_to:
if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
msg_ids = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_ids, peer)
self.mcache.shift()
@staticmethod
def select_from_minus(num_to_select, pool, minus):
"""
Select at most num_to_select subset of elements from the set (pool - minus) randomly.
:param num_to_select: number of elements to randomly select
:param pool: list of items to select from (excluding elements in minus)
:param minus: elements to be excluded from selection pool
:return: list of selected elements
"""
# Create selection pool, which is selection_pool = pool - minus
if minus:
# Create a new selection pool by removing elements of minus
selection_pool = [x for x in pool if x not in minus]
else:
# Don't create a new selection_pool if we are not subbing anything
selection_pool = pool
# If num_to_select > size(selection_pool), then return selection_pool (which has the most
# possible elements s.t. the number of elements is less than num_to_select)
if num_to_select > len(selection_pool):
return selection_pool
# Random selection
selection = random.sample(selection_pool, num_to_select)
return selection
# RPC handlers
async def handle_ihave(self, ihave_msg, sender_peer_id):
"""
Checks the seen set and requests unknown messages with an IWANT message.
"""
# from_id_bytes = ihave_msg.from_id
from_id_str = sender_peer_id
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache
seen_seqnos_and_peers = [seqno_and_from
for seqno_and_from in self.pubsub.seen_messages.keys()]
# Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list
# of messages we want to request
msg_ids_wanted = [msg_id for msg_id in ihave_msg.messageIDs
if literal_eval(msg_id) not in seen_seqnos_and_peers]
# Request messages with IWANT message
if msg_ids_wanted:
await self.emit_iwant(msg_ids_wanted, from_id_str)
async def handle_iwant(self, iwant_msg, sender_peer_id):
"""
Forwards all request messages that are present in mcache to the requesting peer.
"""
from_id_str = sender_peer_id
msg_ids = [literal_eval(msg) for msg in iwant_msg.messageIDs]
msgs_to_forward = []
for msg_id_iwant in msg_ids:
# Check if the wanted message ID is present in mcache
msg = self.mcache.get(msg_id_iwant)
# Cache hit
if msg:
# Add message to list of messages to forward to requesting peers
msgs_to_forward.append(msg)
# Forward messages to requesting peer
# Should this just be publishing? No
# because then the message will forwarded to peers in the topics contained in the messages.
# We should
# 1) Package these messages into a single packet
packet = rpc_pb2.RPC()
packet.publish.extend(msgs_to_forward)
# 2) Serialize that packet
rpc_msg = packet.SerializeToString()
# 3) Get the stream to this peer
# TODO: Should we pass in from_id or from_id_str here?
peer_stream = self.pubsub.peers[from_id_str]
# 4) And write the packet to the stream
await peer_stream.write(rpc_msg)
async def handle_graft(self, graft_msg, sender_peer_id):
topic = graft_msg.topicID
from_id_str = sender_peer_id
# Add peer to mesh for topic
if topic in self.mesh:
self.mesh[topic].append(from_id_str)
else:
self.mesh[topic] = [from_id_str]
async def handle_prune(self, prune_msg, sender_peer_id):
topic = prune_msg.topicID
from_id_str = sender_peer_id
# Remove peer from mesh for topic, if peer is in topic
if topic in self.mesh and from_id_str in self.mesh[topic]:
self.mesh[topic].remove(from_id_str)
# RPC emitters
async def emit_ihave(self, topic, msg_ids, to_peer):
"""
Emit ihave message, sent to to_peer, for topic and msg_ids
"""
ihave_msg = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(msg_ids)
ihave_msg.topicID = topic
control_msg = rpc_pb2.ControlMessage()
control_msg.ihave.extend([ihave_msg])
await self.emit_control_message(control_msg, to_peer)
async def emit_iwant(self, msg_ids, to_peer):
"""
Emit iwant message, sent to to_peer, for msg_ids
"""
iwant_msg = rpc_pb2.ControlIWant()
iwant_msg.messageIDs.extend(msg_ids)
control_msg = rpc_pb2.ControlMessage()
control_msg.iwant.extend([iwant_msg])
await self.emit_control_message(control_msg, to_peer)
async def emit_graft(self, topic, to_peer):
"""
Emit graft message, sent to to_peer, for topic
"""
graft_msg = rpc_pb2.ControlGraft()
graft_msg.topicID = topic
control_msg = rpc_pb2.ControlMessage()
control_msg.graft.extend([graft_msg])
await self.emit_control_message(control_msg, to_peer)
async def emit_prune(self, topic, to_peer):
"""
Emit graft message, sent to to_peer, for topic
"""
prune_msg = rpc_pb2.ControlPrune()
prune_msg.topicID = topic
control_msg = rpc_pb2.ControlMessage()
control_msg.prune.extend([prune_msg])
await self.emit_control_message(control_msg, to_peer)
async def emit_control_message(self, control_msg, to_peer):
# Add control message to packet
packet = rpc_pb2.RPC()
packet.control.CopyFrom(control_msg)
rpc_msg = packet.SerializeToString()
# Get stream for peer from pubsub
peer_stream = self.pubsub.peers[to_peer]
# Write rpc to stream
await peer_stream.write(rpc_msg)

92
libp2p/pubsub/mcache.py Normal file
View File

@ -0,0 +1,92 @@
class MessageCache:
class CacheEntry:
# pylint: disable=too-few-public-methods
"""
A logical representation of an entry in the mcache's _history_.
"""
def __init__(self, mid, topics):
"""
Constructor.
:param mid: (seqno, from_id) of the msg
:param topics: list of topics this message was sent on
"""
self.mid = mid
self.topics = topics
def __init__(self, window_size, history_size):
"""
Constructor.
:param window_size: Size of the window desired.
:param history_size: Size of the history desired.
:return: the MessageCache
"""
self.window_size = window_size
self.history_size = history_size
# (seqno, from_id) -> rpc message
self.msgs = dict()
# max length of history_size. each item is a list of CacheEntry.
# messages lost upon shift().
self.history = []
for _ in range(history_size):
self.history.append([])
def put(self, msg):
"""
Put a message into the mcache.
:param msg: The rpc message to put in. Should contain seqno and from_id
"""
mid = (msg.seqno, msg.from_id)
self.msgs[mid] = msg
if not self.history[0]:
self.history[0] = []
self.history[0].append(self.CacheEntry(mid, msg.topicIDs))
def get(self, mid):
"""
Get a message from the mcache.
:param mid: (seqno, from_id) of the message to get.
:return: The rpc message associated with this mid
"""
if mid in self.msgs:
return self.msgs[mid]
return None
def window(self, topic):
"""
Get the window for this topic.
:param topic: Topic whose message ids we desire.
:return: List of mids in the current window.
"""
mids = []
for entries_list in self.history[: self.window_size]:
for entry in entries_list:
for entry_topic in entry.topics:
if entry_topic == topic:
mids.append(entry.mid)
return mids
def shift(self):
"""
Shift the window over by 1 position, dropping the last element of the history.
"""
last_entries = self.history[len(self.history) - 1]
for entry in last_entries:
del self.msgs[entry.mid]
i = len(self.history) - 2
while i >= 0:
self.history[i + 1] = self.history[i]
i -= 1
self.history[0] = []

View File

@ -1,5 +1,6 @@
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
import asyncio import asyncio
from lru import LRU from lru import LRU
from .pb import rpc_pb2 from .pb import rpc_pb2
@ -34,8 +35,7 @@ class Pubsub():
for protocol in self.protocols: for protocol in self.protocols:
self.host.set_stream_handler(protocol, self.stream_handler) self.host.set_stream_handler(protocol, self.stream_handler)
# TODO: determine if these need to be asyncio queues, or if could possibly # Use asyncio queues for proper context switching
# be ordinary blocking queues
self.incoming_msgs_from_peers = asyncio.Queue() self.incoming_msgs_from_peers = asyncio.Queue()
self.outgoing_messages = asyncio.Queue() self.outgoing_messages = asyncio.Queue()
@ -44,9 +44,10 @@ class Pubsub():
self.cache_size = 128 self.cache_size = 128
else: else:
self.cache_size = cache_size self.cache_size = cache_size
self.seen_messages = LRU(self.cache_size) self.seen_messages = LRU(self.cache_size)
# Map of topics we are subscribed to to handler functions # Map of topics we are subscribed to blocking queues
# for when the given topic receives a message # for when the given topic receives a message
self.my_topics = {} self.my_topics = {}
@ -96,6 +97,7 @@ class Pubsub():
if id_in_seen_msgs not in self.seen_messages: if id_in_seen_msgs not in self.seen_messages:
should_publish = True should_publish = True
self.seen_messages[id_in_seen_msgs] = 1 self.seen_messages[id_in_seen_msgs] = 1
await self.handle_talk(message) await self.handle_talk(message)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
@ -112,6 +114,10 @@ class Pubsub():
# relay message to peers with router # relay message to peers with router
await self.router.publish(peer_id, incoming) await self.router.publish(peer_id, incoming)
if rpc_incoming.control:
# Pass rpc to router so router could perform custom logic
await self.router.handle_rpc(rpc_incoming, peer_id)
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
@ -180,8 +186,9 @@ class Pubsub():
# Add peer to topic # Add peer to topic
self.peer_topics[sub_message.topicid].append(origin_id) self.peer_topics[sub_message.topicid].append(origin_id)
else: else:
# TODO: Remove peer from topic if sub_message.topicid in self.peer_topics:
pass if origin_id in self.peer_topics[sub_message.topicid]:
self.peer_topics[sub_message.topicid].remove(origin_id)
async def handle_talk(self, publish_message): async def handle_talk(self, publish_message):
""" """
@ -217,7 +224,7 @@ class Pubsub():
await self.message_all_peers(packet.SerializeToString()) await self.message_all_peers(packet.SerializeToString())
# Tell router we are joining this topic # Tell router we are joining this topic
self.router.join(topic_id) await self.router.join(topic_id)
# Return the asyncio queue for messages on this topic # Return the asyncio queue for messages on this topic
return self.my_topics[topic_id] return self.my_topics[topic_id]
@ -243,7 +250,7 @@ class Pubsub():
await self.message_all_peers(packet.SerializeToString()) await self.message_all_peers(packet.SerializeToString())
# Tell router we are leaving this topic # Tell router we are leaving this topic
self.router.leave(topic_id) await self.router.leave(topic_id)
async def message_all_peers(self, rpc_msg): async def message_all_peers(self, rpc_msg):
""" """

View File

@ -31,10 +31,13 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
def handle_rpc(self, rpc): def handle_rpc(self, rpc, sender_peer_id):
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed
TODO: Check if this interface is ok. It's not the exact same as the go code, but the go
code is really confusing with the msg origin, they specify `rpc.from` even when the rpc
shouldn't have a from
:param rpc: rpc message :param rpc: rpc message
""" """

View File

@ -25,7 +25,8 @@ setuptools.setup(
"rpcudp", "rpcudp",
"grpcio", "grpcio",
"grpcio-tools", "grpcio-tools",
"lru-dict>=1.1.6" "lru-dict>=1.1.6",
"aio_timers"
], ],
packages=["libp2p"], packages=["libp2p"],
zip_safe=False, zip_safe=False,

View File

@ -613,4 +613,4 @@ async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj():
} }
] ]
} }
await perform_test_from_obj(test_obj) await perform_test_from_obj(test_obj)

View File

@ -0,0 +1,271 @@
import asyncio
import pytest
import random
from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub
from utils import message_id_generator, generate_RPC_packet, \
create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \
connect
from tests.utils import cleanup
SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"]
@pytest.mark.asyncio
async def test_dense():
# Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 10
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5)
# All pubsub subscribe to foobar
queues = []
for pubsub in pubsubs:
q = await pubsub.subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
for i in range(num_msgs):
msg_content = "foo " + str(i)
# randomly pick a message origin
origin_idx = random.randint(0, num_hosts - 1)
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString())
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
items = []
for queue in queues:
msg = await queue.get()
assert msg.data == packet.publish[0].data
items.append(msg.data)
await cleanup()
@pytest.mark.asyncio
async def test_fanout():
# Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 10
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5)
# All pubsub subscribe to foobar
queues = []
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
# Send messages with origin not subscribed
for i in range(num_msgs):
msg_content = "foo " + str(i)
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString())
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString()
# Subscribe message origin
queues.append(await pubsubs[0].subscribe("foobar"))
# Send messages again
for i in range(num_msgs):
msg_content = "foo " + str(i)
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString())
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString()
await cleanup()
@pytest.mark.asyncio
async def test_fanout_maintenance():
# Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 10
num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5)
# All pubsub subscribe to foobar
queues = []
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
# Sparsely connect libp2p hosts in random way
await dense_connect(libp2p_hosts)
# Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2)
# Send messages with origin not subscribed
for i in range(num_msgs):
msg_content = "foo " + str(i)
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString())
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString()
for sub in pubsubs:
await sub.unsubscribe('foobar')
queues = []
await asyncio.sleep(2)
# Resub and repeat
for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar")
# Add each blocking queue to an array of blocking queues
queues.append(q)
await asyncio.sleep(2)
# Check messages can still be sent
for i in range(num_msgs):
msg_content = "foo " + str(i)
# Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString())
await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message
for queue in queues:
msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString()
await cleanup()
@pytest.mark.asyncio
async def test_gossip_propagation():
# Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 2
libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \
1, 0, 2, 30, 50, 100, 0.5)
node1, node2 = libp2p_hosts[0], libp2p_hosts[1]
sub1, sub2 = pubsubs[0], pubsubs[1]
gsub1, gsub2 = gossipsubs[0], gossipsubs[1]
node1_queue = await sub1.subscribe('foo')
# node 1 publish to topic
msg_content = 'foo_msg'
node1_id = str(node1.get_id())
# Generate message packet
packet = generate_RPC_packet(node1_id, ["foo"], msg_content, next_msg_id_func())
# publish from the randomly chosen host
await gsub1.publish(node1_id, packet.SerializeToString())
# now node 2 subscribes
node2_queue = await sub2.subscribe('foo')
await connect(node2, node1)
# wait for gossip heartbeat
await asyncio.sleep(2)
# should be able to read message
msg = await node2_queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString()
await cleanup()

View File

@ -0,0 +1,519 @@
import asyncio
import multiaddr
import pytest
from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub
from utils import message_id_generator, generate_RPC_packet
from tests.utils import cleanup
# pylint: disable=too-many-locals
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
@pytest.mark.asyncio
async def test_init():
node = await new_node(transport_opt=["/ip4/127.1/tcp/0"])
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.1/tcp/0"))
supported_protocols = ["/gossipsub/1.0.0"]
gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30)
pubsub = Pubsub(node, gossipsub, "a")
# Did it work?
assert gossipsub and pubsub
await cleanup()
async def perform_test_from_obj(obj):
"""
Perform a floodsub test from a test obj.
test obj are composed as follows:
{
"supported_protocols": ["supported/protocol/1.0.0",...],
"adj_list": {
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
...
},
"topic_map": {
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
},
"messages": [
{
"topics": ["topic1_for_message", "topic2_for_message", ...],
"data": "some contents of the message (newlines are not supported)",
"node_id": "message sender node id"
},
...
]
}
NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
is undefined (even if it may work)
"""
# Step 1) Create graph
adj_list = obj["adj_list"]
node_map = {}
gossipsub_map = {}
pubsub_map = {}
supported_protocols = obj["supported_protocols"]
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[start_node_id] = node
gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30)
gossipsub_map[start_node_id] = gossipsub
pubsub = Pubsub(node, gossipsub, start_node_id)
pubsub_map[start_node_id] = pubsub
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[neighbor_id] = neighbor_node
gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30)
gossipsub_map[neighbor_id] = gossipsub
pubsub = Pubsub(neighbor_node, gossipsub, neighbor_id)
pubsub_map[neighbor_id] = pubsub
# Connect node and neighbor
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
tasks_connect.append(asyncio.sleep(2))
await asyncio.gather(*tasks_connect)
# Allow time for graph creation before continuing
# await asyncio.sleep(0.25)
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic in topic_map:
for node_id in topic_map[topic]:
"""
# Subscribe node to topic
q = await pubsub_map[node_id].subscribe(topic)
# Create topic-queue map for node_id if one does not yet exist
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
"""
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
for i in range(len(responses) - 1):
q = responses[i]
node_id, topic = tasks_topic_data[i]
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
# Allow time for subscribing before continuing
# await asyncio.sleep(0.01)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
next_msg_id_func = message_id_generator(0)
for msg in messages:
topics = msg["topics"]
data = msg["data"]
node_id = msg["node_id"]
# Get actual id for sender node (not the id from the test obj)
actual_node_id = str(node_map[node_id].get_id())
# Create correctly formatted message
msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func())
# Publish message
tasks_publish.append(asyncio.ensure_future(gossipsub_map[node_id].publish(\
actual_node_id, msg_talk.SerializeToString())))
# For each topic in topics, add topic, msg_talk tuple to ordered test list
# TODO: Update message sender to be correct message sender before
# adding msg_talk to this list
for topic in topics:
topics_in_msgs_ordered.append((topic, msg_talk))
# Allow time for publishing before continuing
# await asyncio.sleep(0.4)
tasks_publish.append(asyncio.sleep(2))
await asyncio.gather(*tasks_publish)
# Step 4) Check that all messages were received correctly.
# TODO: Check message sender too
for i in range(len(topics_in_msgs_ordered)):
topic, actual_msg = topics_in_msgs_ordered[i]
# Look at each node in each topic
for node_id in topic_map[topic]:
# Get message from subscription queue
msg_on_node = await queues_map[node_id][topic].get()
assert actual_msg.publish[0].SerializeToString() == msg_on_node.SerializeToString()
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.asyncio
async def test_simple_two_nodes_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_two_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"],
"B": ["C"]
},
"topic_map": {
"topic1": ["B", "C"],
"topic2": ["B", "C"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
},
{
"topics": ["topic2"],
"data": "Alex is tall",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_two_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
},
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_one_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
"space": ["2", "3", "4", "5", "6", "7"],
"onions": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "1"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
"space": ["1", "2", "3", "4", "5", "6", "7"],
"onions": ["1", "2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "4"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "7"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3"],
"school": ["1", "2", "3"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3", "4"],
"2": ["1", "3", "4"],
"3": ["1", "2", "4"],
"4": ["1", "2", "3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4"],
"school": ["1", "2", "3", "4"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar2",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar3",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic3",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2"],
"2": ["3"],
"3": ["4"],
"4": ["5"],
"5": ["1"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5"],
"school": ["1", "2", "3", "4", "5"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar2",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar3",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic3",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)

129
tests/pubsub/test_mcache.py Normal file
View File

@ -0,0 +1,129 @@
import pytest
from libp2p.pubsub.mcache import MessageCache
class Msg:
def __init__(self, topicIDs, seqno, from_id):
self.topicIDs = topicIDs
self.seqno = seqno,
self.from_id = from_id
@pytest.mark.asyncio
async def test_mcache():
# Ported from:
# https://github.com/libp2p/go-libp2p-pubsub
# /blob/51b7501433411b5096cac2b4994a36a68515fc03/mcache_test.go
mcache = MessageCache(3, 5)
msgs = []
for i in range(60):
msgs.append(Msg(["test"], i, "test"))
for i in range(10):
mcache.put(msgs[i])
for i in range(10):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
get_msg = mcache.get(mid)
# successful read
assert get_msg == msg
gids = mcache.window('test')
assert len(gids) == 10
for i in range(10):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
assert mid == gids[i]
mcache.shift()
for i in range(10, 20):
mcache.put(msgs[i])
for i in range(20):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
get_msg = mcache.get(mid)
assert get_msg == msg
gids = mcache.window('test')
assert len(gids) == 20
for i in range(10):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
assert mid == gids[10 + i]
for i in range(10, 20):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
assert mid == gids[i - 10]
mcache.shift()
for i in range(20, 30):
mcache.put(msgs[i])
mcache.shift()
for i in range(30, 40):
mcache.put(msgs[i])
mcache.shift()
for i in range(40, 50):
mcache.put(msgs[i])
mcache.shift()
for i in range(50, 60):
mcache.put(msgs[i])
assert len(mcache.msgs) == 50
for i in range(10):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
get_msg = mcache.get(mid)
# Should be evicted from cache
assert not get_msg
for i in range(10, 60):
msg = msgs[i]
mid = (msg.seqno, msg.from_id)
get_msg = mcache.get(mid)
assert get_msg == msg
gids = mcache.window('test')
assert len(gids) == 30
for i in range(10):
msg = msgs[50 + i]
mid = (msg.seqno, msg.from_id)
assert mid == gids[i]
for i in range(10, 20):
msg = msgs[30 + i]
mid = (msg.seqno, msg.from_id)
assert mid == gids[i]
for i in range(20, 30):
msg = msgs[10 + i]
mid = (msg.seqno, msg.from_id)
assert mid == gids[i]

View File

@ -1,6 +1,13 @@
import asyncio
import multiaddr
import uuid import uuid
import random
import struct import struct
from libp2p import new_node
from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pb import rpc_pb2
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.gossipsub import GossipSub
def message_id_generator(start_val): def message_id_generator(start_val):
@ -42,3 +49,76 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
packet.publish.extend([message]) packet.publish.extend([message])
return packet return packet
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
async def create_libp2p_hosts(num_hosts):
"""
Create libp2p hosts
:param num_hosts: number of hosts to create
"""
hosts = []
tasks_create = []
for i in range(0, num_hosts):
# Create node
tasks_create.append(asyncio.ensure_future(new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])))
hosts = await asyncio.gather(*tasks_create)
tasks_listen = []
for node in hosts:
# Start listener
tasks_listen.append(asyncio.ensure_future(node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))))
await asyncio.gather(*tasks_listen)
return hosts
def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, degree, degree_low, \
degree_high, time_to_live, gossip_window, gossip_history, heartbeat_interval):
pubsubs = []
gossipsubs = []
for node in libp2p_hosts:
gossipsub = GossipSub(supported_protocols, degree,
degree_low, degree_high, time_to_live,
gossip_window, gossip_history,
heartbeat_interval)
pubsub = Pubsub(node, gossipsub, "a")
pubsubs.append(pubsub)
gossipsubs.append(gossipsub)
return pubsubs, gossipsubs
async def sparse_connect(hosts):
await connect_some(hosts, 3)
async def dense_connect(hosts):
await connect_some(hosts, 10)
async def connect_some(hosts, degree):
for i, host in enumerate(hosts):
for j, host2 in enumerate(hosts):
if i != j and i < j:
await connect(host, host2)
# TODO: USE THE CODE BELOW
# for i, host in enumerate(hosts):
# j = 0
# while j < degree:
# n = random.randint(0, len(hosts) - 1)
# if n == i:
# j -= 1
# continue
# neighbor = hosts[n]
# await connect(host, neighbor)
# j += 1