[WIP] PubSub and FloodSub development (#133)
* Add notifee interface * Add notify function to network interface * Implement notify feature * Add tests for notify * Make notifee functions all async * Fix linting issue * Fix linting issue * Scaffold pubsub router interface * Scaffold pubsub directory * Store peer_id in muxed connection * Implement pubsub notifee * Remove outdated files * Implement pubsub first attempt * Prepare pubsub for floodsub * Add mplex conn to net stream and add conn in notify tests * Implement floodsub * Use NetStream in generic protocol handler * Debugging async issues * Modify test to perform proper assert. Test passes * Remove callbacks. Reduce sleep time * Add simple three node test * Clean up code. Add message classes * Add test for two topics * Add conn to net stream and conn tests * Refactor test setup to remove duplicate code * Fix linting issues * Fix linting issue * Fix linting issue * Fix outstanding unrelated lint issue in multiselect_client * Add connect function * Remove debug prints * Remove debug prints from floodsub * Use MessageTalk in place of direct message breakdown * Remove extra prints * Remove outdated function * Add message to queues for all topics in message * Debugging * Add message self delivery * Increase read timeout to 5 to get pubsub tests passing * Refactor testing helper func. Add tests * Add tests and increase timeout to get tests passing * Add dummy account demo scaffolding * Attempt to use threads. Test fails * Implement basic dummy node tests using threads * Add generic testing function * Add simple seven node tree test * Add more complex seven node tree tests * Add five node ring tests * Remove unnecessary get_message_type func * Add documentation to classes * Add message id to messages * Add documentation to test helper func * Add docs to dummy account node helper func * Add more docs to dummy account node test helper func * fixed linting errors in floodsub * small notify bugfix * move pubsub into libp2p * fixed pubsub linting * fixing pubsub test failures * linting
This commit is contained in:
parent
fee905ace2
commit
9c6d441f9f
|
@ -70,7 +70,8 @@ class Swarm(INetwork):
|
||||||
raw_conn = await self.transport.dial(multiaddr, self.self_id)
|
raw_conn = await self.transport.dial(multiaddr, self.self_id)
|
||||||
|
|
||||||
# Use upgrader to upgrade raw conn to muxed conn
|
# Use upgrader to upgrade raw conn to muxed conn
|
||||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn, self.generic_protocol_handler)
|
muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
|
||||||
|
self.generic_protocol_handler, peer_id)
|
||||||
|
|
||||||
# Store muxed connection in connections
|
# Store muxed connection in connections
|
||||||
self.connections[peer_id] = muxed_conn
|
self.connections[peer_id] = muxed_conn
|
||||||
|
@ -145,7 +146,7 @@ class Swarm(INetwork):
|
||||||
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
|
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
|
||||||
multiaddr.value_for_protocol('tcp'), reader, writer, False)
|
multiaddr.value_for_protocol('tcp'), reader, writer, False)
|
||||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
|
muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
|
||||||
self.generic_protocol_handler)
|
self.generic_protocol_handler, peer_id)
|
||||||
|
|
||||||
# Store muxed_conn with peer id
|
# Store muxed_conn with peer id
|
||||||
self.connections[peer_id] = muxed_conn
|
self.connections[peer_id] = muxed_conn
|
||||||
|
@ -197,14 +198,17 @@ def create_generic_protocol_handler(swarm):
|
||||||
|
|
||||||
async def generic_protocol_handler(muxed_stream):
|
async def generic_protocol_handler(muxed_stream):
|
||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
_, handler = await multiselect.negotiate(muxed_stream)
|
protocol, handler = await multiselect.negotiate(muxed_stream)
|
||||||
|
|
||||||
|
net_stream = NetStream(muxed_stream)
|
||||||
|
net_stream.set_protocol(protocol)
|
||||||
|
|
||||||
# Call notifiers since event occurred
|
# Call notifiers since event occurred
|
||||||
for notifee in swarm.notifees:
|
for notifee in swarm.notifees:
|
||||||
await notifee.opened_stream(swarm, muxed_stream)
|
await notifee.opened_stream(swarm, net_stream)
|
||||||
|
|
||||||
# Give to stream handler
|
# Give to stream handler
|
||||||
asyncio.ensure_future(handler(muxed_stream))
|
asyncio.ensure_future(handler(net_stream))
|
||||||
|
|
||||||
return generic_protocol_handler
|
return generic_protocol_handler
|
||||||
|
|
||||||
|
|
0
libp2p/pubsub/__init__.py
Normal file
0
libp2p/pubsub/__init__.py
Normal file
98
libp2p/pubsub/floodsub.py
Normal file
98
libp2p/pubsub/floodsub.py
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
from .pubsub_router_interface import IPubsubRouter
|
||||||
|
from .message import create_message_talk
|
||||||
|
|
||||||
|
class FloodSub(IPubsubRouter):
|
||||||
|
|
||||||
|
def __init__(self, protocols):
|
||||||
|
self.protocols = protocols
|
||||||
|
self.pubsub = None
|
||||||
|
|
||||||
|
def get_protocols(self):
|
||||||
|
"""
|
||||||
|
:return: the list of protocols supported by the router
|
||||||
|
"""
|
||||||
|
return self.protocols
|
||||||
|
|
||||||
|
def attach(self, pubsub):
|
||||||
|
"""
|
||||||
|
Attach is invoked by the PubSub constructor to attach the router to a
|
||||||
|
freshly initialized PubSub instance.
|
||||||
|
:param pubsub: pubsub instance to attach to
|
||||||
|
"""
|
||||||
|
self.pubsub = pubsub
|
||||||
|
|
||||||
|
def add_peer(self, peer_id, protocol_id):
|
||||||
|
"""
|
||||||
|
Notifies the router that a new peer has been connected
|
||||||
|
:param peer_id: id of peer to add
|
||||||
|
"""
|
||||||
|
|
||||||
|
def remove_peer(self, peer_id):
|
||||||
|
"""
|
||||||
|
Notifies the router that a peer has been disconnected
|
||||||
|
:param peer_id: id of peer to remove
|
||||||
|
"""
|
||||||
|
|
||||||
|
def handle_rpc(self, rpc):
|
||||||
|
"""
|
||||||
|
Invoked to process control messages in the RPC envelope.
|
||||||
|
It is invoked after subscriptions and payload messages have been processed
|
||||||
|
:param rpc: rpc message
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def publish(self, sender_peer_id, message):
|
||||||
|
"""
|
||||||
|
Invoked to forward a new message that has been validated.
|
||||||
|
This is where the "flooding" part of floodsub happens
|
||||||
|
|
||||||
|
With flooding, routing is almost trivial: for each incoming message,
|
||||||
|
forward to all known peers in the topic. There is a bit of logic,
|
||||||
|
as the router maintains a timed cache of previous messages,
|
||||||
|
so that seen messages are not further forwarded.
|
||||||
|
It also never forwards a message back to the source
|
||||||
|
or the peer that forwarded the message.
|
||||||
|
:param sender_peer_id: peer_id of message sender
|
||||||
|
:param message: message to forward
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Encode message
|
||||||
|
encoded_msg = message.encode()
|
||||||
|
|
||||||
|
# Get message sender, origin, and topics
|
||||||
|
msg_talk = create_message_talk(message)
|
||||||
|
msg_sender = str(sender_peer_id)
|
||||||
|
msg_origin = msg_talk.origin_id
|
||||||
|
topics = msg_talk.topics
|
||||||
|
|
||||||
|
# Deliver to self if self was origin
|
||||||
|
# Note: handle_talk checks if self is subscribed to topics in message
|
||||||
|
if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()):
|
||||||
|
await self.pubsub.handle_talk(message)
|
||||||
|
|
||||||
|
# Deliver to self and peers
|
||||||
|
for topic in topics:
|
||||||
|
if topic in self.pubsub.peer_topics:
|
||||||
|
for peer_id_in_topic in self.pubsub.peer_topics[topic]:
|
||||||
|
# Forward to all known peers in the topic that are not the
|
||||||
|
# message sender and are not the message origin
|
||||||
|
if peer_id_in_topic not in (msg_sender, msg_origin):
|
||||||
|
stream = self.pubsub.peers[peer_id_in_topic]
|
||||||
|
await stream.write(encoded_msg)
|
||||||
|
else:
|
||||||
|
# Implies publish did not write
|
||||||
|
print("publish did not write")
|
||||||
|
|
||||||
|
def join(self, topic):
|
||||||
|
"""
|
||||||
|
Join notifies the router that we want to receive and
|
||||||
|
forward messages in a topic. It is invoked after the
|
||||||
|
subscription announcement
|
||||||
|
:param topic: topic to join
|
||||||
|
"""
|
||||||
|
|
||||||
|
def leave(self, topic):
|
||||||
|
"""
|
||||||
|
Leave notifies the router that we are no longer interested in a topic.
|
||||||
|
It is invoked after the unsubscription announcement.
|
||||||
|
:param topic: topic to leave
|
||||||
|
"""
|
118
libp2p/pubsub/message.py
Normal file
118
libp2p/pubsub/message.py
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
class MessageTalk():
|
||||||
|
|
||||||
|
"""
|
||||||
|
Object to make parsing talk messages easier, where talk messages are
|
||||||
|
defined as custom messages published to a set of topics
|
||||||
|
"""
|
||||||
|
# pylint: disable=too-few-public-methods
|
||||||
|
def __init__(self, from_id, origin_id, topics, data, message_id):
|
||||||
|
# pylint: disable=too-many-arguments
|
||||||
|
self.msg_type = "talk"
|
||||||
|
self.from_id = from_id
|
||||||
|
self.origin_id = origin_id
|
||||||
|
self.topics = topics
|
||||||
|
self.data = data
|
||||||
|
self.message_id = message_id
|
||||||
|
|
||||||
|
def to_str(self):
|
||||||
|
"""
|
||||||
|
Convert to string
|
||||||
|
:return: MessageTalk object in string representation
|
||||||
|
"""
|
||||||
|
out = self.msg_type + '\n'
|
||||||
|
out += self.from_id + '\n'
|
||||||
|
out += self.origin_id + '\n'
|
||||||
|
out += self.message_id + '\n'
|
||||||
|
for i in range(len(self.topics)):
|
||||||
|
out += self.topics[i]
|
||||||
|
if i < len(self.topics) - 1:
|
||||||
|
out += ','
|
||||||
|
out += '\n' + self.data
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
class MessageSub():
|
||||||
|
"""
|
||||||
|
Object to make parsing subscription messages easier, where subscription
|
||||||
|
messages are defined as indicating the topics a node wishes to subscribe to
|
||||||
|
or unsubscribe from
|
||||||
|
"""
|
||||||
|
# pylint: disable=too-few-public-methods
|
||||||
|
def __init__(self, from_id, origin_id, subs_map, message_id):
|
||||||
|
self.msg_type = "subscription"
|
||||||
|
self.from_id = from_id
|
||||||
|
self.origin_id = origin_id
|
||||||
|
self.subs_map = subs_map
|
||||||
|
self.message_id = message_id
|
||||||
|
|
||||||
|
def to_str(self):
|
||||||
|
"""
|
||||||
|
Convert to string
|
||||||
|
:return: MessageSub object in string representation
|
||||||
|
"""
|
||||||
|
out = self.msg_type + '\n'
|
||||||
|
out += self.from_id + '\n'
|
||||||
|
out += self.origin_id + '\n'
|
||||||
|
out += self.message_id
|
||||||
|
|
||||||
|
if self.subs_map:
|
||||||
|
out += '\n'
|
||||||
|
|
||||||
|
keys = list(self.subs_map)
|
||||||
|
|
||||||
|
for i, topic in enumerate(keys):
|
||||||
|
sub = self.subs_map[topic]
|
||||||
|
if sub:
|
||||||
|
out += "sub:"
|
||||||
|
else:
|
||||||
|
out += "unsub:"
|
||||||
|
out += topic
|
||||||
|
if i < len(keys) - 1:
|
||||||
|
out += '\n'
|
||||||
|
|
||||||
|
return out
|
||||||
|
|
||||||
|
def create_message_talk(msg_talk_as_str):
|
||||||
|
"""
|
||||||
|
Create a MessageTalk object from a MessageTalk string representation
|
||||||
|
:param msg_talk_as_str: a MessageTalk object in its string representation
|
||||||
|
:return: MessageTalk object
|
||||||
|
"""
|
||||||
|
msg_comps = msg_talk_as_str.split('\n')
|
||||||
|
from_id = msg_comps[1]
|
||||||
|
origin_id = msg_comps[2]
|
||||||
|
message_id = msg_comps[3]
|
||||||
|
topics = msg_comps[4].split(',')
|
||||||
|
data = msg_comps[5]
|
||||||
|
return MessageTalk(from_id, origin_id, topics, data, message_id)
|
||||||
|
|
||||||
|
def create_message_sub(msg_sub_as_str):
|
||||||
|
"""
|
||||||
|
Create a MessageSub object from a MessageSub string representation
|
||||||
|
:param msg_talk_as_str: a MessageSub object in its string representation
|
||||||
|
:return: MessageSub object
|
||||||
|
"""
|
||||||
|
msg_comps = msg_sub_as_str.split('\n')
|
||||||
|
from_id = msg_comps[1]
|
||||||
|
origin_id = msg_comps[2]
|
||||||
|
message_id = msg_comps[3]
|
||||||
|
|
||||||
|
subs_map = {}
|
||||||
|
for i in range(4, len(msg_comps)):
|
||||||
|
sub_comps = msg_comps[i].split(":")
|
||||||
|
topic = sub_comps[1]
|
||||||
|
if sub_comps[0] == "sub":
|
||||||
|
subs_map[topic] = True
|
||||||
|
else:
|
||||||
|
subs_map[topic] = False
|
||||||
|
return MessageSub(from_id, origin_id, subs_map, message_id)
|
||||||
|
|
||||||
|
def generate_message_id():
|
||||||
|
"""
|
||||||
|
Generate a unique message id
|
||||||
|
:return: messgae id
|
||||||
|
"""
|
||||||
|
return str(uuid.uuid1())
|
294
libp2p/pubsub/pubsub.py
Normal file
294
libp2p/pubsub/pubsub.py
Normal file
|
@ -0,0 +1,294 @@
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from .pubsub_notifee import PubsubNotifee
|
||||||
|
from .message import MessageSub
|
||||||
|
from .message import create_message_talk, create_message_sub
|
||||||
|
from. message import generate_message_id
|
||||||
|
|
||||||
|
|
||||||
|
class Pubsub():
|
||||||
|
"""
|
||||||
|
For now, because I'm on a plane and don't have access to the go repo/protobuf stuff,
|
||||||
|
this is going to be the message format for the two types: subscription and talk
|
||||||
|
subscription indicates subscribing or unsubscribing from a topic
|
||||||
|
talk is sending a message on topic(s)
|
||||||
|
subscription format:
|
||||||
|
subscription
|
||||||
|
'from'
|
||||||
|
<one of 'sub', 'unsub'>:'topicid'
|
||||||
|
<one of 'sub', 'unsub'>:'topicid'
|
||||||
|
...
|
||||||
|
Ex.
|
||||||
|
subscription
|
||||||
|
msg_sender_peer_id
|
||||||
|
origin_peer_id
|
||||||
|
sub:topic1
|
||||||
|
sub:topic2
|
||||||
|
unsub:fav_topic
|
||||||
|
talk format:
|
||||||
|
talk
|
||||||
|
'from'
|
||||||
|
'origin'
|
||||||
|
[topic_ids comma-delimited]
|
||||||
|
'data'
|
||||||
|
Ex.
|
||||||
|
talk
|
||||||
|
msg_sender_peer_id
|
||||||
|
origin_peer_id
|
||||||
|
topic1,topics_are_cool,foo
|
||||||
|
I like tacos
|
||||||
|
"""
|
||||||
|
# pylint: disable=too-many-instance-attributes
|
||||||
|
|
||||||
|
def __init__(self, host, router, my_id):
|
||||||
|
"""
|
||||||
|
Construct a new Pubsub object, which is responsible for handling all
|
||||||
|
Pubsub-related messages and relaying messages as appropriate to the
|
||||||
|
Pubsub router (which is responsible for choosing who to send messages to).
|
||||||
|
Since the logic for choosing peers to send pubsub messages to is
|
||||||
|
in the router, the same Pubsub impl can back floodsub, gossipsub, etc.
|
||||||
|
"""
|
||||||
|
self.host = host
|
||||||
|
self.router = router
|
||||||
|
self.my_id = my_id
|
||||||
|
|
||||||
|
# Attach this new Pubsub object to the router
|
||||||
|
self.router.attach(self)
|
||||||
|
|
||||||
|
# Register a notifee
|
||||||
|
self.peer_queue = asyncio.Queue()
|
||||||
|
self.host.get_network().notify(PubsubNotifee(self.peer_queue))
|
||||||
|
|
||||||
|
# Register stream handlers for each pubsub router protocol to handle
|
||||||
|
# the pubsub streams opened on those protocols
|
||||||
|
self.protocols = self.router.get_protocols()
|
||||||
|
for protocol in self.protocols:
|
||||||
|
self.host.set_stream_handler(protocol, self.stream_handler)
|
||||||
|
|
||||||
|
# TODO: determine if these need to be asyncio queues, or if could possibly
|
||||||
|
# be ordinary blocking queues
|
||||||
|
self.incoming_msgs_from_peers = asyncio.Queue()
|
||||||
|
self.outgoing_messages = asyncio.Queue()
|
||||||
|
|
||||||
|
# TODO: Make seen_messages a cache (LRU cache?)
|
||||||
|
self.seen_messages = []
|
||||||
|
|
||||||
|
# Map of topics we are subscribed to to handler functions
|
||||||
|
# for when the given topic receives a message
|
||||||
|
self.my_topics = {}
|
||||||
|
|
||||||
|
# Map of topic to peers to keep track of what peers are subscribed to
|
||||||
|
self.peer_topics = {}
|
||||||
|
|
||||||
|
# Create peers map, which maps peer_id (as string) to stream (to a given peer)
|
||||||
|
self.peers = {}
|
||||||
|
|
||||||
|
# Call handle peer to keep waiting for updates to peer queue
|
||||||
|
asyncio.ensure_future(self.handle_peer_queue())
|
||||||
|
|
||||||
|
def get_hello_packet(self):
|
||||||
|
"""
|
||||||
|
Generate subscription message with all topics we are subscribed to
|
||||||
|
"""
|
||||||
|
subs_map = {}
|
||||||
|
for topic in self.my_topics:
|
||||||
|
subs_map[topic] = True
|
||||||
|
sub_msg = MessageSub(
|
||||||
|
str(self.host.get_id()),\
|
||||||
|
str(self.host.get_id()), subs_map, generate_message_id()\
|
||||||
|
)
|
||||||
|
return sub_msg.to_str()
|
||||||
|
|
||||||
|
async def continously_read_stream(self, stream):
|
||||||
|
"""
|
||||||
|
Read from input stream in an infinite loop. Process
|
||||||
|
messages from other nodes, which for now are considered MessageTalk
|
||||||
|
and MessageSub messages.
|
||||||
|
TODO: Handle RPC messages instead of my Aspyn's own custom message format
|
||||||
|
:param stream: stream to continously read from
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
incoming = (await stream.read()).decode()
|
||||||
|
msg_comps = incoming.split('\n')
|
||||||
|
msg_type = msg_comps[0]
|
||||||
|
|
||||||
|
msg_sender = msg_comps[1]
|
||||||
|
# msg_origin = msg_comps[2]
|
||||||
|
msg_id = msg_comps[3]
|
||||||
|
print("HIT ME1")
|
||||||
|
if msg_id not in self.seen_messages:
|
||||||
|
print("HIT ME")
|
||||||
|
# Do stuff with incoming unseen message
|
||||||
|
should_publish = True
|
||||||
|
if msg_type == "subscription":
|
||||||
|
self.handle_subscription(incoming)
|
||||||
|
|
||||||
|
# We don't need to relay the subscription to our
|
||||||
|
# peers because a given node only needs its peers
|
||||||
|
# to know that it is subscribed to the topic (doesn't
|
||||||
|
# need everyone to know)
|
||||||
|
should_publish = False
|
||||||
|
elif msg_type == "talk":
|
||||||
|
await self.handle_talk(incoming)
|
||||||
|
|
||||||
|
# Add message id to seen
|
||||||
|
self.seen_messages.append(msg_id)
|
||||||
|
|
||||||
|
# Publish message using router's publish
|
||||||
|
if should_publish:
|
||||||
|
msg = create_message_talk(incoming)
|
||||||
|
|
||||||
|
# Adjust raw_msg to that the message sender
|
||||||
|
# is now our peer_id
|
||||||
|
msg.from_id = str(self.host.get_id())
|
||||||
|
|
||||||
|
await self.router.publish(msg_sender, msg.to_str())
|
||||||
|
|
||||||
|
# Force context switch
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
async def stream_handler(self, stream):
|
||||||
|
"""
|
||||||
|
Stream handler for pubsub. Gets invoked whenever a new stream is created
|
||||||
|
on one of the supported pubsub protocols.
|
||||||
|
:param stream: newly created stream
|
||||||
|
"""
|
||||||
|
# Add peer
|
||||||
|
# Map peer to stream
|
||||||
|
peer_id = stream.mplex_conn.peer_id
|
||||||
|
self.peers[str(peer_id)] = stream
|
||||||
|
self.router.add_peer(peer_id, stream.get_protocol())
|
||||||
|
|
||||||
|
# Send hello packet
|
||||||
|
hello = self.get_hello_packet()
|
||||||
|
await stream.write(hello.encode())
|
||||||
|
# Pass stream off to stream reader
|
||||||
|
asyncio.ensure_future(self.continously_read_stream(stream))
|
||||||
|
|
||||||
|
async def handle_peer_queue(self):
|
||||||
|
"""
|
||||||
|
Continuously read from peer queue and each time a new peer is found,
|
||||||
|
open a stream to the peer using a supported pubsub protocol
|
||||||
|
TODO: Handle failure for when the peer does not support any of the
|
||||||
|
pubsub protocols we support
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
peer_id = await self.peer_queue.get()
|
||||||
|
|
||||||
|
# Open a stream to peer on existing connection
|
||||||
|
# (we know connection exists since that's the only way
|
||||||
|
# an element gets added to peer_queue)
|
||||||
|
stream = await self.host.new_stream(peer_id, self.protocols)
|
||||||
|
|
||||||
|
# Add Peer
|
||||||
|
# Map peer to stream
|
||||||
|
self.peers[str(peer_id)] = stream
|
||||||
|
self.router.add_peer(peer_id, stream.get_protocol())
|
||||||
|
|
||||||
|
# Send hello packet
|
||||||
|
hello = self.get_hello_packet()
|
||||||
|
await stream.write(hello.encode())
|
||||||
|
|
||||||
|
# Pass stream off to stream reader
|
||||||
|
asyncio.ensure_future(self.continously_read_stream(stream))
|
||||||
|
|
||||||
|
# Force context switch
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
def handle_subscription(self, subscription):
|
||||||
|
"""
|
||||||
|
Handle an incoming subscription message from a peer. Update internal
|
||||||
|
mapping to mark the peer as subscribed or unsubscribed to topics as
|
||||||
|
defined in the subscription message
|
||||||
|
:param subscription: raw data constituting a subscription message
|
||||||
|
"""
|
||||||
|
sub_msg = create_message_sub(subscription)
|
||||||
|
if sub_msg.subs_map:
|
||||||
|
print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id)
|
||||||
|
for topic_id in sub_msg.subs_map:
|
||||||
|
# Look at each subscription in the msg individually
|
||||||
|
if sub_msg.subs_map[topic_id]:
|
||||||
|
if topic_id not in self.peer_topics:
|
||||||
|
# Create topic list if it did not yet exist
|
||||||
|
self.peer_topics[topic_id] = [sub_msg.origin_id]
|
||||||
|
elif sub_msg.origin_id not in self.peer_topics[topic_id]:
|
||||||
|
# Add peer to topic
|
||||||
|
self.peer_topics[topic_id].append(sub_msg.origin_id)
|
||||||
|
else:
|
||||||
|
# TODO: Remove peer from topic
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def handle_talk(self, talk):
|
||||||
|
"""
|
||||||
|
Handle incoming Talk message from a peer. A Talk message contains some
|
||||||
|
custom message that is published on a given topic(s)
|
||||||
|
:param talk: raw data constituting a talk message
|
||||||
|
"""
|
||||||
|
msg = create_message_talk(talk)
|
||||||
|
|
||||||
|
# Check if this message has any topics that we are subscribed to
|
||||||
|
for topic in msg.topics:
|
||||||
|
if topic in self.my_topics:
|
||||||
|
# we are subscribed to a topic this message was sent for,
|
||||||
|
# so add message to the subscription output queue
|
||||||
|
# for each topic
|
||||||
|
await self.my_topics[topic].put(talk)
|
||||||
|
|
||||||
|
async def subscribe(self, topic_id):
|
||||||
|
"""
|
||||||
|
Subscribe ourself to a topic
|
||||||
|
:param topic_id: topic_id to subscribe to
|
||||||
|
"""
|
||||||
|
# Map topic_id to blocking queue
|
||||||
|
self.my_topics[topic_id] = asyncio.Queue()
|
||||||
|
|
||||||
|
# Create subscribe message
|
||||||
|
sub_msg = MessageSub(
|
||||||
|
str(self.host.get_id()),\
|
||||||
|
str(self.host.get_id()), {topic_id: True}, generate_message_id()\
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send out subscribe message to all peers
|
||||||
|
await self.message_all_peers(sub_msg.to_str())
|
||||||
|
|
||||||
|
# Tell router we are joining this topic
|
||||||
|
self.router.join(topic_id)
|
||||||
|
|
||||||
|
# Return the asyncio queue for messages on this topic
|
||||||
|
return self.my_topics[topic_id]
|
||||||
|
|
||||||
|
async def unsubscribe(self, topic_id):
|
||||||
|
"""
|
||||||
|
Unsubscribe ourself from a topic
|
||||||
|
:param topic_id: topic_id to unsubscribe from
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Remove topic_id from map if present
|
||||||
|
if topic_id in self.my_topics:
|
||||||
|
del self.my_topics[topic_id]
|
||||||
|
|
||||||
|
# Create unsubscribe message
|
||||||
|
unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\
|
||||||
|
{topic_id: False}, generate_message_id())
|
||||||
|
|
||||||
|
# Send out unsubscribe message to all peers
|
||||||
|
await self.message_all_peers(unsub_msg.to_str())
|
||||||
|
|
||||||
|
# Tell router we are leaving this topic
|
||||||
|
self.router.leave(topic_id)
|
||||||
|
|
||||||
|
async def message_all_peers(self, raw_msg):
|
||||||
|
"""
|
||||||
|
Broadcast a message to peers
|
||||||
|
:param raw_msg: raw contents of the message to broadcast
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Encode message for sending
|
||||||
|
encoded_msg = raw_msg.encode()
|
||||||
|
|
||||||
|
# Broadcast message
|
||||||
|
for peer in self.peers:
|
||||||
|
stream = self.peers[peer]
|
||||||
|
|
||||||
|
# Write message to stream
|
||||||
|
await stream.write(encoded_msg)
|
40
libp2p/pubsub/pubsub_notifee.py
Normal file
40
libp2p/pubsub/pubsub_notifee.py
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
from libp2p.network.notifee_interface import INotifee
|
||||||
|
|
||||||
|
|
||||||
|
class PubsubNotifee(INotifee):
|
||||||
|
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
|
||||||
|
|
||||||
|
def __init__(self, initiator_peers_queue):
|
||||||
|
"""
|
||||||
|
:param initiator_peers_queue: queue to add new peers to so that pubsub
|
||||||
|
can process new peers after we connect to them
|
||||||
|
"""
|
||||||
|
self.initiator_peers_queue = initiator_peers_queue
|
||||||
|
|
||||||
|
async def opened_stream(self, network, stream):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def closed_stream(self, network, stream):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def connected(self, network, conn):
|
||||||
|
"""
|
||||||
|
Add peer_id to initiator_peers_queue, so that this peer_id can be used to
|
||||||
|
create a stream and we only want to have one pubsub stream with each peer.
|
||||||
|
:param network: network the connection was opened on
|
||||||
|
:param conn: connection that was opened
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Only add peer_id if we are initiator (otherwise we would end up
|
||||||
|
# with two pubsub streams between us and the peer)
|
||||||
|
if conn.initiator:
|
||||||
|
await self.initiator_peers_queue.put(conn.peer_id)
|
||||||
|
|
||||||
|
async def disconnected(self, network, conn):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def listen(self, network, multiaddr):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def listen_close(self, network, multiaddr):
|
||||||
|
pass
|
64
libp2p/pubsub/pubsub_router_interface.py
Normal file
64
libp2p/pubsub/pubsub_router_interface.py
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
class IPubsubRouter(ABC):
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_protocols(self):
|
||||||
|
"""
|
||||||
|
:return: the list of protocols supported by the router
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def attach(self, pubsub):
|
||||||
|
"""
|
||||||
|
Attach is invoked by the PubSub constructor to attach the router to a
|
||||||
|
freshly initialized PubSub instance.
|
||||||
|
:param pubsub: pubsub instance to attach to
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def add_peer(self, peer_id, protocol_id):
|
||||||
|
"""
|
||||||
|
Notifies the router that a new peer has been connected
|
||||||
|
:param peer_id: id of peer to add
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def remove_peer(self, peer_id):
|
||||||
|
"""
|
||||||
|
Notifies the router that a peer has been disconnected
|
||||||
|
:param peer_id: id of peer to remove
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def handle_rpc(self, rpc):
|
||||||
|
"""
|
||||||
|
Invoked to process control messages in the RPC envelope.
|
||||||
|
It is invoked after subscriptions and payload messages have been processed
|
||||||
|
:param rpc: rpc message
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def publish(self, sender_peer_id, message):
|
||||||
|
"""
|
||||||
|
Invoked to forward a new message that has been validated
|
||||||
|
:param sender_peer_id: peer_id of message sender
|
||||||
|
:param message: message to forward
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def join(self, topic):
|
||||||
|
"""
|
||||||
|
Join notifies the router that we want to receive and
|
||||||
|
forward messages in a topic. It is invoked after the
|
||||||
|
subscription announcement
|
||||||
|
:param topic: topic to join
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def leave(self, topic):
|
||||||
|
"""
|
||||||
|
Leave notifies the router that we are no longer interested in a topic.
|
||||||
|
It is invoked after the unsubscription announcement.
|
||||||
|
:param topic: topic to leave
|
||||||
|
"""
|
|
@ -11,14 +11,15 @@ class Mplex(IMuxedConn):
|
||||||
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
|
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, conn, generic_protocol_handler):
|
def __init__(self, conn, generic_protocol_handler, peer_id):
|
||||||
"""
|
"""
|
||||||
create a new muxed connection
|
create a new muxed connection
|
||||||
:param conn: an instance of raw connection
|
:param conn: an instance of raw connection
|
||||||
:param generic_protocol_handler: generic protocol handler
|
:param generic_protocol_handler: generic protocol handler
|
||||||
for new muxed streams
|
for new muxed streams
|
||||||
|
:param peer_id: peer_id of peer the connection is to
|
||||||
"""
|
"""
|
||||||
super(Mplex, self).__init__(conn, generic_protocol_handler)
|
super(Mplex, self).__init__(conn, generic_protocol_handler, peer_id)
|
||||||
|
|
||||||
self.raw_conn = conn
|
self.raw_conn = conn
|
||||||
self.initiator = conn.initiator
|
self.initiator = conn.initiator
|
||||||
|
@ -26,6 +27,9 @@ class Mplex(IMuxedConn):
|
||||||
# Store generic protocol handler
|
# Store generic protocol handler
|
||||||
self.generic_protocol_handler = generic_protocol_handler
|
self.generic_protocol_handler = generic_protocol_handler
|
||||||
|
|
||||||
|
# Set peer_id
|
||||||
|
self.peer_id = peer_id
|
||||||
|
|
||||||
# Mapping from stream ID -> buffer of messages for that stream
|
# Mapping from stream ID -> buffer of messages for that stream
|
||||||
self.buffers = {}
|
self.buffers = {}
|
||||||
|
|
||||||
|
@ -56,7 +60,7 @@ class Mplex(IMuxedConn):
|
||||||
# TODO: pass down timeout from user and use that
|
# TODO: pass down timeout from user and use that
|
||||||
if stream_id in self.buffers:
|
if stream_id in self.buffers:
|
||||||
try:
|
try:
|
||||||
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=3)
|
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=8)
|
||||||
return data
|
return data
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -7,12 +7,13 @@ class IMuxedConn(ABC):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def __init__(self, conn, generic_protocol_handler):
|
def __init__(self, conn, generic_protocol_handler, peer_id):
|
||||||
"""
|
"""
|
||||||
create a new muxed connection
|
create a new muxed connection
|
||||||
:param conn: an instance of raw connection
|
:param conn: an instance of raw connection
|
||||||
:param generic_protocol_handler: generic protocol handler
|
:param generic_protocol_handler: generic protocol handler
|
||||||
for new muxed streams
|
for new muxed streams
|
||||||
|
:param peer_id: peer_id of peer the connection is to
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
|
|
|
@ -17,11 +17,11 @@ class TransportUpgrader:
|
||||||
def upgrade_security(self):
|
def upgrade_security(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def upgrade_connection(self, conn, generic_protocol_handler):
|
def upgrade_connection(self, conn, generic_protocol_handler, peer_id):
|
||||||
"""
|
"""
|
||||||
upgrade raw connection to muxed connection
|
upgrade raw connection to muxed connection
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# For PoC, no security, default to mplex
|
# For PoC, no security, default to mplex
|
||||||
# TODO do exchange to determine multiplexer
|
# TODO do exchange to determine multiplexer
|
||||||
return Mplex(conn, generic_protocol_handler)
|
return Mplex(conn, generic_protocol_handler, peer_id)
|
||||||
|
|
|
@ -49,6 +49,7 @@ class MyNotifee(INotifee):
|
||||||
async def listen_close(self, network, _multiaddr):
|
async def listen_close(self, network, _multiaddr):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class InvalidNotifee():
|
class InvalidNotifee():
|
||||||
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
|
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
|
||||||
|
|
||||||
|
@ -70,6 +71,36 @@ class InvalidNotifee():
|
||||||
async def listen(self):
|
async def listen(self):
|
||||||
assert False
|
assert False
|
||||||
|
|
||||||
|
|
||||||
|
async def perform_two_host_simple_set_up():
|
||||||
|
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
|
||||||
|
async def my_stream_handler(stream):
|
||||||
|
while True:
|
||||||
|
read_string = (await stream.read()).decode()
|
||||||
|
|
||||||
|
resp = "ack:" + read_string
|
||||||
|
await stream.write(resp.encode())
|
||||||
|
|
||||||
|
node_b.set_stream_handler("/echo/1.0.0", my_stream_handler)
|
||||||
|
|
||||||
|
# Associate the peer with local ip address (see default parameters of Libp2p())
|
||||||
|
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
|
||||||
|
return node_a, node_b
|
||||||
|
|
||||||
|
|
||||||
|
async def perform_two_host_simple_set_up_custom_handler(handler):
|
||||||
|
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
|
||||||
|
node_b.set_stream_handler("/echo/1.0.0", handler)
|
||||||
|
|
||||||
|
# Associate the peer with local ip address (see default parameters of Libp2p())
|
||||||
|
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
|
||||||
|
return node_a, node_b
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_one_notifier():
|
async def test_one_notifier():
|
||||||
node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler)
|
node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler)
|
||||||
|
|
134
tests/pubsub/dummy_account_node.py
Normal file
134
tests/pubsub/dummy_account_node.py
Normal file
|
@ -0,0 +1,134 @@
|
||||||
|
import asyncio
|
||||||
|
import multiaddr
|
||||||
|
|
||||||
|
from libp2p import new_node
|
||||||
|
from libp2p.pubsub.message import create_message_talk
|
||||||
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
|
from libp2p.pubsub.floodsub import FloodSub
|
||||||
|
from libp2p.pubsub.message import MessageTalk
|
||||||
|
from libp2p.pubsub.message import generate_message_id
|
||||||
|
|
||||||
|
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||||
|
CRYPTO_TOPIC = "ethereum"
|
||||||
|
|
||||||
|
# Message format:
|
||||||
|
# Sending crypto: <source>,<dest>,<amount as integer>
|
||||||
|
# Ex. send,aspyn,alex,5
|
||||||
|
# Set crypto: <dest>,<amount as integer>
|
||||||
|
# Ex. set,rob,5
|
||||||
|
# Determine message type by looking at first item before first comma
|
||||||
|
|
||||||
|
class DummyAccountNode():
|
||||||
|
"""
|
||||||
|
Node which has an internal balance mapping, meant to serve as
|
||||||
|
a dummy crypto blockchain. There is no actual blockchain, just a simple
|
||||||
|
map indicating how much crypto each user in the mappings holds
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.balances = {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def create(cls):
|
||||||
|
"""
|
||||||
|
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
|
||||||
|
instance to this new node
|
||||||
|
|
||||||
|
We use create as this serves as a factory function and allows us
|
||||||
|
to use async await, unlike the init function
|
||||||
|
"""
|
||||||
|
self = DummyAccountNode()
|
||||||
|
|
||||||
|
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
|
||||||
|
self.libp2p_node = libp2p_node
|
||||||
|
|
||||||
|
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
|
||||||
|
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def handle_incoming_msgs(self):
|
||||||
|
"""
|
||||||
|
Handle all incoming messages on the CRYPTO_TOPIC from peers
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
message_raw = await self.q.get()
|
||||||
|
message = create_message_talk(message_raw)
|
||||||
|
contents = message.data
|
||||||
|
|
||||||
|
msg_comps = contents.split(",")
|
||||||
|
|
||||||
|
if msg_comps[0] == "send":
|
||||||
|
self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3]))
|
||||||
|
elif msg_comps[0] == "set":
|
||||||
|
self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2]))
|
||||||
|
|
||||||
|
async def setup_crypto_networking(self):
|
||||||
|
"""
|
||||||
|
Subscribe to CRYPTO_TOPIC and perform call to function that handles
|
||||||
|
all incoming messages on said topic
|
||||||
|
"""
|
||||||
|
self.q = await self.pubsub.subscribe(CRYPTO_TOPIC)
|
||||||
|
|
||||||
|
asyncio.ensure_future(self.handle_incoming_msgs())
|
||||||
|
|
||||||
|
async def publish_send_crypto(self, source_user, dest_user, amount):
|
||||||
|
"""
|
||||||
|
Create a send crypto message and publish that message to all other nodes
|
||||||
|
:param source_user: user to send crypto from
|
||||||
|
:param dest_user: user to send crypto to
|
||||||
|
:param amount: amount of crypto to send
|
||||||
|
"""
|
||||||
|
my_id = str(self.libp2p_node.get_id())
|
||||||
|
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
|
||||||
|
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
|
||||||
|
await self.floodsub.publish(my_id, msg.to_str())
|
||||||
|
|
||||||
|
async def publish_set_crypto(self, user, amount):
|
||||||
|
"""
|
||||||
|
Create a set crypto message and publish that message to all other nodes
|
||||||
|
:param user: user to set crypto for
|
||||||
|
:param amount: amount of crypto
|
||||||
|
"""
|
||||||
|
my_id = str(self.libp2p_node.get_id())
|
||||||
|
msg_contents = "set," + user + "," + str(amount)
|
||||||
|
msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
|
||||||
|
await self.floodsub.publish(my_id, msg.to_str())
|
||||||
|
|
||||||
|
def handle_send_crypto(self, source_user, dest_user, amount):
|
||||||
|
"""
|
||||||
|
Handle incoming send_crypto message
|
||||||
|
:param source_user: user to send crypto from
|
||||||
|
:param dest_user: user to send crypto to
|
||||||
|
:param amount: amount of crypto to send
|
||||||
|
"""
|
||||||
|
if source_user in self.balances:
|
||||||
|
self.balances[source_user] -= amount
|
||||||
|
else:
|
||||||
|
self.balances[source_user] = -amount
|
||||||
|
|
||||||
|
if dest_user in self.balances:
|
||||||
|
self.balances[dest_user] += amount
|
||||||
|
else:
|
||||||
|
self.balances[dest_user] = amount
|
||||||
|
|
||||||
|
def handle_set_crypto_for_user(self, dest_user, amount):
|
||||||
|
"""
|
||||||
|
Handle incoming set_crypto message
|
||||||
|
:param dest_user: user to set crypto for
|
||||||
|
:param amount: amount of crypto
|
||||||
|
"""
|
||||||
|
self.balances[dest_user] = amount
|
||||||
|
|
||||||
|
def get_balance(self, user):
|
||||||
|
"""
|
||||||
|
Get balance in crypto for a particular user
|
||||||
|
:param user: user to get balance for
|
||||||
|
:return: balance of user
|
||||||
|
"""
|
||||||
|
if user in self.balances:
|
||||||
|
return self.balances[user]
|
||||||
|
else:
|
||||||
|
return -1
|
||||||
|
|
189
tests/pubsub/test_dummyaccount_demo.py
Normal file
189
tests/pubsub/test_dummyaccount_demo.py
Normal file
|
@ -0,0 +1,189 @@
|
||||||
|
import asyncio
|
||||||
|
import multiaddr
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from threading import Thread
|
||||||
|
from tests.utils import cleanup
|
||||||
|
from libp2p import new_node
|
||||||
|
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||||
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
|
from libp2p.pubsub.floodsub import FloodSub
|
||||||
|
from libp2p.pubsub.message import MessageTalk
|
||||||
|
from libp2p.pubsub.message import create_message_talk
|
||||||
|
from dummy_account_node import DummyAccountNode
|
||||||
|
|
||||||
|
# pylint: disable=too-many-locals
|
||||||
|
|
||||||
|
async def connect(node1, node2):
|
||||||
|
# node1 connects to node2
|
||||||
|
addr = node2.get_addrs()[0]
|
||||||
|
info = info_from_p2p_addr(addr)
|
||||||
|
await node1.connect(info)
|
||||||
|
|
||||||
|
def create_setup_in_new_thread_func(dummy_node):
|
||||||
|
def setup_in_new_thread():
|
||||||
|
asyncio.ensure_future(dummy_node.setup_crypto_networking())
|
||||||
|
return setup_in_new_thread
|
||||||
|
|
||||||
|
async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
|
||||||
|
"""
|
||||||
|
Helper function to allow for easy construction of custom tests for dummy account nodes
|
||||||
|
in various network topologies
|
||||||
|
:param num_nodes: number of nodes in the test
|
||||||
|
:param adjacency_map: adjacency map defining each node and its list of neighbors
|
||||||
|
:param action_func: function to execute that includes actions by the nodes,
|
||||||
|
such as send crypto and set crypto
|
||||||
|
:param assertion_func: assertions for testing the results of the actions are correct
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create nodes
|
||||||
|
dummy_nodes = []
|
||||||
|
for i in range(num_nodes):
|
||||||
|
dummy_nodes.append(await DummyAccountNode.create())
|
||||||
|
|
||||||
|
# Create network
|
||||||
|
for source_num in adjacency_map:
|
||||||
|
target_nums = adjacency_map[source_num]
|
||||||
|
for target_num in target_nums:
|
||||||
|
await connect(dummy_nodes[source_num].libp2p_node, \
|
||||||
|
dummy_nodes[target_num].libp2p_node)
|
||||||
|
|
||||||
|
# Allow time for network creation to take place
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
# Start a thread for each node so that each node can listen and respond
|
||||||
|
# to messages on its own thread, which will avoid waiting indefinitely
|
||||||
|
# on the main thread. On this thread, call the setup func for the node,
|
||||||
|
# which subscribes the node to the CRYPTO_TOPIC topic
|
||||||
|
for dummy_node in dummy_nodes:
|
||||||
|
thread = Thread(target=create_setup_in_new_thread_func(dummy_node))
|
||||||
|
thread.run()
|
||||||
|
|
||||||
|
# Allow time for nodes to subscribe to CRYPTO_TOPIC topic
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
# Perform action function
|
||||||
|
await action_func(dummy_nodes)
|
||||||
|
|
||||||
|
# Allow time for action function to be performed (i.e. messages to propogate)
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
# Perform assertion function
|
||||||
|
for dummy_node in dummy_nodes:
|
||||||
|
assertion_func(dummy_node)
|
||||||
|
|
||||||
|
# Success, terminate pending tasks.
|
||||||
|
await cleanup()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes():
|
||||||
|
num_nodes = 2
|
||||||
|
adj_map = {0: [1]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("aspyn", 10)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 10
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_three_nodes_line_topography():
|
||||||
|
num_nodes = 3
|
||||||
|
adj_map = {0: [1], 1: [2]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("aspyn", 10)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 10
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_three_nodes_triangle_topography():
|
||||||
|
num_nodes = 3
|
||||||
|
adj_map = {0: [1, 2], 1: [2]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 20
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_seven_nodes_tree_topography():
|
||||||
|
num_nodes = 7
|
||||||
|
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 20
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_set_then_send_from_root_seven_nodes_tree_topography():
|
||||||
|
num_nodes = 7
|
||||||
|
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 15
|
||||||
|
assert dummy_node.get_balance("alex") == 5
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography():
|
||||||
|
num_nodes = 7
|
||||||
|
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[6].publish_set_crypto("aspyn", 20)
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 15
|
||||||
|
assert dummy_node.get_balance("alex") == 5
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_five_nodes_ring_topography():
|
||||||
|
num_nodes = 5
|
||||||
|
adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("aspyn", 20)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("aspyn") == 20
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography():
|
||||||
|
num_nodes = 5
|
||||||
|
adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_set_crypto("alex", 20)
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
await dummy_nodes[3].publish_send_crypto("alex", "rob", 12)
|
||||||
|
|
||||||
|
def assertion_func(dummy_node):
|
||||||
|
assert dummy_node.get_balance("alex") == 8
|
||||||
|
assert dummy_node.get_balance("rob") == 12
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
486
tests/pubsub/test_floodsub.py
Normal file
486
tests/pubsub/test_floodsub.py
Normal file
|
@ -0,0 +1,486 @@
|
||||||
|
import asyncio
|
||||||
|
import multiaddr
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tests.utils import cleanup
|
||||||
|
from libp2p import new_node
|
||||||
|
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||||
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
|
from libp2p.pubsub.floodsub import FloodSub
|
||||||
|
from libp2p.pubsub.message import MessageTalk
|
||||||
|
from libp2p.pubsub.message import create_message_talk
|
||||||
|
from libp2p.pubsub.message import generate_message_id
|
||||||
|
|
||||||
|
# pylint: disable=too-many-locals
|
||||||
|
|
||||||
|
async def connect(node1, node2):
|
||||||
|
"""
|
||||||
|
Connect node1 to node2
|
||||||
|
"""
|
||||||
|
addr = node2.get_addrs()[0]
|
||||||
|
info = info_from_p2p_addr(addr)
|
||||||
|
await node1.connect(info)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes():
|
||||||
|
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
|
||||||
|
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
|
||||||
|
supported_protocols = ["/floodsub/1.0.0"]
|
||||||
|
|
||||||
|
floodsub_a = FloodSub(supported_protocols)
|
||||||
|
pubsub_a = Pubsub(node_a, floodsub_a, "a")
|
||||||
|
floodsub_b = FloodSub(supported_protocols)
|
||||||
|
pubsub_b = Pubsub(node_b, floodsub_b, "b")
|
||||||
|
|
||||||
|
await connect(node_a, node_b)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
qb = await pubsub_b.subscribe("my_topic")
|
||||||
|
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
node_a_id = str(node_a.get_id())
|
||||||
|
|
||||||
|
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
|
||||||
|
|
||||||
|
await floodsub_a.publish(node_a.get_id(), msg.to_str())
|
||||||
|
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
res_b = await qb.get()
|
||||||
|
|
||||||
|
# Check that the msg received by node_b is the same
|
||||||
|
# as the message sent by node_a
|
||||||
|
assert res_b == msg.to_str()
|
||||||
|
|
||||||
|
# Success, terminate pending tasks.
|
||||||
|
await cleanup()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_three_nodes():
|
||||||
|
# Want to pass message from A -> B -> C
|
||||||
|
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
|
||||||
|
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
|
||||||
|
supported_protocols = ["/floodsub/1.0.0"]
|
||||||
|
|
||||||
|
floodsub_a = FloodSub(supported_protocols)
|
||||||
|
pubsub_a = Pubsub(node_a, floodsub_a, "a")
|
||||||
|
floodsub_b = FloodSub(supported_protocols)
|
||||||
|
pubsub_b = Pubsub(node_b, floodsub_b, "b")
|
||||||
|
floodsub_c = FloodSub(supported_protocols)
|
||||||
|
pubsub_c = Pubsub(node_c, floodsub_c, "c")
|
||||||
|
|
||||||
|
await connect(node_a, node_b)
|
||||||
|
await connect(node_b, node_c)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
qb = await pubsub_b.subscribe("my_topic")
|
||||||
|
qc = await pubsub_c.subscribe("my_topic")
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
node_a_id = str(node_a.get_id())
|
||||||
|
|
||||||
|
msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id())
|
||||||
|
|
||||||
|
await floodsub_a.publish(node_a.get_id(), msg.to_str())
|
||||||
|
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
res_b = await qb.get()
|
||||||
|
res_c = await qc.get()
|
||||||
|
|
||||||
|
# Check that the msg received by node_b is the same
|
||||||
|
# as the message sent by node_a
|
||||||
|
assert res_b == msg.to_str()
|
||||||
|
|
||||||
|
# res_c should match original msg but with b as sender
|
||||||
|
node_b_id = str(node_b.get_id())
|
||||||
|
msg.from_id = node_b_id
|
||||||
|
|
||||||
|
assert res_c == msg.to_str()
|
||||||
|
|
||||||
|
# Success, terminate pending tasks.
|
||||||
|
await cleanup()
|
||||||
|
|
||||||
|
async def perform_test_from_obj(obj):
|
||||||
|
"""
|
||||||
|
Perform a floodsub test from a test obj.
|
||||||
|
test obj are composed as follows:
|
||||||
|
|
||||||
|
{
|
||||||
|
"supported_protocols": ["supported/protocol/1.0.0",...],
|
||||||
|
"adj_list": {
|
||||||
|
"node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
|
||||||
|
"node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
|
||||||
|
...
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["topic1_for_message", "topic2_for_message", ...],
|
||||||
|
"data": "some contents of the message (newlines are not supported)",
|
||||||
|
"node_id": "message sender node id"
|
||||||
|
},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
}
|
||||||
|
NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A
|
||||||
|
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
|
||||||
|
is undefined (even if it may work)
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Step 1) Create graph
|
||||||
|
adj_list = obj["adj_list"]
|
||||||
|
node_map = {}
|
||||||
|
floodsub_map = {}
|
||||||
|
pubsub_map = {}
|
||||||
|
|
||||||
|
supported_protocols = obj["supported_protocols"]
|
||||||
|
|
||||||
|
tasks_connect = []
|
||||||
|
for start_node_id in adj_list:
|
||||||
|
# Create node if node does not yet exist
|
||||||
|
if start_node_id not in node_map:
|
||||||
|
node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
|
||||||
|
node_map[start_node_id] = node
|
||||||
|
|
||||||
|
floodsub = FloodSub(supported_protocols)
|
||||||
|
floodsub_map[start_node_id] = floodsub
|
||||||
|
pubsub = Pubsub(node, floodsub, start_node_id)
|
||||||
|
pubsub_map[start_node_id] = pubsub
|
||||||
|
|
||||||
|
# For each neighbor of start_node, create if does not yet exist,
|
||||||
|
# then connect start_node to neighbor
|
||||||
|
for neighbor_id in adj_list[start_node_id]:
|
||||||
|
# Create neighbor if neighbor does not yet exist
|
||||||
|
if neighbor_id not in node_map:
|
||||||
|
neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
|
||||||
|
node_map[neighbor_id] = neighbor_node
|
||||||
|
|
||||||
|
floodsub = FloodSub(supported_protocols)
|
||||||
|
floodsub_map[neighbor_id] = floodsub
|
||||||
|
pubsub = Pubsub(neighbor_node, floodsub, neighbor_id)
|
||||||
|
pubsub_map[neighbor_id] = pubsub
|
||||||
|
|
||||||
|
# Connect node and neighbor
|
||||||
|
# await connect(node_map[start_node_id], node_map[neighbor_id])
|
||||||
|
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
|
||||||
|
tasks_connect.append(asyncio.sleep(2))
|
||||||
|
await asyncio.gather(*tasks_connect)
|
||||||
|
|
||||||
|
# Allow time for graph creation before continuing
|
||||||
|
# await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
# Step 2) Subscribe to topics
|
||||||
|
queues_map = {}
|
||||||
|
topic_map = obj["topic_map"]
|
||||||
|
|
||||||
|
tasks_topic = []
|
||||||
|
tasks_topic_data = []
|
||||||
|
for topic in topic_map:
|
||||||
|
for node_id in topic_map[topic]:
|
||||||
|
"""
|
||||||
|
# Subscribe node to topic
|
||||||
|
q = await pubsub_map[node_id].subscribe(topic)
|
||||||
|
|
||||||
|
# Create topic-queue map for node_id if one does not yet exist
|
||||||
|
if node_id not in queues_map:
|
||||||
|
queues_map[node_id] = {}
|
||||||
|
|
||||||
|
# Store queue in topic-queue map for node
|
||||||
|
queues_map[node_id][topic] = q
|
||||||
|
"""
|
||||||
|
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
|
||||||
|
tasks_topic_data.append((node_id, topic))
|
||||||
|
tasks_topic.append(asyncio.sleep(2))
|
||||||
|
|
||||||
|
# Gather is like Promise.all
|
||||||
|
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
|
||||||
|
for i in range(len(responses) - 1):
|
||||||
|
q = responses[i]
|
||||||
|
node_id, topic = tasks_topic_data[i]
|
||||||
|
if node_id not in queues_map:
|
||||||
|
queues_map[node_id] = {}
|
||||||
|
|
||||||
|
# Store queue in topic-queue map for node
|
||||||
|
queues_map[node_id][topic] = q
|
||||||
|
|
||||||
|
# Allow time for subscribing before continuing
|
||||||
|
# await asyncio.sleep(0.01)
|
||||||
|
|
||||||
|
# Step 3) Publish messages
|
||||||
|
topics_in_msgs_ordered = []
|
||||||
|
messages = obj["messages"]
|
||||||
|
tasks_publish = []
|
||||||
|
for msg in messages:
|
||||||
|
topics = msg["topics"]
|
||||||
|
|
||||||
|
data = msg["data"]
|
||||||
|
node_id = msg["node_id"]
|
||||||
|
|
||||||
|
# Get actual id for sender node (not the id from the test obj)
|
||||||
|
actual_node_id = str(node_map[node_id].get_id())
|
||||||
|
|
||||||
|
# Create correctly formatted message
|
||||||
|
msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id())
|
||||||
|
|
||||||
|
# Publish message
|
||||||
|
# await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())
|
||||||
|
tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str())))
|
||||||
|
|
||||||
|
# For each topic in topics, add topic, msg_talk tuple to ordered test list
|
||||||
|
# TODO: Update message sender to be correct message sender before
|
||||||
|
# adding msg_talk to this list
|
||||||
|
for topic in topics:
|
||||||
|
topics_in_msgs_ordered.append((topic, msg_talk))
|
||||||
|
|
||||||
|
# Allow time for publishing before continuing
|
||||||
|
# await asyncio.sleep(0.4)
|
||||||
|
tasks_publish.append(asyncio.sleep(2))
|
||||||
|
await asyncio.gather(*tasks_publish)
|
||||||
|
|
||||||
|
# Step 4) Check that all messages were received correctly.
|
||||||
|
# TODO: Check message sender too
|
||||||
|
for i in range(len(topics_in_msgs_ordered)):
|
||||||
|
topic, actual_msg = topics_in_msgs_ordered[i]
|
||||||
|
for node_id in topic_map[topic]:
|
||||||
|
# Get message from subscription queue
|
||||||
|
msg_on_node_str = await queues_map[node_id][topic].get()
|
||||||
|
msg_on_node = create_message_talk(msg_on_node_str)
|
||||||
|
|
||||||
|
# Perform checks
|
||||||
|
assert actual_msg.origin_id == msg_on_node.origin_id
|
||||||
|
assert actual_msg.topics == msg_on_node.topics
|
||||||
|
assert actual_msg.data == msg_on_node.data
|
||||||
|
|
||||||
|
# Success, terminate pending tasks.
|
||||||
|
await cleanup()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"A": ["B"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"topic1": ["B"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["topic1"],
|
||||||
|
"data": "foo",
|
||||||
|
"node_id": "A"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_three_nodes_two_topics_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"A": ["B"],
|
||||||
|
"B": ["C"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"topic1": ["B", "C"],
|
||||||
|
"topic2": ["B", "C"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["topic1"],
|
||||||
|
"data": "foo",
|
||||||
|
"node_id": "A"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["topic2"],
|
||||||
|
"data": "Alex is tall",
|
||||||
|
"node_id": "A"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"A": ["B"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"topic1": ["B"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["topic1"],
|
||||||
|
"data": "Alex is tall",
|
||||||
|
"node_id": "B"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_two_nodes_one_topic_two_msgs_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"A": ["B"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"topic1": ["B"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["topic1"],
|
||||||
|
"data": "Alex is tall",
|
||||||
|
"node_id": "B"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["topic1"],
|
||||||
|
"data": "foo",
|
||||||
|
"node_id": "A"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_seven_nodes_tree_one_topics_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"1": ["2", "3"],
|
||||||
|
"2": ["4", "5"],
|
||||||
|
"3": ["6", "7"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"astrophysics": ["2", "3", "4", "5", "6", "7"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["astrophysics"],
|
||||||
|
"data": "e=mc^2",
|
||||||
|
"node_id": "1"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_seven_nodes_tree_three_topics_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"1": ["2", "3"],
|
||||||
|
"2": ["4", "5"],
|
||||||
|
"3": ["6", "7"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"astrophysics": ["2", "3", "4", "5", "6", "7"],
|
||||||
|
"space": ["2", "3", "4", "5", "6", "7"],
|
||||||
|
"onions": ["2", "3", "4", "5", "6", "7"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["astrophysics"],
|
||||||
|
"data": "e=mc^2",
|
||||||
|
"node_id": "1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["space"],
|
||||||
|
"data": "foobar",
|
||||||
|
"node_id": "1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["onions"],
|
||||||
|
"data": "I am allergic",
|
||||||
|
"node_id": "1"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_seven_nodes_tree_three_topics_diff_origin_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"1": ["2", "3"],
|
||||||
|
"2": ["4", "5"],
|
||||||
|
"3": ["6", "7"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"astrophysics": ["1", "2", "3", "4", "5", "6", "7"],
|
||||||
|
"space": ["1", "2", "3", "4", "5", "6", "7"],
|
||||||
|
"onions": ["1", "2", "3", "4", "5", "6", "7"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["astrophysics"],
|
||||||
|
"data": "e=mc^2",
|
||||||
|
"node_id": "1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["space"],
|
||||||
|
"data": "foobar",
|
||||||
|
"node_id": "4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["onions"],
|
||||||
|
"data": "I am allergic",
|
||||||
|
"node_id": "7"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_three_nodes_clique_two_topic_diff_origin_test_obj():
|
||||||
|
test_obj = {
|
||||||
|
"supported_protocols": ["/floodsub/1.0.0"],
|
||||||
|
"adj_list": {
|
||||||
|
"1": ["2", "3"],
|
||||||
|
"2": ["3"]
|
||||||
|
},
|
||||||
|
"topic_map": {
|
||||||
|
"astrophysics": ["1", "2", "3"],
|
||||||
|
"school": ["1", "2", "3"]
|
||||||
|
},
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"topics": ["astrophysics"],
|
||||||
|
"data": "e=mc^2",
|
||||||
|
"node_id": "1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["school"],
|
||||||
|
"data": "foobar",
|
||||||
|
"node_id": "2"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"topics": ["astrophysics"],
|
||||||
|
"data": "I am allergic",
|
||||||
|
"node_id": "1"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
await perform_test_from_obj(test_obj)
|
Loading…
Reference in New Issue
Block a user