Merge pull request #187 from mhchia/feature/pubsub-publish

Add `Pubsub.publish`
This commit is contained in:
Kevin Mai-Husan Chia 2019-07-29 12:29:34 +08:00 committed by GitHub
commit f0046fa3e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1003 additions and 1348 deletions

View File

@ -195,6 +195,15 @@ class Swarm(INetwork):
def add_router(self, router): def add_router(self, router):
self.router = router self.router = router
# TODO: `tear_down`
async def tear_down(self) -> None:
# pylint: disable=line-too-long
# Reference: https://github.com/libp2p/go-libp2p-swarm/blob/8be680aef8dea0a4497283f2f98470c2aeae6b65/swarm.go#L118 # noqa: E501
pass
# TODO: `disconnect`?
def create_generic_protocol_handler(swarm): def create_generic_protocol_handler(swarm):
""" """
Create a generic protocol handler from the given swarm. We use swarm Create a generic protocol handler from the given swarm. We use swarm

View File

@ -16,6 +16,9 @@ class ID:
def __init__(self, id_str): def __init__(self, id_str):
self._id_str = id_str self._id_str = id_str
def to_bytes(self) -> bytes:
return self._id_str
def get_raw_id(self): def get_raw_id(self):
return self._id_str return self._id_str

View File

@ -1,5 +1,10 @@
from typing import (
Iterable,
)
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
id_b58_decode,
) )
from .pb import rpc_pb2 from .pb import rpc_pb2
@ -46,7 +51,7 @@ class FloodSub(IPubsubRouter):
:param rpc: rpc message :param rpc: rpc message
""" """
async def publish(self, sender_peer_id: ID, rpc_message: rpc_pb2.Message) -> None: async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens This is where the "flooding" part of floodsub happens
@ -57,38 +62,23 @@ class FloodSub(IPubsubRouter):
so that seen messages are not further forwarded. so that seen messages are not further forwarded.
It also never forwards a message back to the source It also never forwards a message back to the source
or the peer that forwarded the message. or the peer that forwarded the message.
:param sender_peer_id: peer_id of message sender :param msg_forwarder: peer ID of the peer who forwards the message to us
:param rpc_message: pubsub message in RPC string format :param pubsub_msg: pubsub message in protobuf.
""" """
packet = rpc_pb2.RPC()
packet.ParseFromString(rpc_message)
msg_sender = str(sender_peer_id)
# Deliver to self if self was origin
# Note: handle_talk checks if self is subscribed to topics in message
for message in packet.publish:
decoded_from_id = message.from_id.decode('utf-8')
if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()):
id_in_seen_msgs = (message.seqno, message.from_id)
if id_in_seen_msgs not in self.pubsub.seen_messages: peers_gen = self._get_peers_to_send(
self.pubsub.seen_messages[id_in_seen_msgs] = 1 pubsub_msg.topicIDs,
msg_forwarder=msg_forwarder,
await self.pubsub.handle_talk(message) origin=ID(pubsub_msg.from_id),
)
# Deliver to self and peers rpc_msg = rpc_pb2.RPC(
for topic in message.topicIDs: publish=[pubsub_msg],
if topic in self.pubsub.peer_topics: )
for peer_id_in_topic in self.pubsub.peer_topics[topic]: for peer_id in peers_gen:
# Forward to all known peers in the topic that are not the stream = self.pubsub.peers[str(peer_id)]
# message sender and are not the message origin # FIXME: We should add a `WriteMsg` similar to write delimited messages.
if peer_id_in_topic not in (msg_sender, decoded_from_id): # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
stream = self.pubsub.peers[peer_id_in_topic] await stream.write(rpc_msg.SerializeToString())
# Create new packet with just publish message
new_packet = rpc_pb2.RPC()
new_packet.publish.extend([message])
# Publish the packet
await stream.write(new_packet.SerializeToString())
async def join(self, topic): async def join(self, topic):
""" """
@ -104,3 +94,26 @@ class FloodSub(IPubsubRouter):
It is invoked after the unsubscription announcement. It is invoked after the unsubscription announcement.
:param topic: topic to leave :param topic: topic to leave
""" """
def _get_peers_to_send(
self,
topic_ids: Iterable[str],
msg_forwarder: ID,
origin: ID) -> Iterable[ID]:
"""
Get the eligible peers to send the data to.
:param msg_forwarder: peer ID of the peer who forwards the message to us.
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
for topic in topic_ids:
if topic not in self.pubsub.peer_topics:
continue
for peer_id_str in self.pubsub.peer_topics[topic]:
peer_id = id_b58_decode(peer_id_str)
if peer_id in (msg_forwarder, origin):
continue
# FIXME: Should change `self.pubsub.peers` to Dict[PeerID, ...]
if str(peer_id) not in self.pubsub.peers:
continue
yield peer_id

View File

@ -1,10 +1,22 @@
import random
import asyncio import asyncio
import random
from typing import (
Iterable,
List,
MutableSet,
Sequence,
)
from ast import literal_eval from ast import literal_eval
from libp2p.peer.id import (
ID,
id_b58_decode,
)
from .mcache import MessageCache
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
from .mcache import MessageCache
class GossipSub(IPubsubRouter): class GossipSub(IPubsubRouter):
@ -20,8 +32,8 @@ class GossipSub(IPubsubRouter):
# Store target degree, upper degree bound, and lower degree bound # Store target degree, upper degree bound, and lower degree bound
self.degree = degree self.degree = degree
self.degree_high = degree_high
self.degree_low = degree_low self.degree_low = degree_low
self.degree_high = degree_high
# Store time to live (for topics in fanout) # Store time to live (for topics in fanout)
self.time_to_live = time_to_live self.time_to_live = time_to_live
@ -91,6 +103,7 @@ class GossipSub(IPubsubRouter):
:param rpc: rpc message :param rpc: rpc message
""" """
control_message = rpc.control control_message = rpc.control
sender_peer_id = str(sender_peer_id)
# Relay each rpc control to the appropriate handler # Relay each rpc control to the appropriate handler
if control_message.ihave: if control_message.ihave:
@ -106,70 +119,77 @@ class GossipSub(IPubsubRouter):
for prune in control_message.prune: for prune in control_message.prune:
await self.handle_prune(prune, sender_peer_id) await self.handle_prune(prune, sender_peer_id)
async def publish(self, sender_peer_id, rpc_message): async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
""" """
self.mcache.put(pubsub_msg)
packet = rpc_pb2.RPC() peers_gen = self._get_peers_to_send(
packet.ParseFromString(rpc_message) pubsub_msg.topicIDs,
msg_sender = str(sender_peer_id) msg_forwarder=msg_forwarder,
origin=ID(pubsub_msg.from_id),
)
rpc_msg = rpc_pb2.RPC(
publish=[pubsub_msg],
)
for peer_id in peers_gen:
stream = self.pubsub.peers[str(peer_id)]
# FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
await stream.write(rpc_msg.SerializeToString())
# Deliver to self if self was origin def _get_peers_to_send(
# Note: handle_talk checks if self is subscribed to topics in message self,
for message in packet.publish: topic_ids: Iterable[str],
# Add RPC message to cache msg_forwarder: ID,
self.mcache.put(message) origin: ID) -> Iterable[ID]:
"""
Get the eligible peers to send the data to.
:param msg_forwarder: the peer id of the peer who forwards the message to me.
:param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to.
"""
send_to: MutableSet[ID] = set()
for topic in topic_ids:
if topic not in self.pubsub.peer_topics:
continue
decoded_from_id = message.from_id.decode('utf-8') # floodsub peers
new_packet = rpc_pb2.RPC() for peer_id_str in self.pubsub.peer_topics[topic]:
new_packet.publish.extend([message]) # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go.
new_packet_serialized = new_packet.SerializeToString() # This will improve the efficiency when searching for a peer's protocol id.
if peer_id_str in self.peers_floodsub:
peer_id = id_b58_decode(peer_id_str)
send_to.add(peer_id)
# Deliver to self if needed # gossipsub peers
if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): # FIXME: Change `str` to `ID`
id_in_seen_msgs = (message.seqno, message.from_id) in_topic_gossipsub_peers: List[str] = None
# TODO: Do we need to check `topic in self.pubsub.my_topics`?
if topic in self.mesh:
in_topic_gossipsub_peers = self.mesh[topic]
else:
# TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not
# subscribed?
# I assume there could be short periods between heartbeats where topic may not
# be but we should check that this path gets hit appropriately
if id_in_seen_msgs not in self.pubsub.seen_messages: # pylint: disable=len-as-condition
self.pubsub.seen_messages[id_in_seen_msgs] = 1 if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
# If no peers in fanout, choose some peers from gossipsub peers in topic.
await self.pubsub.handle_talk(message) self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
topic,
# Deliver to peers self.degree,
for topic in message.topicIDs: [],
# If topic has floodsub peers, deliver to floodsub peers )
# TODO: This can be done more efficiently. Do it more efficiently. in_topic_gossipsub_peers = self.fanout[topic]
floodsub_peers_in_topic = [] for peer_id_str in in_topic_gossipsub_peers:
if topic in self.pubsub.peer_topics: send_to.add(id_b58_decode(peer_id_str))
for peer in self.pubsub.peer_topics[topic]: # Excludes `msg_forwarder` and `origin`
if str(peer) in self.peers_floodsub: yield from send_to.difference([msg_forwarder, origin])
floodsub_peers_in_topic.append(peer)
await self.deliver_messages_to_peers(floodsub_peers_in_topic, msg_sender,
decoded_from_id, new_packet_serialized)
# If you are subscribed to topic, send to mesh, otherwise send to fanout
if topic in self.pubsub.my_topics and topic in self.mesh:
await self.deliver_messages_to_peers(self.mesh[topic], msg_sender,
decoded_from_id, new_packet_serialized)
else:
# Send to fanout peers
if topic not in self.fanout:
# If no peers in fanout, choose some peers from gossipsub peers in topic
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
selected = \
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, [])
self.fanout[topic] = selected
# TODO: Is topic DEFINITELY supposed to be in fanout if we are not subscribed?
# I assume there could be short periods between heartbeats where topic may not
# be but we should check that this path gets hit appropriately
await self.deliver_messages_to_peers(self.fanout[topic], msg_sender,
decoded_from_id, new_packet_serialized)
async def join(self, topic): async def join(self, topic):
# Note: the comments here are the near-exact algorithm description from the spec # Note: the comments here are the near-exact algorithm description from the spec
@ -192,13 +212,11 @@ class GossipSub(IPubsubRouter):
# in the fanout for a topic (or the topic is not in the fanout). # in the fanout for a topic (or the topic is not in the fanout).
# Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. # Selects the remaining number of peers (D-x) from peers.gossipsub[topic].
if topic in self.pubsub.peer_topics: if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
if peer in self.peers_gossipsub] topic,
selected_peers = \ self.degree - fanout_size,
GossipSub.select_from_minus(self.degree - fanout_size, fanout_peers,
gossipsub_peers_in_topic, )
fanout_peers)
# Combine fanout peers with selected peers # Combine fanout peers with selected peers
fanout_peers += selected_peers fanout_peers += selected_peers
@ -272,14 +290,11 @@ class GossipSub(IPubsubRouter):
num_mesh_peers_in_topic = len(self.mesh[topic]) num_mesh_peers_in_topic = len(self.mesh[topic])
if num_mesh_peers_in_topic < self.degree_low: if num_mesh_peers_in_topic < self.degree_low:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic]
selected_peers = GossipSub.select_from_minus( selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - num_mesh_peers_in_topic, self.degree - num_mesh_peers_in_topic,
gossipsub_peers_in_topic, self.mesh[topic],
self.mesh[topic]
) )
fanout_peers_not_in_mesh = [ fanout_peers_not_in_mesh = [
@ -320,12 +335,11 @@ class GossipSub(IPubsubRouter):
# If |fanout[topic]| < D # If |fanout[topic]| < D
if num_fanout_peers_in_topic < self.degree: if num_fanout_peers_in_topic < self.degree:
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
if peer in self.peers_gossipsub] topic,
selected_peers = \ self.degree - num_fanout_peers_in_topic,
GossipSub.select_from_minus(self.degree - num_fanout_peers_in_topic, self.fanout[topic],
gossipsub_peers_in_topic, self.fanout[topic]) )
# Add the peers to fanout[topic] # Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers) self.fanout[topic].extend(selected_peers)
@ -337,12 +351,12 @@ class GossipSub(IPubsubRouter):
# TODO: Make more efficient, possibly using a generator? # TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in a topic and only add them if they are gossipsub peers too # Get all pubsub peers in a topic and only add them if they are gossipsub peers too
if topic in self.pubsub.peer_topics: if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D peers from peers.gossipsub[topic] # Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = \ peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) topic,
self.degree,
[],
)
for peer in peers_to_emit_ihave_to: for peer in peers_to_emit_ihave_to:
# TODO: this line is a monster, can hopefully be simplified # TODO: this line is a monster, can hopefully be simplified
@ -351,6 +365,7 @@ class GossipSub(IPubsubRouter):
msg_ids = [str(msg) for msg in msg_ids] msg_ids = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_ids, peer) await self.emit_ihave(topic, msg_ids, peer)
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
# Do the same for fanout, for all topics not already hit in mesh # Do the same for fanout, for all topics not already hit in mesh
for topic in self.fanout: for topic in self.fanout:
if topic not in self.mesh: if topic not in self.mesh:
@ -359,12 +374,12 @@ class GossipSub(IPubsubRouter):
# TODO: Make more efficient, possibly using a generator? # TODO: Make more efficient, possibly using a generator?
# Get all pubsub peers in topic and only add if they are gossipsub peers also # Get all pubsub peers in topic and only add if they are gossipsub peers also
if topic in self.pubsub.peer_topics: if topic in self.pubsub.peer_topics:
gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic]
if peer in self.peers_gossipsub]
# Select D peers from peers.gossipsub[topic] # Select D peers from peers.gossipsub[topic]
peers_to_emit_ihave_to = \ peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) topic,
self.degree,
[],
)
for peer in peers_to_emit_ihave_to: for peer in peers_to_emit_ihave_to:
if peer not in self.mesh[topic] and peer not in self.fanout[topic]: if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
@ -400,6 +415,22 @@ class GossipSub(IPubsubRouter):
return selection return selection
def _get_in_topic_gossipsub_peers_from_minus(
self,
topic: str,
num_to_select: int,
minus: Sequence[ID]) -> List[ID]:
gossipsub_peers_in_topic = [
peer_str
for peer_str in self.pubsub.peer_topics[topic]
if peer_str in self.peers_gossipsub
]
return self.select_from_minus(
num_to_select,
gossipsub_peers_in_topic,
list(minus),
)
# RPC handlers # RPC handlers
async def handle_ihave(self, ihave_msg, sender_peer_id): async def handle_ihave(self, ihave_msg, sender_peer_id):

View File

@ -45,7 +45,8 @@ class Pubsub:
outgoing_messages: asyncio.Queue() outgoing_messages: asyncio.Queue()
seen_messages: LRU seen_messages: LRU
my_topics: Dict[str, asyncio.Queue] my_topics: Dict[str, asyncio.Queue]
peer_topics: Dict[str, List[ID]] # FIXME: Should be changed to `Dict[str, List[ID]]`
peer_topics: Dict[str, List[str]]
# FIXME: Should be changed to `Dict[ID, INetStream]` # FIXME: Should be changed to `Dict[ID, INetStream]`
peers: Dict[str, INetStream] peers: Dict[str, INetStream]
# NOTE: Be sure it is increased atomically everytime. # NOTE: Be sure it is increased atomically everytime.
@ -127,26 +128,21 @@ class Pubsub:
messages from other nodes messages from other nodes
:param stream: stream to continously read from :param stream: stream to continously read from
""" """
peer_id = stream.mplex_conn.peer_id
# TODO check on types here
peer_id = str(stream.mplex_conn.peer_id)
while True: while True:
incoming = (await stream.read()) incoming = (await stream.read())
rpc_incoming = rpc_pb2.RPC() rpc_incoming = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming) rpc_incoming.ParseFromString(incoming)
should_publish = False
if rpc_incoming.publish: if rpc_incoming.publish:
# deal with RPC.publish # deal with RPC.publish
for message in rpc_incoming.publish: for msg in rpc_incoming.publish:
id_in_seen_msgs = (message.seqno, message.from_id) if not self._is_subscribed_to_msg(msg):
if id_in_seen_msgs not in self.seen_messages: continue
should_publish = True # TODO(mhchia): This will block this read_stream loop until all data are pushed.
self.seen_messages[id_in_seen_msgs] = 1 # Should investigate further if this is an issue.
await self.push_msg(msg_forwarder=peer_id, msg=msg)
await self.handle_talk(message)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
# deal with RPC.subscriptions # deal with RPC.subscriptions
@ -157,10 +153,6 @@ class Pubsub:
for message in rpc_incoming.subscriptions: for message in rpc_incoming.subscriptions:
self.handle_subscription(peer_id, message) self.handle_subscription(peer_id, message)
if should_publish:
# relay message to peers with router
await self.router.publish(peer_id, incoming)
if rpc_incoming.control: if rpc_incoming.control:
# Pass rpc to router so router could perform custom logic # Pass rpc to router so router could perform custom logic
await self.router.handle_rpc(rpc_incoming, peer_id) await self.router.handle_rpc(rpc_incoming, peer_id)
@ -227,6 +219,7 @@ class Pubsub:
:param origin_id: id of the peer who subscribe to the message :param origin_id: id of the peer who subscribe to the message
:param sub_message: RPC.SubOpts :param sub_message: RPC.SubOpts
""" """
origin_id = str(origin_id)
if sub_message.subscribe: if sub_message.subscribe:
if sub_message.topicid not in self.peer_topics: if sub_message.topicid not in self.peer_topics:
self.peer_topics[sub_message.topicid] = [origin_id] self.peer_topics[sub_message.topicid] = [origin_id]
@ -319,3 +312,64 @@ class Pubsub:
for _, stream in self.peers.items(): for _, stream in self.peers.items():
# Write message to stream # Write message to stream
await stream.write(rpc_msg) await stream.write(rpc_msg)
async def publish(self, topic_id: str, data: bytes) -> None:
"""
Publish data to a topic
:param topic_id: topic which we are going to publish the data to
:param data: data which we are publishing
"""
msg = rpc_pb2.Message(
data=data,
topicIDs=[topic_id],
# Origin is ourself.
from_id=self.host.get_id().to_bytes(),
seqno=self._next_seqno(),
)
# TODO: Sign with our signing key
await self.push_msg(self.host.get_id(), msg)
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
"""
Push a pubsub message to others.
:param msg_forwarder: the peer who forward us the message.
:param msg: the message we are going to push out.
"""
# TODO: - Check if the `source` is in the blacklist. If yes, reject.
# TODO: - Check if the `from` is in the blacklist. If yes, reject.
# TODO: - Check if signing is required and if so signature should be attached.
if self._is_msg_seen(msg):
return
# TODO: - Validate the message. If failed, reject it.
self._mark_msg_seen(msg)
await self.handle_talk(msg)
await self.router.publish(msg_forwarder, msg)
def _next_seqno(self) -> bytes:
"""
Make the next message sequence id.
"""
self.counter += 1
return self.counter.to_bytes(8, 'big')
def _is_msg_seen(self, msg: rpc_pb2.Message) -> bool:
msg_id = get_msg_id(msg)
return msg_id in self.seen_messages
def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None:
msg_id = get_msg_id(msg)
# FIXME: Mapping `msg_id` to `1` is quite awkward. Should investigate if there is a
# more appropriate way.
self.seen_messages[msg_id] = 1
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
if len(self.my_topics) == 0:
return False
return all([topic in self.my_topics for topic in msg.topicIDs])

View File

@ -42,11 +42,11 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
def publish(self, sender_peer_id, rpc_message): async def publish(self, msg_forwarder, pubsub_msg):
""" """
Invoked to forward a new message that has been validated Invoked to forward a new message that has been validated
:param sender_peer_id: peer_id of message sender :param msg_forwarder: peer_id of message sender
:param rpc_message: message to forward :param pubsub_msg: pubsub message to forward
""" """
@abstractmethod @abstractmethod

0
tests/pubsub/__init__.py Normal file
View File

7
tests/pubsub/configs.py Normal file
View File

@ -0,0 +1,7 @@
import multiaddr
FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0"
GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0"
LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")

View File

@ -1,13 +1,18 @@
import asyncio import asyncio
import multiaddr
import uuid import uuid
from utils import message_id_generator, generate_RPC_packet import multiaddr
from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] from libp2p import new_node
from libp2p.host.host_interface import IHost
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pubsub import Pubsub
from .configs import FLOODSUB_PROTOCOL_ID
from .utils import message_id_generator
SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
CRYPTO_TOPIC = "ethereum" CRYPTO_TOPIC = "ethereum"
# Message format: # Message format:
@ -17,14 +22,25 @@ CRYPTO_TOPIC = "ethereum"
# Ex. set,rob,5 # Ex. set,rob,5
# Determine message type by looking at first item before first comma # Determine message type by looking at first item before first comma
class DummyAccountNode():
class DummyAccountNode:
""" """
Node which has an internal balance mapping, meant to serve as Node which has an internal balance mapping, meant to serve as
a dummy crypto blockchain. There is no actual blockchain, just a simple a dummy crypto blockchain. There is no actual blockchain, just a simple
map indicating how much crypto each user in the mappings holds map indicating how much crypto each user in the mappings holds
""" """
libp2p_node: IHost
pubsub: Pubsub
floodsub: FloodSub
def __init__(self): def __init__(
self,
libp2p_node: IHost,
pubsub: Pubsub,
floodsub: FloodSub):
self.libp2p_node = libp2p_node
self.pubsub = pubsub
self.floodsub = floodsub
self.balances = {} self.balances = {}
self.next_msg_id_func = message_id_generator(0) self.next_msg_id_func = message_id_generator(0)
self.node_id = str(uuid.uuid1()) self.node_id = str(uuid.uuid1())
@ -38,16 +54,21 @@ class DummyAccountNode():
We use create as this serves as a factory function and allows us We use create as this serves as a factory function and allows us
to use async await, unlike the init function to use async await, unlike the init function
""" """
self = DummyAccountNode()
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
self.libp2p_node = libp2p_node floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
pubsub = Pubsub(
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) libp2p_node,
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") floodsub,
return self "a",
)
return cls(
libp2p_node=libp2p_node,
pubsub=pubsub,
floodsub=floodsub,
)
async def handle_incoming_msgs(self): async def handle_incoming_msgs(self):
""" """
@ -78,10 +99,8 @@ class DummyAccountNode():
:param dest_user: user to send crypto to :param dest_user: user to send crypto to
:param amount: amount of crypto to send :param amount: amount of crypto to send
""" """
my_id = str(self.libp2p_node.get_id())
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode())
await self.floodsub.publish(my_id, packet.SerializeToString())
async def publish_set_crypto(self, user, amount): async def publish_set_crypto(self, user, amount):
""" """
@ -89,18 +108,15 @@ class DummyAccountNode():
:param user: user to set crypto for :param user: user to set crypto for
:param amount: amount of crypto :param amount: amount of crypto
""" """
my_id = str(self.libp2p_node.get_id())
msg_contents = "set," + user + "," + str(amount) msg_contents = "set," + user + "," + str(amount)
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode())
await self.floodsub.publish(my_id, packet.SerializeToString())
def handle_send_crypto(self, source_user, dest_user, amount): def handle_send_crypto(self, source_user, dest_user, amount):
""" """
Handle incoming send_crypto message Handle incoming send_crypto message
:param source_user: user to send crypto from :param source_user: user to send crypto from
:param dest_user: user to send crypto to :param dest_user: user to send crypto to
:param amount: amount of crypto to send :param amount: amount of crypto to send
""" """
if source_user in self.balances: if source_user in self.balances:
self.balances[source_user] -= amount self.balances[source_user] -= amount
@ -130,4 +146,3 @@ class DummyAccountNode():
return self.balances[user] return self.balances[user]
else: else:
return -1 return -1

View File

@ -0,0 +1,448 @@
import asyncio
import pytest
from libp2p import new_node
from libp2p.peer.id import ID
from libp2p.pubsub.pubsub import Pubsub
from tests.utils import (
cleanup,
connect,
)
from .configs import (
FLOODSUB_PROTOCOL_ID,
LISTEN_MADDR,
)
SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
FLOODSUB_PROTOCOL_TEST_CASES = [
{
"name": "simple_two_nodes",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": b"foo",
"node_id": "A"
}
]
},
{
"name": "three_nodes_two_topics",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"A": ["B"],
"B": ["C"],
},
"topic_map": {
"topic1": ["B", "C"],
"topic2": ["B", "C"],
},
"messages": [
{
"topics": ["topic1"],
"data": b"foo",
"node_id": "A",
},
{
"topics": ["topic2"],
"data": b"Alex is tall",
"node_id": "A",
}
]
},
{
"name": "two_nodes_one_topic_single_subscriber_is_sender",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"A": ["B"],
},
"topic_map": {
"topic1": ["B"],
},
"messages": [
{
"topics": ["topic1"],
"data": b"Alex is tall",
"node_id": "B",
}
]
},
{
"name": "two_nodes_one_topic_two_msgs",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"A": ["B"],
},
"topic_map": {
"topic1": ["B"],
},
"messages": [
{
"topics": ["topic1"],
"data": b"Alex is tall",
"node_id": "B",
},
{
"topics": ["topic1"],
"data": b"foo",
"node_id": "A",
}
]
},
{
"name": "seven_nodes_tree_one_topics",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"],
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
},
"messages": [
{
"topics": ["astrophysics"],
"data": b"e=mc^2",
"node_id": "1",
}
]
},
{
"name": "seven_nodes_tree_three_topics",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"],
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
"space": ["2", "3", "4", "5", "6", "7"],
"onions": ["2", "3", "4", "5", "6", "7"],
},
"messages": [
{
"topics": ["astrophysics"],
"data": b"e=mc^2",
"node_id": "1",
},
{
"topics": ["space"],
"data": b"foobar",
"node_id": "1",
},
{
"topics": ["onions"],
"data": b"I am allergic",
"node_id": "1",
}
]
},
{
"name": "seven_nodes_tree_three_topics_diff_origin",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"],
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
"space": ["1", "2", "3", "4", "5", "6", "7"],
"onions": ["1", "2", "3", "4", "5", "6", "7"],
},
"messages": [
{
"topics": ["astrophysics"],
"data": b"e=mc^2",
"node_id": "1",
},
{
"topics": ["space"],
"data": b"foobar",
"node_id": "4",
},
{
"topics": ["onions"],
"data": b"I am allergic",
"node_id": "7",
}
]
},
{
"name": "three_nodes_clique_two_topic_diff_origin",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2", "3"],
"2": ["3"],
},
"topic_map": {
"astrophysics": ["1", "2", "3"],
"school": ["1", "2", "3"],
},
"messages": [
{
"topics": ["astrophysics"],
"data": b"e=mc^2",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic",
"node_id": "1",
}
]
},
{
"name": "four_nodes_clique_two_topic_diff_origin_many_msgs",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2", "3", "4"],
"2": ["1", "3", "4"],
"3": ["1", "2", "4"],
"4": ["1", "2", "3"],
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4"],
"school": ["1", "2", "3", "4"],
},
"messages": [
{
"topics": ["astrophysics"],
"data": b"e=mc^2",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar2",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic2",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar3",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic3",
"node_id": "1",
}
]
},
{
"name": "five_nodes_ring_two_topic_diff_origin_many_msgs",
"supported_protocols": SUPPORTED_PROTOCOLS,
"adj_list": {
"1": ["2"],
"2": ["3"],
"3": ["4"],
"4": ["5"],
"5": ["1"],
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5"],
"school": ["1", "2", "3", "4", "5"],
},
"messages": [
{
"topics": ["astrophysics"],
"data": b"e=mc^2",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar2",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic2",
"node_id": "1",
},
{
"topics": ["school"],
"data": b"foobar3",
"node_id": "2",
},
{
"topics": ["astrophysics"],
"data": b"I am allergic3",
"node_id": "1",
}
]
}
]
# pylint: disable=invalid-name
floodsub_protocol_pytest_params = [
pytest.param(test_case, id=test_case["name"])
for test_case in FLOODSUB_PROTOCOL_TEST_CASES
]
# pylint: disable=too-many-locals
async def perform_test_from_obj(obj, router_factory):
"""
Perform pubsub tests from a test obj.
test obj are composed as follows:
{
"supported_protocols": ["supported/protocol/1.0.0",...],
"adj_list": {
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
...
},
"topic_map": {
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
},
"messages": [
{
"topics": ["topic1_for_message", "topic2_for_message", ...],
"data": b"some contents of the message (newlines are not supported)",
"node_id": "message sender node id"
},
...
]
}
NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
is undefined (even if it may work)
"""
# Step 1) Create graph
adj_list = obj["adj_list"]
node_map = {}
pubsub_map = {}
async def add_node(node_id: str) -> None:
node = await new_node(transport_opt=[str(LISTEN_MADDR)])
await node.get_network().listen(LISTEN_MADDR)
node_map[node_id] = node
pubsub_router = router_factory(protocols=obj["supported_protocols"])
pubsub = Pubsub(node, pubsub_router, ID(node_id.encode()))
pubsub_map[node_id] = pubsub
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
await add_node(start_node_id)
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
await add_node(neighbor_id)
tasks_connect.append(
connect(node_map[start_node_id], node_map[neighbor_id])
)
# Connect nodes and wait at least for 2 seconds
await asyncio.gather(*tasks_connect, asyncio.sleep(2))
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic, node_ids in topic_map.items():
for node_id in node_ids:
tasks_topic.append(pubsub_map[node_id].subscribe(topic))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
for i in range(len(responses) - 1):
node_id, topic = tasks_topic_data[i]
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = responses[i]
# Allow time for subscribing before continuing
await asyncio.sleep(0.01)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
for msg in messages:
topics = msg["topics"]
data = msg["data"]
node_id = msg["node_id"]
# Publish message
# TODO: Should be single RPC package with several topics
for topic in topics:
tasks_publish.append(
pubsub_map[node_id].publish(
topic,
data,
)
)
# For each topic in topics, add (topic, node_id, data) tuple to ordered test list
for topic in topics:
topics_in_msgs_ordered.append((topic, node_id, data))
# Allow time for publishing before continuing
await asyncio.gather(*tasks_publish, asyncio.sleep(2))
# Step 4) Check that all messages were received correctly.
for topic, origin_node_id, data in topics_in_msgs_ordered:
# Look at each node in each topic
for node_id in topic_map[topic]:
# Get message from subscription queue
msg = await queues_map[node_id][topic].get()
assert data == msg.data
# Check the message origin
assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id
# Success, terminate pending tasks.
await cleanup()

View File

@ -1,35 +1,31 @@
import asyncio import asyncio
import multiaddr from threading import Thread
import pytest import pytest
from threading import Thread from tests.utils import (
from tests.utils import cleanup cleanup,
from libp2p import new_node connect,
from libp2p.peer.peerinfo import info_from_p2p_addr )
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from .dummy_account_node import DummyAccountNode
from dummy_account_node import DummyAccountNode
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
async def connect(node1, node2):
# node1 connects to node2
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
def create_setup_in_new_thread_func(dummy_node): def create_setup_in_new_thread_func(dummy_node):
def setup_in_new_thread(): def setup_in_new_thread():
asyncio.ensure_future(dummy_node.setup_crypto_networking()) asyncio.ensure_future(dummy_node.setup_crypto_networking())
return setup_in_new_thread return setup_in_new_thread
async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
""" """
Helper function to allow for easy construction of custom tests for dummy account nodes Helper function to allow for easy construction of custom tests for dummy account nodes
in various network topologies in various network topologies
:param num_nodes: number of nodes in the test :param num_nodes: number of nodes in the test
:param adjacency_map: adjacency map defining each node and its list of neighbors :param adjacency_map: adjacency map defining each node and its list of neighbors
:param action_func: function to execute that includes actions by the nodes, :param action_func: function to execute that includes actions by the nodes,
such as send crypto and set crypto such as send crypto and set crypto
:param assertion_func: assertions for testing the results of the actions are correct :param assertion_func: assertions for testing the results of the actions are correct
""" """
@ -73,6 +69,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_two_nodes(): async def test_simple_two_nodes():
num_nodes = 2 num_nodes = 2
@ -86,6 +83,7 @@ async def test_simple_two_nodes():
await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_three_nodes_line_topography(): async def test_simple_three_nodes_line_topography():
num_nodes = 3 num_nodes = 3
@ -99,6 +97,7 @@ async def test_simple_three_nodes_line_topography():
await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_three_nodes_triangle_topography(): async def test_simple_three_nodes_triangle_topography():
num_nodes = 3 num_nodes = 3
@ -112,6 +111,7 @@ async def test_simple_three_nodes_triangle_topography():
await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_seven_nodes_tree_topography(): async def test_simple_seven_nodes_tree_topography():
num_nodes = 7 num_nodes = 7
@ -125,6 +125,7 @@ async def test_simple_seven_nodes_tree_topography():
await perform_test(num_nodes, adj_map, action_func, assertion_func) await perform_test(num_nodes, adj_map, action_func, assertion_func)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_set_then_send_from_root_seven_nodes_tree_topography(): async def test_set_then_send_from_root_seven_nodes_tree_topography():
num_nodes = 7 num_nodes = 7

View File

@ -2,615 +2,132 @@ import asyncio
import multiaddr import multiaddr
import pytest import pytest
from tests.utils import cleanup
from libp2p import new_node from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.id import ID
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
from utils import message_id_generator, generate_RPC_packet from libp2p.pubsub.pubsub import Pubsub
from tests.utils import (
cleanup,
connect,
)
from .configs import (
FLOODSUB_PROTOCOL_ID,
LISTEN_MADDR,
)
from .floodsub_integration_test_settings import (
perform_test_from_obj,
floodsub_protocol_pytest_params,
)
SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_two_nodes(): async def test_simple_two_nodes():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_a = await new_node(transport_opt=[str(LISTEN_MADDR)])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_b = await new_node(transport_opt=[str(LISTEN_MADDR)])
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) await node_a.get_network().listen(LISTEN_MADDR)
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) await node_b.get_network().listen(LISTEN_MADDR)
supported_protocols = ["/floodsub/1.0.0"] supported_protocols = [FLOODSUB_PROTOCOL_ID]
topic = "my_topic"
data = b"some data"
floodsub_a = FloodSub(supported_protocols) floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, "a") pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32))
floodsub_b = FloodSub(supported_protocols) floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, "b") pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32))
await connect(node_a, node_b) await connect(node_a, node_b)
await asyncio.sleep(0.25)
qb = await pubsub_b.subscribe("my_topic")
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
node_a_id = str(node_a.get_id()) sub_b = await pubsub_b.subscribe(topic)
# Sleep to let a know of b's subscription
next_msg_id_func = message_id_generator(0)
msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", next_msg_id_func())
await floodsub_a.publish(node_a_id, msg.SerializeToString())
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
res_b = await qb.get() await pubsub_a.publish(topic, data)
res_b = await sub_b.get()
# Check that the msg received by node_b is the same # Check that the msg received by node_b is the same
# as the message sent by node_a # as the message sent by node_a
assert res_b.SerializeToString() == msg.publish[0].SerializeToString() assert ID(res_b.from_id) == node_a.get_id()
assert res_b.data == data
assert res_b.topicIDs == [topic]
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_lru_cache_two_nodes(): async def test_lru_cache_two_nodes(monkeypatch):
# two nodes with cache_size of 4 # two nodes with cache_size of 4
# node_a send the following messages to node_b # `node_a` send the following messages to node_b
# [1, 1, 2, 1, 3, 1, 4, 1, 5, 1] message_indices = [1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
# node_b should only receive the following # `node_b` should only receive the following
# [1, 2, 3, 4, 5, 1] expected_received_indices = [1, 2, 3, 4, 5, 1]
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) node_a = await new_node(transport_opt=[str(LISTEN_MADDR)])
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) node_b = await new_node(transport_opt=[str(LISTEN_MADDR)])
supported_protocols = ["/floodsub/1.0.0"] await node_a.get_network().listen(LISTEN_MADDR)
await node_b.get_network().listen(LISTEN_MADDR)
# initialize PubSub with a cache_size of 4 supported_protocols = SUPPORTED_PROTOCOLS
topic = "my_topic"
# Mock `get_msg_id` to make us easier to manipulate `msg_id` by `data`.
def get_msg_id(msg):
# Originally it is `(msg.seqno, msg.from_id)`
return (msg.data, msg.from_id)
import libp2p.pubsub.pubsub
monkeypatch.setattr(libp2p.pubsub.pubsub, "get_msg_id", get_msg_id)
# Initialize Pubsub with a cache_size of 4
cache_size = 4
floodsub_a = FloodSub(supported_protocols) floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, "a", 4) pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32), cache_size)
floodsub_b = FloodSub(supported_protocols) floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, "b", 4) pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32), cache_size)
await connect(node_a, node_b) await connect(node_a, node_b)
await asyncio.sleep(0.25)
qb = await pubsub_b.subscribe("my_topic")
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
node_a_id = str(node_a.get_id()) sub_b = await pubsub_b.subscribe(topic)
# initialize message_id_generator
# store first message
next_msg_id_func = message_id_generator(0)
first_message = generate_RPC_packet(node_a_id, ["my_topic"], "some data 1", next_msg_id_func())
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
await asyncio.sleep(0.25)
print (first_message)
messages = [first_message]
# for the next 5 messages
for i in range(2, 6):
# write first message
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
await asyncio.sleep(0.25)
# generate and write next message
msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data " + str(i), next_msg_id_func())
messages.append(msg)
await floodsub_a.publish(node_a_id, msg.SerializeToString())
await asyncio.sleep(0.25)
# write first message again
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
# check the first five messages in queue def _make_testing_data(i: int) -> bytes:
# should only see 1 first_message num_int_bytes = 4
for i in range(5): if i >= 2**(num_int_bytes * 8):
# Check that the msg received by node_b is the same raise ValueError("integer is too large to be serialized")
# as the message sent by node_a return b"data" + i.to_bytes(num_int_bytes, "big")
res_b = await qb.get()
assert res_b.SerializeToString() == messages[i].publish[0].SerializeToString() for index in message_indices:
await pubsub_a.publish(topic, _make_testing_data(index))
await asyncio.sleep(0.25)
for index in expected_received_indices:
res_b = await sub_b.get()
assert res_b.data == _make_testing_data(index)
assert sub_b.empty()
# the 6th message should be first_message
res_b = await qb.get()
assert res_b.SerializeToString() == first_message.publish[0].SerializeToString()
assert qb.empty()
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()
async def perform_test_from_obj(obj): @pytest.mark.parametrize(
""" "test_case_obj",
Perform a floodsub test from a test obj. floodsub_protocol_pytest_params,
test obj are composed as follows: )
{
"supported_protocols": ["supported/protocol/1.0.0",...],
"adj_list": {
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
...
},
"topic_map": {
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
},
"messages": [
{
"topics": ["topic1_for_message", "topic2_for_message", ...],
"data": "some contents of the message (newlines are not supported)",
"node_id": "message sender node id"
},
...
]
}
NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
is undefined (even if it may work)
"""
# Step 1) Create graph
adj_list = obj["adj_list"]
node_map = {}
floodsub_map = {}
pubsub_map = {}
supported_protocols = obj["supported_protocols"]
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[start_node_id] = node
floodsub = FloodSub(supported_protocols)
floodsub_map[start_node_id] = floodsub
pubsub = Pubsub(node, floodsub, start_node_id)
pubsub_map[start_node_id] = pubsub
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[neighbor_id] = neighbor_node
floodsub = FloodSub(supported_protocols)
floodsub_map[neighbor_id] = floodsub
pubsub = Pubsub(neighbor_node, floodsub, neighbor_id)
pubsub_map[neighbor_id] = pubsub
# Connect node and neighbor
# await connect(node_map[start_node_id], node_map[neighbor_id])
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
tasks_connect.append(asyncio.sleep(2))
await asyncio.gather(*tasks_connect)
# Allow time for graph creation before continuing
# await asyncio.sleep(0.25)
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic in topic_map:
for node_id in topic_map[topic]:
"""
# Subscribe node to topic
q = await pubsub_map[node_id].subscribe(topic)
# Create topic-queue map for node_id if one does not yet exist
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
"""
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
for i in range(len(responses) - 1):
q = responses[i]
node_id, topic = tasks_topic_data[i]
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
# Allow time for subscribing before continuing
# await asyncio.sleep(0.01)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
next_msg_id_func = message_id_generator(0)
for msg in messages:
topics = msg["topics"]
data = msg["data"]
node_id = msg["node_id"]
# Get actual id for sender node (not the id from the test obj)
actual_node_id = str(node_map[node_id].get_id())
# Create correctly formatted message
msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func())
# Publish message
# await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\
actual_node_id, msg_talk.SerializeToString())))
# For each topic in topics, add topic, msg_talk tuple to ordered test list
# TODO: Update message sender to be correct message sender before
# adding msg_talk to this list
for topic in topics:
topics_in_msgs_ordered.append((topic, msg_talk))
# Allow time for publishing before continuing
# await asyncio.sleep(0.4)
tasks_publish.append(asyncio.sleep(2))
await asyncio.gather(*tasks_publish)
# Step 4) Check that all messages were received correctly.
# TODO: Check message sender too
for i in range(len(topics_in_msgs_ordered)):
topic, actual_msg = topics_in_msgs_ordered[i]
# Look at each node in each topic
for node_id in topic_map[topic]:
# Get message from subscription queue
msg_on_node_str = await queues_map[node_id][topic].get()
assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString()
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_two_nodes_test_obj(): async def test_gossipsub_run_with_floodsub_tests(test_case_obj):
test_obj = { await perform_test_from_obj(
"supported_protocols": ["/floodsub/1.0.0"], test_case_obj,
"adj_list": { FloodSub,
"A": ["B"] )
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_two_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"],
"B": ["C"]
},
"topic_map": {
"topic1": ["B", "C"],
"topic2": ["B", "C"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
},
{
"topics": ["topic2"],
"data": "Alex is tall",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_two_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
},
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_one_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
"space": ["2", "3", "4", "5", "6", "7"],
"onions": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "1"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
"space": ["1", "2", "3", "4", "5", "6", "7"],
"onions": ["1", "2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "4"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "7"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3"],
"school": ["1", "2", "3"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3", "4"],
"2": ["1", "3", "4"],
"3": ["1", "2", "4"],
"4": ["1", "2", "3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4"],
"school": ["1", "2", "3", "4"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar2",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar3",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic3",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2"],
"2": ["3"],
"3": ["4"],
"4": ["5"],
"5": ["1"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5"],
"school": ["1", "2", "3", "4", "5"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar2",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar3",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic3",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)

View File

@ -1,13 +1,23 @@
import asyncio import asyncio
import pytest
import random import random
from utils import message_id_generator, generate_RPC_packet, \ import pytest
create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \
connect, one_to_all_connect
from tests.utils import cleanup
SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] from tests.utils import (
cleanup,
connect,
)
from .configs import GOSSIPSUB_PROTOCOL_ID
from .utils import (
create_libp2p_hosts,
create_pubsub_and_gossipsub_instances,
dense_connect,
one_to_all_connect,
)
SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID]
@pytest.mark.asyncio @pytest.mark.asyncio
@ -41,13 +51,8 @@ async def test_join():
# Central node publish to the topic so that this topic # Central node publish to the topic so that this topic
# is added to central node's fanout # is added to central node's fanout
next_msg_id_func = message_id_generator(0)
msg_content = ""
host_id = str(libp2p_hosts[central_node_index].get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, [topic], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gossipsubs[central_node_index].publish(host_id, packet.SerializeToString()) await pubsubs[central_node_index].publish(topic, b"data")
# Check that the gossipsub of central node has fanout for the topic # Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout assert topic in gossipsubs[central_node_index].fanout
@ -86,6 +91,8 @@ async def test_leave():
gossipsub = gossipsubs[0] gossipsub = gossipsubs[0]
topic = "test_leave" topic = "test_leave"
assert topic not in gossipsub.mesh
await gossipsub.join(topic) await gossipsub.join(topic)
assert topic in gossipsub.mesh assert topic in gossipsub.mesh
@ -205,14 +212,12 @@ async def test_handle_prune():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_dense(): async def test_dense():
# Create libp2p hosts # Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 10 num_hosts = 10
num_msgs = 5 num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts) libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances # Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \ SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5) 10, 9, 11, 30, 3, 5, 0.5)
@ -231,41 +236,35 @@ async def test_dense():
await asyncio.sleep(2) await asyncio.sleep(2)
for i in range(num_msgs): for i in range(num_msgs):
msg_content = "foo " + str(i) msg_content = b"foo " + i.to_bytes(1, 'big')
# randomly pick a message origin # randomly pick a message origin
origin_idx = random.randint(0, num_hosts - 1) origin_idx = random.randint(0, num_hosts - 1)
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) await pubsubs[origin_idx].publish("foobar", msg_content)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message # Assert that all blocking queues receive the message
for queue in queues: for queue in queues:
msg = await queue.get() msg = await queue.get()
assert msg.data == packet.publish[0].data assert msg.data == msg_content
await cleanup() await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_fanout(): async def test_fanout():
# Create libp2p hosts # Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 10 num_hosts = 10
num_msgs = 5 num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts) libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances # Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \ SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5) 10, 9, 11, 30, 3, 5, 0.5)
# All pubsub subscribe to foobar # All pubsub subscribe to foobar except for `pubsubs[0]`
queues = [] queues = []
for i in range(1, len(pubsubs)): for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar") q = await pubsubs[i].subscribe("foobar")
@ -279,71 +278,61 @@ async def test_fanout():
# Wait 2 seconds for heartbeat to allow mesh to connect # Wait 2 seconds for heartbeat to allow mesh to connect
await asyncio.sleep(2) await asyncio.sleep(2)
topic = "foobar"
# Send messages with origin not subscribed # Send messages with origin not subscribed
for i in range(num_msgs): for i in range(num_msgs):
msg_content = "foo " + str(i) msg_content = b"foo " + i.to_bytes(1, "big")
# Pick the message origin to the node that is not subscribed to 'foobar' # Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0 origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message # Assert that all blocking queues receive the message
for queue in queues: for queue in queues:
msg = await queue.get() msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString() assert msg.data == msg_content
# Subscribe message origin # Subscribe message origin
queues.append(await pubsubs[0].subscribe("foobar")) queues.insert(0, await pubsubs[0].subscribe(topic))
# Send messages again # Send messages again
for i in range(num_msgs): for i in range(num_msgs):
msg_content = "foo " + str(i) msg_content = b"bar " + i.to_bytes(1, 'big')
# Pick the message origin to the node that is not subscribed to 'foobar' # Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0 origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message # Assert that all blocking queues receive the message
for queue in queues: for queue in queues:
msg = await queue.get() msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString() assert msg.data == msg_content
await cleanup() await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_fanout_maintenance(): async def test_fanout_maintenance():
# Create libp2p hosts # Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 10 num_hosts = 10
num_msgs = 5 num_msgs = 5
libp2p_hosts = await create_libp2p_hosts(num_hosts) libp2p_hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances # Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \
SUPPORTED_PROTOCOLS, \ SUPPORTED_PROTOCOLS, \
10, 9, 11, 30, 3, 5, 0.5) 10, 9, 11, 30, 3, 5, 0.5)
# All pubsub subscribe to foobar # All pubsub subscribe to foobar
queues = [] queues = []
topic = "foobar"
for i in range(1, len(pubsubs)): for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar") q = await pubsubs[i].subscribe(topic)
# Add each blocking queue to an array of blocking queues # Add each blocking queue to an array of blocking queues
queues.append(q) queues.append(q)
@ -356,27 +345,22 @@ async def test_fanout_maintenance():
# Send messages with origin not subscribed # Send messages with origin not subscribed
for i in range(num_msgs): for i in range(num_msgs):
msg_content = "foo " + str(i) msg_content = b"foo " + i.to_bytes(1, 'big')
# Pick the message origin to the node that is not subscribed to 'foobar' # Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0 origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message # Assert that all blocking queues receive the message
for queue in queues: for queue in queues:
msg = await queue.get() msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString() assert msg.data == msg_content
for sub in pubsubs: for sub in pubsubs:
await sub.unsubscribe('foobar') await sub.unsubscribe(topic)
queues = [] queues = []
@ -384,7 +368,7 @@ async def test_fanout_maintenance():
# Resub and repeat # Resub and repeat
for i in range(1, len(pubsubs)): for i in range(1, len(pubsubs)):
q = await pubsubs[i].subscribe("foobar") q = await pubsubs[i].subscribe(topic)
# Add each blocking queue to an array of blocking queues # Add each blocking queue to an array of blocking queues
queues.append(q) queues.append(q)
@ -393,65 +377,61 @@ async def test_fanout_maintenance():
# Check messages can still be sent # Check messages can still be sent
for i in range(num_msgs): for i in range(num_msgs):
msg_content = "foo " + str(i) msg_content = b"bar " + i.to_bytes(1, 'big')
# Pick the message origin to the node that is not subscribed to 'foobar' # Pick the message origin to the node that is not subscribed to 'foobar'
origin_idx = 0 origin_idx = 0
origin_host = libp2p_hosts[origin_idx]
host_id = str(origin_host.get_id())
# Generate message packet
packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) await pubsubs[origin_idx].publish(topic, msg_content)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Assert that all blocking queues receive the message # Assert that all blocking queues receive the message
for queue in queues: for queue in queues:
msg = await queue.get() msg = await queue.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString() assert msg.data == msg_content
await cleanup() await cleanup()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_gossip_propagation(): async def test_gossip_propagation():
# Create libp2p hosts # Create libp2p hosts
next_msg_id_func = message_id_generator(0)
num_hosts = 2 num_hosts = 2
libp2p_hosts = await create_libp2p_hosts(num_hosts) hosts = await create_libp2p_hosts(num_hosts)
# Create pubsub, gossipsub instances # Create pubsub, gossipsub instances
pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ pubsubs, _ = create_pubsub_and_gossipsub_instances(
SUPPORTED_PROTOCOLS, \ hosts,
1, 0, 2, 30, 50, 100, 0.5) SUPPORTED_PROTOCOLS,
node1, node2 = libp2p_hosts[0], libp2p_hosts[1] 1,
sub1, sub2 = pubsubs[0], pubsubs[1] 0,
gsub1, gsub2 = gossipsubs[0], gossipsubs[1] 2,
30,
50,
100,
0.5,
)
node1_queue = await sub1.subscribe('foo') topic = "foo"
await pubsubs[0].subscribe(topic)
# node 1 publish to topic # node 0 publish to topic
msg_content = 'foo_msg' msg_content = b'foo_msg'
node1_id = str(node1.get_id())
# Generate message packet
packet = generate_RPC_packet(node1_id, ["foo"], msg_content, next_msg_id_func())
# publish from the randomly chosen host # publish from the randomly chosen host
await gsub1.publish(node1_id, packet.SerializeToString()) await pubsubs[0].publish(topic, msg_content)
# now node 2 subscribes # now node 1 subscribes
node2_queue = await sub2.subscribe('foo') queue_1 = await pubsubs[1].subscribe(topic)
await connect(node2, node1) await connect(hosts[0], hosts[1])
# wait for gossip heartbeat # wait for gossip heartbeat
await asyncio.sleep(2) await asyncio.sleep(2)
# should be able to read message # should be able to read message
msg = await node2_queue.get() msg = await queue_1.get()
assert msg.SerializeToString() == packet.publish[0].SerializeToString() assert msg.data == msg_content
await cleanup() await cleanup()

View File

@ -1,35 +1,31 @@
import asyncio import functools
import multiaddr
import pytest import pytest
from libp2p import new_node from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.gossipsub import GossipSub from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from utils import message_id_generator, generate_RPC_packet
from tests.utils import cleanup from tests.utils import cleanup
from .configs import (
FLOODSUB_PROTOCOL_ID,
LISTEN_MADDR,
)
from .floodsub_integration_test_settings import (
perform_test_from_obj,
floodsub_protocol_pytest_params,
)
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_init(): async def test_gossipsub_initialize_with_floodsub_protocol():
node = await new_node(transport_opt=["/ip4/127.1/tcp/0"]) node = await new_node(transport_opt=[str(LISTEN_MADDR)])
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.1/tcp/0")) await node.get_network().listen(LISTEN_MADDR)
supported_protocols = ["/gossipsub/1.0.0"] gossipsub = GossipSub([FLOODSUB_PROTOCOL_ID], 3, 2, 4, 30)
gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30)
pubsub = Pubsub(node, gossipsub, "a") pubsub = Pubsub(node, gossipsub, "a")
# Did it work? # Did it work?
@ -37,483 +33,20 @@ async def test_init():
await cleanup() await cleanup()
async def perform_test_from_obj(obj):
"""
Perform a floodsub test from a test obj.
test obj are composed as follows:
{
"supported_protocols": ["supported/protocol/1.0.0",...],
"adj_list": {
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
...
},
"topic_map": {
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
},
"messages": [
{
"topics": ["topic1_for_message", "topic2_for_message", ...],
"data": "some contents of the message (newlines are not supported)",
"node_id": "message sender node id"
},
...
]
}
NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
is undefined (even if it may work)
"""
# Step 1) Create graph
adj_list = obj["adj_list"]
node_map = {}
gossipsub_map = {}
pubsub_map = {}
supported_protocols = obj["supported_protocols"]
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[start_node_id] = node
gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30)
gossipsub_map[start_node_id] = gossipsub
pubsub = Pubsub(node, gossipsub, start_node_id)
pubsub_map[start_node_id] = pubsub
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
node_map[neighbor_id] = neighbor_node
gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30)
gossipsub_map[neighbor_id] = gossipsub
pubsub = Pubsub(neighbor_node, gossipsub, neighbor_id)
pubsub_map[neighbor_id] = pubsub
# Connect node and neighbor
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
tasks_connect.append(asyncio.sleep(2))
await asyncio.gather(*tasks_connect)
# Allow time for graph creation before continuing
# await asyncio.sleep(0.25)
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic in topic_map:
for node_id in topic_map[topic]:
"""
# Subscribe node to topic
q = await pubsub_map[node_id].subscribe(topic)
# Create topic-queue map for node_id if one does not yet exist
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
"""
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
for i in range(len(responses) - 1):
q = responses[i]
node_id, topic = tasks_topic_data[i]
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = q
# Allow time for subscribing before continuing
# await asyncio.sleep(0.01)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
next_msg_id_func = message_id_generator(0)
for msg in messages:
topics = msg["topics"]
data = msg["data"]
node_id = msg["node_id"]
# Get actual id for sender node (not the id from the test obj)
actual_node_id = str(node_map[node_id].get_id())
# Create correctly formatted message
msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func())
# Publish message
tasks_publish.append(asyncio.ensure_future(gossipsub_map[node_id].publish(\
actual_node_id, msg_talk.SerializeToString())))
# For each topic in topics, add topic, msg_talk tuple to ordered test list
# TODO: Update message sender to be correct message sender before
# adding msg_talk to this list
for topic in topics:
topics_in_msgs_ordered.append((topic, msg_talk))
# Allow time for publishing before continuing
# await asyncio.sleep(0.4)
tasks_publish.append(asyncio.sleep(2))
await asyncio.gather(*tasks_publish)
# Step 4) Check that all messages were received correctly.
# TODO: Check message sender too
for i in range(len(topics_in_msgs_ordered)):
topic, actual_msg = topics_in_msgs_ordered[i]
# Look at each node in each topic
for node_id in topic_map[topic]:
# Get message from subscription queue
msg_on_node = await queues_map[node_id][topic].get()
assert actual_msg.publish[0].SerializeToString() == msg_on_node.SerializeToString()
# Success, terminate pending tasks.
await cleanup()
@pytest.mark.parametrize(
"test_case_obj",
floodsub_protocol_pytest_params,
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_two_nodes_test_obj(): async def test_gossipsub_run_with_floodsub_tests(test_case_obj):
test_obj = { await perform_test_from_obj(
"supported_protocols": ["/floodsub/1.0.0"], test_case_obj,
"adj_list": { functools.partial(
"A": ["B"] GossipSub,
}, degree=3,
"topic_map": { degree_low=2,
"topic1": ["B"] degree_high=4,
}, time_to_live=30,
"messages": [ )
{ )
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_two_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"],
"B": ["C"]
},
"topic_map": {
"topic1": ["B", "C"],
"topic2": ["B", "C"]
},
"messages": [
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
},
{
"topics": ["topic2"],
"data": "Alex is tall",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_two_nodes_one_topic_two_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"A": ["B"]
},
"topic_map": {
"topic1": ["B"]
},
"messages": [
{
"topics": ["topic1"],
"data": "Alex is tall",
"node_id": "B"
},
{
"topics": ["topic1"],
"data": "foo",
"node_id": "A"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_one_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["2", "3", "4", "5", "6", "7"],
"space": ["2", "3", "4", "5", "6", "7"],
"onions": ["2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "1"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_seven_nodes_tree_three_topics_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["4", "5"],
"3": ["6", "7"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
"space": ["1", "2", "3", "4", "5", "6", "7"],
"onions": ["1", "2", "3", "4", "5", "6", "7"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["space"],
"data": "foobar",
"node_id": "4"
},
{
"topics": ["onions"],
"data": "I am allergic",
"node_id": "7"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3"],
"2": ["3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3"],
"school": ["1", "2", "3"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2", "3", "4"],
"2": ["1", "3", "4"],
"3": ["1", "2", "4"],
"4": ["1", "2", "3"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4"],
"school": ["1", "2", "3", "4"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar2",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar3",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic3",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)
@pytest.mark.asyncio
async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj():
test_obj = {
"supported_protocols": ["/floodsub/1.0.0"],
"adj_list": {
"1": ["2"],
"2": ["3"],
"3": ["4"],
"4": ["5"],
"5": ["1"]
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4", "5"],
"school": ["1", "2", "3", "4", "5"]
},
"messages": [
{
"topics": ["astrophysics"],
"data": "e=mc^2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar2",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic2",
"node_id": "1"
},
{
"topics": ["school"],
"data": "foobar3",
"node_id": "2"
},
{
"topics": ["astrophysics"],
"data": "I am allergic3",
"node_id": "1"
}
]
}
await perform_test_from_obj(test_obj)

View File

@ -1,14 +1,17 @@
import pytest import pytest
from libp2p.pubsub.mcache import MessageCache from libp2p.pubsub.mcache import MessageCache
class Msg: class Msg:
def __init__(self, topicIDs, seqno, from_id): def __init__(self, topicIDs, seqno, from_id):
# pylint: disable=invalid-name
self.topicIDs = topicIDs self.topicIDs = topicIDs
self.seqno = seqno, self.seqno = seqno
self.from_id = from_id self.from_id = from_id
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_mcache(): async def test_mcache():
# Ported from: # Ported from:

View File

@ -5,7 +5,10 @@ from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] from .configs import FLOODSUB_PROTOCOL_ID
SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
TESTING_TOPIC = "TEST_SUBSCRIBE" TESTING_TOPIC = "TEST_SUBSCRIBE"

View File

@ -1,13 +1,18 @@
import asyncio import asyncio
import multiaddr
import uuid
import random
import struct import struct
from typing import (
Sequence,
)
import multiaddr
from libp2p import new_node from libp2p import new_node
from libp2p.pubsub.pb import rpc_pb2 from libp2p.peer.id import ID
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.gossipsub import GossipSub from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub
from tests.utils import connect
def message_id_generator(start_val): def message_id_generator(start_val):
@ -17,6 +22,7 @@ def message_id_generator(start_val):
:return: message id :return: message id
""" """
val = start_val val = start_val
def generator(): def generator():
# Allow manipulation of val within closure # Allow manipulation of val within closure
nonlocal val nonlocal val
@ -29,6 +35,20 @@ def message_id_generator(start_val):
return generator return generator
def make_pubsub_msg(
origin_id: ID,
topic_ids: Sequence[str],
data: bytes,
seqno: bytes) -> rpc_pb2.Message:
return rpc_pb2.Message(
from_id=origin_id.to_bytes(),
seqno=seqno,
data=data,
topicIDs=list(topic_ids),
)
def generate_RPC_packet(origin_id, topics, msg_content, msg_id): def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
""" """
Generate RPC packet to send over wire Generate RPC packet to send over wire
@ -42,7 +62,7 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
from_id=origin_id.encode('utf-8'), from_id=origin_id.encode('utf-8'),
seqno=msg_id, seqno=msg_id,
data=msg_content.encode('utf-8'), data=msg_content.encode('utf-8'),
) )
for topic in topics: for topic in topics:
message.topicIDs.extend([topic.encode('utf-8')]) message.topicIDs.extend([topic.encode('utf-8')])
@ -50,13 +70,6 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id):
packet.publish.extend([message]) packet.publish.extend([message])
return packet return packet
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
async def create_libp2p_hosts(num_hosts): async def create_libp2p_hosts(num_hosts):
""" """
@ -78,8 +91,17 @@ async def create_libp2p_hosts(num_hosts):
return hosts return hosts
def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, degree, degree_low, \
degree_high, time_to_live, gossip_window, gossip_history, heartbeat_interval): def create_pubsub_and_gossipsub_instances(
libp2p_hosts,
supported_protocols,
degree,
degree_low,
degree_high,
time_to_live,
gossip_window,
gossip_history,
heartbeat_interval):
pubsubs = [] pubsubs = []
gossipsubs = [] gossipsubs = []
for node in libp2p_hosts: for node in libp2p_hosts:
@ -93,6 +115,10 @@ def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, deg
return pubsubs, gossipsubs return pubsubs, gossipsubs
# FIXME: There is no difference between `sparse_connect` and `dense_connect`,
# before `connect_some` is fixed.
async def sparse_connect(hosts): async def sparse_connect(hosts):
await connect_some(hosts, 3) await connect_some(hosts, 3)
@ -101,6 +127,7 @@ async def dense_connect(hosts):
await connect_some(hosts, 10) await connect_some(hosts, 10)
# FIXME: `degree` is not used at all
async def connect_some(hosts, degree): async def connect_some(hosts, degree):
for i, host in enumerate(hosts): for i, host in enumerate(hosts):
for j, host2 in enumerate(hosts): for j, host2 in enumerate(hosts):
@ -123,6 +150,7 @@ async def connect_some(hosts, degree):
# j += 1 # j += 1
async def one_to_all_connect(hosts, central_host_index): async def one_to_all_connect(hosts, central_host_index):
for i, host in enumerate(hosts): for i, host in enumerate(hosts):
if i != central_host_index: if i != central_host_index:

View File

@ -3,6 +3,16 @@ import asyncio
import multiaddr import multiaddr
from libp2p import new_node from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
async def connect(node1, node2):
"""
Connect node1 to node2
"""
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
async def cleanup(): async def cleanup():