diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index f2943ba..b30567b 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -70,7 +70,8 @@ class Swarm(INetwork): raw_conn = await self.transport.dial(multiaddr, self.self_id) # Use upgrader to upgrade raw conn to muxed conn - muxed_conn = self.upgrader.upgrade_connection(raw_conn, self.generic_protocol_handler) + muxed_conn = self.upgrader.upgrade_connection(raw_conn, \ + self.generic_protocol_handler, peer_id) # Store muxed connection in connections self.connections[peer_id] = muxed_conn @@ -145,7 +146,7 @@ class Swarm(INetwork): raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'), multiaddr.value_for_protocol('tcp'), reader, writer, False) muxed_conn = self.upgrader.upgrade_connection(raw_conn, \ - self.generic_protocol_handler) + self.generic_protocol_handler, peer_id) # Store muxed_conn with peer id self.connections[peer_id] = muxed_conn @@ -197,14 +198,17 @@ def create_generic_protocol_handler(swarm): async def generic_protocol_handler(muxed_stream): # Perform protocol muxing to determine protocol to use - _, handler = await multiselect.negotiate(muxed_stream) + protocol, handler = await multiselect.negotiate(muxed_stream) + + net_stream = NetStream(muxed_stream) + net_stream.set_protocol(protocol) # Call notifiers since event occurred for notifee in swarm.notifees: - await notifee.opened_stream(swarm, muxed_stream) + await notifee.opened_stream(swarm, net_stream) # Give to stream handler - asyncio.ensure_future(handler(muxed_stream)) + asyncio.ensure_future(handler(net_stream)) return generic_protocol_handler diff --git a/libp2p/pubsub/__init__.py b/libp2p/pubsub/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py new file mode 100644 index 0000000..68e542c --- /dev/null +++ b/libp2p/pubsub/floodsub.py @@ -0,0 +1,98 @@ +from .pubsub_router_interface import IPubsubRouter +from .message import create_message_talk + +class FloodSub(IPubsubRouter): + + def __init__(self, protocols): + self.protocols = protocols + self.pubsub = None + + def get_protocols(self): + """ + :return: the list of protocols supported by the router + """ + return self.protocols + + def attach(self, pubsub): + """ + Attach is invoked by the PubSub constructor to attach the router to a + freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to + """ + self.pubsub = pubsub + + def add_peer(self, peer_id, protocol_id): + """ + Notifies the router that a new peer has been connected + :param peer_id: id of peer to add + """ + + def remove_peer(self, peer_id): + """ + Notifies the router that a peer has been disconnected + :param peer_id: id of peer to remove + """ + + def handle_rpc(self, rpc): + """ + Invoked to process control messages in the RPC envelope. + It is invoked after subscriptions and payload messages have been processed + :param rpc: rpc message + """ + + async def publish(self, sender_peer_id, message): + """ + Invoked to forward a new message that has been validated. + This is where the "flooding" part of floodsub happens + + With flooding, routing is almost trivial: for each incoming message, + forward to all known peers in the topic. There is a bit of logic, + as the router maintains a timed cache of previous messages, + so that seen messages are not further forwarded. + It also never forwards a message back to the source + or the peer that forwarded the message. + :param sender_peer_id: peer_id of message sender + :param message: message to forward + """ + + # Encode message + encoded_msg = message.encode() + + # Get message sender, origin, and topics + msg_talk = create_message_talk(message) + msg_sender = str(sender_peer_id) + msg_origin = msg_talk.origin_id + topics = msg_talk.topics + + # Deliver to self if self was origin + # Note: handle_talk checks if self is subscribed to topics in message + if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()): + await self.pubsub.handle_talk(message) + + # Deliver to self and peers + for topic in topics: + if topic in self.pubsub.peer_topics: + for peer_id_in_topic in self.pubsub.peer_topics[topic]: + # Forward to all known peers in the topic that are not the + # message sender and are not the message origin + if peer_id_in_topic not in (msg_sender, msg_origin): + stream = self.pubsub.peers[peer_id_in_topic] + await stream.write(encoded_msg) + else: + # Implies publish did not write + print("publish did not write") + + def join(self, topic): + """ + Join notifies the router that we want to receive and + forward messages in a topic. It is invoked after the + subscription announcement + :param topic: topic to join + """ + + def leave(self, topic): + """ + Leave notifies the router that we are no longer interested in a topic. + It is invoked after the unsubscription announcement. + :param topic: topic to leave + """ diff --git a/libp2p/pubsub/message.py b/libp2p/pubsub/message.py new file mode 100644 index 0000000..2f839dc --- /dev/null +++ b/libp2p/pubsub/message.py @@ -0,0 +1,118 @@ +import uuid + + +class MessageTalk(): + + """ + Object to make parsing talk messages easier, where talk messages are + defined as custom messages published to a set of topics + """ + # pylint: disable=too-few-public-methods + def __init__(self, from_id, origin_id, topics, data, message_id): + # pylint: disable=too-many-arguments + self.msg_type = "talk" + self.from_id = from_id + self.origin_id = origin_id + self.topics = topics + self.data = data + self.message_id = message_id + + def to_str(self): + """ + Convert to string + :return: MessageTalk object in string representation + """ + out = self.msg_type + '\n' + out += self.from_id + '\n' + out += self.origin_id + '\n' + out += self.message_id + '\n' + for i in range(len(self.topics)): + out += self.topics[i] + if i < len(self.topics) - 1: + out += ',' + out += '\n' + self.data + return out + + +class MessageSub(): + """ + Object to make parsing subscription messages easier, where subscription + messages are defined as indicating the topics a node wishes to subscribe to + or unsubscribe from + """ + # pylint: disable=too-few-public-methods + def __init__(self, from_id, origin_id, subs_map, message_id): + self.msg_type = "subscription" + self.from_id = from_id + self.origin_id = origin_id + self.subs_map = subs_map + self.message_id = message_id + + def to_str(self): + """ + Convert to string + :return: MessageSub object in string representation + """ + out = self.msg_type + '\n' + out += self.from_id + '\n' + out += self.origin_id + '\n' + out += self.message_id + + if self.subs_map: + out += '\n' + + keys = list(self.subs_map) + + for i, topic in enumerate(keys): + sub = self.subs_map[topic] + if sub: + out += "sub:" + else: + out += "unsub:" + out += topic + if i < len(keys) - 1: + out += '\n' + + return out + +def create_message_talk(msg_talk_as_str): + """ + Create a MessageTalk object from a MessageTalk string representation + :param msg_talk_as_str: a MessageTalk object in its string representation + :return: MessageTalk object + """ + msg_comps = msg_talk_as_str.split('\n') + from_id = msg_comps[1] + origin_id = msg_comps[2] + message_id = msg_comps[3] + topics = msg_comps[4].split(',') + data = msg_comps[5] + return MessageTalk(from_id, origin_id, topics, data, message_id) + +def create_message_sub(msg_sub_as_str): + """ + Create a MessageSub object from a MessageSub string representation + :param msg_talk_as_str: a MessageSub object in its string representation + :return: MessageSub object + """ + msg_comps = msg_sub_as_str.split('\n') + from_id = msg_comps[1] + origin_id = msg_comps[2] + message_id = msg_comps[3] + + subs_map = {} + for i in range(4, len(msg_comps)): + sub_comps = msg_comps[i].split(":") + topic = sub_comps[1] + if sub_comps[0] == "sub": + subs_map[topic] = True + else: + subs_map[topic] = False + return MessageSub(from_id, origin_id, subs_map, message_id) + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py new file mode 100644 index 0000000..9bd072f --- /dev/null +++ b/libp2p/pubsub/pubsub.py @@ -0,0 +1,294 @@ +import asyncio + +from .pubsub_notifee import PubsubNotifee +from .message import MessageSub +from .message import create_message_talk, create_message_sub +from. message import generate_message_id + + +class Pubsub(): + """ + For now, because I'm on a plane and don't have access to the go repo/protobuf stuff, + this is going to be the message format for the two types: subscription and talk + subscription indicates subscribing or unsubscribing from a topic + talk is sending a message on topic(s) + subscription format: + subscription + 'from' + :'topicid' + :'topicid' + ... + Ex. + subscription + msg_sender_peer_id + origin_peer_id + sub:topic1 + sub:topic2 + unsub:fav_topic + talk format: + talk + 'from' + 'origin' + [topic_ids comma-delimited] + 'data' + Ex. + talk + msg_sender_peer_id + origin_peer_id + topic1,topics_are_cool,foo + I like tacos + """ + # pylint: disable=too-many-instance-attributes + + def __init__(self, host, router, my_id): + """ + Construct a new Pubsub object, which is responsible for handling all + Pubsub-related messages and relaying messages as appropriate to the + Pubsub router (which is responsible for choosing who to send messages to). + Since the logic for choosing peers to send pubsub messages to is + in the router, the same Pubsub impl can back floodsub, gossipsub, etc. + """ + self.host = host + self.router = router + self.my_id = my_id + + # Attach this new Pubsub object to the router + self.router.attach(self) + + # Register a notifee + self.peer_queue = asyncio.Queue() + self.host.get_network().notify(PubsubNotifee(self.peer_queue)) + + # Register stream handlers for each pubsub router protocol to handle + # the pubsub streams opened on those protocols + self.protocols = self.router.get_protocols() + for protocol in self.protocols: + self.host.set_stream_handler(protocol, self.stream_handler) + + # TODO: determine if these need to be asyncio queues, or if could possibly + # be ordinary blocking queues + self.incoming_msgs_from_peers = asyncio.Queue() + self.outgoing_messages = asyncio.Queue() + + # TODO: Make seen_messages a cache (LRU cache?) + self.seen_messages = [] + + # Map of topics we are subscribed to to handler functions + # for when the given topic receives a message + self.my_topics = {} + + # Map of topic to peers to keep track of what peers are subscribed to + self.peer_topics = {} + + # Create peers map, which maps peer_id (as string) to stream (to a given peer) + self.peers = {} + + # Call handle peer to keep waiting for updates to peer queue + asyncio.ensure_future(self.handle_peer_queue()) + + def get_hello_packet(self): + """ + Generate subscription message with all topics we are subscribed to + """ + subs_map = {} + for topic in self.my_topics: + subs_map[topic] = True + sub_msg = MessageSub( + str(self.host.get_id()),\ + str(self.host.get_id()), subs_map, generate_message_id()\ + ) + return sub_msg.to_str() + + async def continously_read_stream(self, stream): + """ + Read from input stream in an infinite loop. Process + messages from other nodes, which for now are considered MessageTalk + and MessageSub messages. + TODO: Handle RPC messages instead of my Aspyn's own custom message format + :param stream: stream to continously read from + """ + while True: + incoming = (await stream.read()).decode() + msg_comps = incoming.split('\n') + msg_type = msg_comps[0] + + msg_sender = msg_comps[1] + # msg_origin = msg_comps[2] + msg_id = msg_comps[3] + print("HIT ME1") + if msg_id not in self.seen_messages: + print("HIT ME") + # Do stuff with incoming unseen message + should_publish = True + if msg_type == "subscription": + self.handle_subscription(incoming) + + # We don't need to relay the subscription to our + # peers because a given node only needs its peers + # to know that it is subscribed to the topic (doesn't + # need everyone to know) + should_publish = False + elif msg_type == "talk": + await self.handle_talk(incoming) + + # Add message id to seen + self.seen_messages.append(msg_id) + + # Publish message using router's publish + if should_publish: + msg = create_message_talk(incoming) + + # Adjust raw_msg to that the message sender + # is now our peer_id + msg.from_id = str(self.host.get_id()) + + await self.router.publish(msg_sender, msg.to_str()) + + # Force context switch + await asyncio.sleep(0) + + async def stream_handler(self, stream): + """ + Stream handler for pubsub. Gets invoked whenever a new stream is created + on one of the supported pubsub protocols. + :param stream: newly created stream + """ + # Add peer + # Map peer to stream + peer_id = stream.mplex_conn.peer_id + self.peers[str(peer_id)] = stream + self.router.add_peer(peer_id, stream.get_protocol()) + + # Send hello packet + hello = self.get_hello_packet() + await stream.write(hello.encode()) + # Pass stream off to stream reader + asyncio.ensure_future(self.continously_read_stream(stream)) + + async def handle_peer_queue(self): + """ + Continuously read from peer queue and each time a new peer is found, + open a stream to the peer using a supported pubsub protocol + TODO: Handle failure for when the peer does not support any of the + pubsub protocols we support + """ + while True: + peer_id = await self.peer_queue.get() + + # Open a stream to peer on existing connection + # (we know connection exists since that's the only way + # an element gets added to peer_queue) + stream = await self.host.new_stream(peer_id, self.protocols) + + # Add Peer + # Map peer to stream + self.peers[str(peer_id)] = stream + self.router.add_peer(peer_id, stream.get_protocol()) + + # Send hello packet + hello = self.get_hello_packet() + await stream.write(hello.encode()) + + # Pass stream off to stream reader + asyncio.ensure_future(self.continously_read_stream(stream)) + + # Force context switch + await asyncio.sleep(0) + + def handle_subscription(self, subscription): + """ + Handle an incoming subscription message from a peer. Update internal + mapping to mark the peer as subscribed or unsubscribed to topics as + defined in the subscription message + :param subscription: raw data constituting a subscription message + """ + sub_msg = create_message_sub(subscription) + if sub_msg.subs_map: + print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) + for topic_id in sub_msg.subs_map: + # Look at each subscription in the msg individually + if sub_msg.subs_map[topic_id]: + if topic_id not in self.peer_topics: + # Create topic list if it did not yet exist + self.peer_topics[topic_id] = [sub_msg.origin_id] + elif sub_msg.origin_id not in self.peer_topics[topic_id]: + # Add peer to topic + self.peer_topics[topic_id].append(sub_msg.origin_id) + else: + # TODO: Remove peer from topic + pass + + async def handle_talk(self, talk): + """ + Handle incoming Talk message from a peer. A Talk message contains some + custom message that is published on a given topic(s) + :param talk: raw data constituting a talk message + """ + msg = create_message_talk(talk) + + # Check if this message has any topics that we are subscribed to + for topic in msg.topics: + if topic in self.my_topics: + # we are subscribed to a topic this message was sent for, + # so add message to the subscription output queue + # for each topic + await self.my_topics[topic].put(talk) + + async def subscribe(self, topic_id): + """ + Subscribe ourself to a topic + :param topic_id: topic_id to subscribe to + """ + # Map topic_id to blocking queue + self.my_topics[topic_id] = asyncio.Queue() + + # Create subscribe message + sub_msg = MessageSub( + str(self.host.get_id()),\ + str(self.host.get_id()), {topic_id: True}, generate_message_id()\ + ) + + # Send out subscribe message to all peers + await self.message_all_peers(sub_msg.to_str()) + + # Tell router we are joining this topic + self.router.join(topic_id) + + # Return the asyncio queue for messages on this topic + return self.my_topics[topic_id] + + async def unsubscribe(self, topic_id): + """ + Unsubscribe ourself from a topic + :param topic_id: topic_id to unsubscribe from + """ + + # Remove topic_id from map if present + if topic_id in self.my_topics: + del self.my_topics[topic_id] + + # Create unsubscribe message + unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\ + {topic_id: False}, generate_message_id()) + + # Send out unsubscribe message to all peers + await self.message_all_peers(unsub_msg.to_str()) + + # Tell router we are leaving this topic + self.router.leave(topic_id) + + async def message_all_peers(self, raw_msg): + """ + Broadcast a message to peers + :param raw_msg: raw contents of the message to broadcast + """ + + # Encode message for sending + encoded_msg = raw_msg.encode() + + # Broadcast message + for peer in self.peers: + stream = self.peers[peer] + + # Write message to stream + await stream.write(encoded_msg) diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py new file mode 100644 index 0000000..4173bd8 --- /dev/null +++ b/libp2p/pubsub/pubsub_notifee.py @@ -0,0 +1,40 @@ +from libp2p.network.notifee_interface import INotifee + + +class PubsubNotifee(INotifee): + # pylint: disable=too-many-instance-attributes, cell-var-from-loop + + def __init__(self, initiator_peers_queue): + """ + :param initiator_peers_queue: queue to add new peers to so that pubsub + can process new peers after we connect to them + """ + self.initiator_peers_queue = initiator_peers_queue + + async def opened_stream(self, network, stream): + pass + + async def closed_stream(self, network, stream): + pass + + async def connected(self, network, conn): + """ + Add peer_id to initiator_peers_queue, so that this peer_id can be used to + create a stream and we only want to have one pubsub stream with each peer. + :param network: network the connection was opened on + :param conn: connection that was opened + """ + + # Only add peer_id if we are initiator (otherwise we would end up + # with two pubsub streams between us and the peer) + if conn.initiator: + await self.initiator_peers_queue.put(conn.peer_id) + + async def disconnected(self, network, conn): + pass + + async def listen(self, network, multiaddr): + pass + + async def listen_close(self, network, multiaddr): + pass diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py new file mode 100644 index 0000000..727b39e --- /dev/null +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -0,0 +1,64 @@ +from abc import ABC, abstractmethod + +class IPubsubRouter(ABC): + + @abstractmethod + def get_protocols(self): + """ + :return: the list of protocols supported by the router + """ + + @abstractmethod + def attach(self, pubsub): + """ + Attach is invoked by the PubSub constructor to attach the router to a + freshly initialized PubSub instance. + :param pubsub: pubsub instance to attach to + """ + + @abstractmethod + def add_peer(self, peer_id, protocol_id): + """ + Notifies the router that a new peer has been connected + :param peer_id: id of peer to add + """ + + @abstractmethod + def remove_peer(self, peer_id): + """ + Notifies the router that a peer has been disconnected + :param peer_id: id of peer to remove + """ + + @abstractmethod + def handle_rpc(self, rpc): + """ + Invoked to process control messages in the RPC envelope. + It is invoked after subscriptions and payload messages have been processed + :param rpc: rpc message + """ + + @abstractmethod + def publish(self, sender_peer_id, message): + """ + Invoked to forward a new message that has been validated + :param sender_peer_id: peer_id of message sender + :param message: message to forward + """ + + @abstractmethod + def join(self, topic): + """ + Join notifies the router that we want to receive and + forward messages in a topic. It is invoked after the + subscription announcement + :param topic: topic to join + """ + + @abstractmethod + def leave(self, topic): + """ + Leave notifies the router that we are no longer interested in a topic. + It is invoked after the unsubscription announcement. + :param topic: topic to leave + """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 135ee1e..0d587b5 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -11,14 +11,15 @@ class Mplex(IMuxedConn): reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ - def __init__(self, conn, generic_protocol_handler): + def __init__(self, conn, generic_protocol_handler, peer_id): """ create a new muxed connection :param conn: an instance of raw connection :param generic_protocol_handler: generic protocol handler for new muxed streams + :param peer_id: peer_id of peer the connection is to """ - super(Mplex, self).__init__(conn, generic_protocol_handler) + super(Mplex, self).__init__(conn, generic_protocol_handler, peer_id) self.raw_conn = conn self.initiator = conn.initiator @@ -26,6 +27,9 @@ class Mplex(IMuxedConn): # Store generic protocol handler self.generic_protocol_handler = generic_protocol_handler + # Set peer_id + self.peer_id = peer_id + # Mapping from stream ID -> buffer of messages for that stream self.buffers = {} @@ -56,7 +60,7 @@ class Mplex(IMuxedConn): # TODO: pass down timeout from user and use that if stream_id in self.buffers: try: - data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=3) + data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=8) return data except asyncio.TimeoutError: return None diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index 4017755..541fd64 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -7,12 +7,13 @@ class IMuxedConn(ABC): """ @abstractmethod - def __init__(self, conn, generic_protocol_handler): + def __init__(self, conn, generic_protocol_handler, peer_id): """ create a new muxed connection :param conn: an instance of raw connection :param generic_protocol_handler: generic protocol handler for new muxed streams + :param peer_id: peer_id of peer the connection is to """ @abstractmethod diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 5f193a0..9e311e3 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -17,11 +17,11 @@ class TransportUpgrader: def upgrade_security(self): pass - def upgrade_connection(self, conn, generic_protocol_handler): + def upgrade_connection(self, conn, generic_protocol_handler, peer_id): """ upgrade raw connection to muxed connection """ # For PoC, no security, default to mplex # TODO do exchange to determine multiplexer - return Mplex(conn, generic_protocol_handler) + return Mplex(conn, generic_protocol_handler, peer_id) diff --git a/tests/libp2p/test_notify.py b/tests/libp2p/test_notify.py index 71b455b..570ad57 100644 --- a/tests/libp2p/test_notify.py +++ b/tests/libp2p/test_notify.py @@ -49,6 +49,7 @@ class MyNotifee(INotifee): async def listen_close(self, network, _multiaddr): pass + class InvalidNotifee(): # pylint: disable=too-many-instance-attributes, cell-var-from-loop @@ -70,6 +71,36 @@ class InvalidNotifee(): async def listen(self): assert False + +async def perform_two_host_simple_set_up(): + node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + + async def my_stream_handler(stream): + while True: + read_string = (await stream.read()).decode() + + resp = "ack:" + read_string + await stream.write(resp.encode()) + + node_b.set_stream_handler("/echo/1.0.0", my_stream_handler) + + # Associate the peer with local ip address (see default parameters of Libp2p()) + node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) + return node_a, node_b + + +async def perform_two_host_simple_set_up_custom_handler(handler): + node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + + node_b.set_stream_handler("/echo/1.0.0", handler) + + # Associate the peer with local ip address (see default parameters of Libp2p()) + node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) + return node_a, node_b + + @pytest.mark.asyncio async def test_one_notifier(): node_a, node_b = await perform_two_host_set_up_custom_handler(echo_stream_handler) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py new file mode 100644 index 0000000..f328a6e --- /dev/null +++ b/tests/pubsub/dummy_account_node.py @@ -0,0 +1,134 @@ +import asyncio +import multiaddr + +from libp2p import new_node +from libp2p.pubsub.message import create_message_talk +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.message import MessageTalk +from libp2p.pubsub.message import generate_message_id + +SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +CRYPTO_TOPIC = "ethereum" + +# Message format: +# Sending crypto: ,, +# Ex. send,aspyn,alex,5 +# Set crypto: , +# Ex. set,rob,5 +# Determine message type by looking at first item before first comma + +class DummyAccountNode(): + """ + Node which has an internal balance mapping, meant to serve as + a dummy crypto blockchain. There is no actual blockchain, just a simple + map indicating how much crypto each user in the mappings holds + """ + + def __init__(self): + self.balances = {} + + @classmethod + async def create(cls): + """ + Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub + instance to this new node + + We use create as this serves as a factory function and allows us + to use async await, unlike the init function + """ + self = DummyAccountNode() + + libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + self.libp2p_node = libp2p_node + + self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) + self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") + return self + + async def handle_incoming_msgs(self): + """ + Handle all incoming messages on the CRYPTO_TOPIC from peers + """ + while True: + message_raw = await self.q.get() + message = create_message_talk(message_raw) + contents = message.data + + msg_comps = contents.split(",") + + if msg_comps[0] == "send": + self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) + elif msg_comps[0] == "set": + self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) + + async def setup_crypto_networking(self): + """ + Subscribe to CRYPTO_TOPIC and perform call to function that handles + all incoming messages on said topic + """ + self.q = await self.pubsub.subscribe(CRYPTO_TOPIC) + + asyncio.ensure_future(self.handle_incoming_msgs()) + + async def publish_send_crypto(self, source_user, dest_user, amount): + """ + Create a send crypto message and publish that message to all other nodes + :param source_user: user to send crypto from + :param dest_user: user to send crypto to + :param amount: amount of crypto to send + """ + my_id = str(self.libp2p_node.get_id()) + msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) + msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + await self.floodsub.publish(my_id, msg.to_str()) + + async def publish_set_crypto(self, user, amount): + """ + Create a set crypto message and publish that message to all other nodes + :param user: user to set crypto for + :param amount: amount of crypto + """ + my_id = str(self.libp2p_node.get_id()) + msg_contents = "set," + user + "," + str(amount) + msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + await self.floodsub.publish(my_id, msg.to_str()) + + def handle_send_crypto(self, source_user, dest_user, amount): + """ + Handle incoming send_crypto message + :param source_user: user to send crypto from + :param dest_user: user to send crypto to + :param amount: amount of crypto to send + """ + if source_user in self.balances: + self.balances[source_user] -= amount + else: + self.balances[source_user] = -amount + + if dest_user in self.balances: + self.balances[dest_user] += amount + else: + self.balances[dest_user] = amount + + def handle_set_crypto_for_user(self, dest_user, amount): + """ + Handle incoming set_crypto message + :param dest_user: user to set crypto for + :param amount: amount of crypto + """ + self.balances[dest_user] = amount + + def get_balance(self, user): + """ + Get balance in crypto for a particular user + :param user: user to get balance for + :return: balance of user + """ + if user in self.balances: + return self.balances[user] + else: + return -1 + diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py new file mode 100644 index 0000000..a071fa6 --- /dev/null +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -0,0 +1,189 @@ +import asyncio +import multiaddr +import pytest + +from threading import Thread +from tests.utils import cleanup +from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.message import MessageTalk +from libp2p.pubsub.message import create_message_talk +from dummy_account_node import DummyAccountNode + +# pylint: disable=too-many-locals + +async def connect(node1, node2): + # node1 connects to node2 + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) + +def create_setup_in_new_thread_func(dummy_node): + def setup_in_new_thread(): + asyncio.ensure_future(dummy_node.setup_crypto_networking()) + return setup_in_new_thread + +async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): + """ + Helper function to allow for easy construction of custom tests for dummy account nodes + in various network topologies + :param num_nodes: number of nodes in the test + :param adjacency_map: adjacency map defining each node and its list of neighbors + :param action_func: function to execute that includes actions by the nodes, + such as send crypto and set crypto + :param assertion_func: assertions for testing the results of the actions are correct + """ + + # Create nodes + dummy_nodes = [] + for i in range(num_nodes): + dummy_nodes.append(await DummyAccountNode.create()) + + # Create network + for source_num in adjacency_map: + target_nums = adjacency_map[source_num] + for target_num in target_nums: + await connect(dummy_nodes[source_num].libp2p_node, \ + dummy_nodes[target_num].libp2p_node) + + # Allow time for network creation to take place + await asyncio.sleep(0.25) + + # Start a thread for each node so that each node can listen and respond + # to messages on its own thread, which will avoid waiting indefinitely + # on the main thread. On this thread, call the setup func for the node, + # which subscribes the node to the CRYPTO_TOPIC topic + for dummy_node in dummy_nodes: + thread = Thread(target=create_setup_in_new_thread_func(dummy_node)) + thread.run() + + # Allow time for nodes to subscribe to CRYPTO_TOPIC topic + await asyncio.sleep(0.25) + + # Perform action function + await action_func(dummy_nodes) + + # Allow time for action function to be performed (i.e. messages to propogate) + await asyncio.sleep(0.25) + + # Perform assertion function + for dummy_node in dummy_nodes: + assertion_func(dummy_node) + + # Success, terminate pending tasks. + await cleanup() + +@pytest.mark.asyncio +async def test_simple_two_nodes(): + num_nodes = 2 + adj_map = {0: [1]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 10) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 10 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_three_nodes_line_topography(): + num_nodes = 3 + adj_map = {0: [1], 1: [2]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 10) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 10 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_three_nodes_triangle_topography(): + num_nodes = 3 + adj_map = {0: [1, 2], 1: [2]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 20 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_seven_nodes_tree_topography(): + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 20 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_set_then_send_from_root_seven_nodes_tree_topography(): + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) + await asyncio.sleep(0.25) + await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 15 + assert dummy_node.get_balance("alex") == 5 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} + + async def action_func(dummy_nodes): + await dummy_nodes[6].publish_set_crypto("aspyn", 20) + await asyncio.sleep(0.25) + await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 15 + assert dummy_node.get_balance("alex") == 5 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_five_nodes_ring_topography(): + num_nodes = 5 + adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 20 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): + num_nodes = 5 + adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("alex", 20) + await asyncio.sleep(0.25) + await dummy_nodes[3].publish_send_crypto("alex", "rob", 12) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("alex") == 8 + assert dummy_node.get_balance("rob") == 12 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py new file mode 100644 index 0000000..f4a5826 --- /dev/null +++ b/tests/pubsub/test_floodsub.py @@ -0,0 +1,486 @@ +import asyncio +import multiaddr +import pytest + +from tests.utils import cleanup +from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.message import MessageTalk +from libp2p.pubsub.message import create_message_talk +from libp2p.pubsub.message import generate_message_id + +# pylint: disable=too-many-locals + +async def connect(node1, node2): + """ + Connect node1 to node2 + """ + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) + +@pytest.mark.asyncio +async def test_simple_two_nodes(): + node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + + await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + supported_protocols = ["/floodsub/1.0.0"] + + floodsub_a = FloodSub(supported_protocols) + pubsub_a = Pubsub(node_a, floodsub_a, "a") + floodsub_b = FloodSub(supported_protocols) + pubsub_b = Pubsub(node_b, floodsub_b, "b") + + await connect(node_a, node_b) + + await asyncio.sleep(0.25) + qb = await pubsub_b.subscribe("my_topic") + + await asyncio.sleep(0.25) + + node_a_id = str(node_a.get_id()) + + msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) + + await floodsub_a.publish(node_a.get_id(), msg.to_str()) + + await asyncio.sleep(0.25) + + res_b = await qb.get() + + # Check that the msg received by node_b is the same + # as the message sent by node_a + assert res_b == msg.to_str() + + # Success, terminate pending tasks. + await cleanup() + +@pytest.mark.asyncio +async def test_simple_three_nodes(): + # Want to pass message from A -> B -> C + node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + + await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + supported_protocols = ["/floodsub/1.0.0"] + + floodsub_a = FloodSub(supported_protocols) + pubsub_a = Pubsub(node_a, floodsub_a, "a") + floodsub_b = FloodSub(supported_protocols) + pubsub_b = Pubsub(node_b, floodsub_b, "b") + floodsub_c = FloodSub(supported_protocols) + pubsub_c = Pubsub(node_c, floodsub_c, "c") + + await connect(node_a, node_b) + await connect(node_b, node_c) + + await asyncio.sleep(0.25) + qb = await pubsub_b.subscribe("my_topic") + qc = await pubsub_c.subscribe("my_topic") + await asyncio.sleep(0.25) + + node_a_id = str(node_a.get_id()) + + msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) + + await floodsub_a.publish(node_a.get_id(), msg.to_str()) + + await asyncio.sleep(0.25) + res_b = await qb.get() + res_c = await qc.get() + + # Check that the msg received by node_b is the same + # as the message sent by node_a + assert res_b == msg.to_str() + + # res_c should match original msg but with b as sender + node_b_id = str(node_b.get_id()) + msg.from_id = node_b_id + + assert res_c == msg.to_str() + + # Success, terminate pending tasks. + await cleanup() + +async def perform_test_from_obj(obj): + """ + Perform a floodsub test from a test obj. + test obj are composed as follows: + + { + "supported_protocols": ["supported/protocol/1.0.0",...], + "adj_list": { + "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], + "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], + ... + }, + "topic_map": { + "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] + }, + "messages": [ + { + "topics": ["topic1_for_message", "topic2_for_message", ...], + "data": "some contents of the message (newlines are not supported)", + "node_id": "message sender node id" + }, + ... + ] + } + NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A + or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior + is undefined (even if it may work) + """ + + # Step 1) Create graph + adj_list = obj["adj_list"] + node_map = {} + floodsub_map = {} + pubsub_map = {} + + supported_protocols = obj["supported_protocols"] + + tasks_connect = [] + for start_node_id in adj_list: + # Create node if node does not yet exist + if start_node_id not in node_map: + node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + node_map[start_node_id] = node + + floodsub = FloodSub(supported_protocols) + floodsub_map[start_node_id] = floodsub + pubsub = Pubsub(node, floodsub, start_node_id) + pubsub_map[start_node_id] = pubsub + + # For each neighbor of start_node, create if does not yet exist, + # then connect start_node to neighbor + for neighbor_id in adj_list[start_node_id]: + # Create neighbor if neighbor does not yet exist + if neighbor_id not in node_map: + neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + node_map[neighbor_id] = neighbor_node + + floodsub = FloodSub(supported_protocols) + floodsub_map[neighbor_id] = floodsub + pubsub = Pubsub(neighbor_node, floodsub, neighbor_id) + pubsub_map[neighbor_id] = pubsub + + # Connect node and neighbor + # await connect(node_map[start_node_id], node_map[neighbor_id]) + tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id]))) + tasks_connect.append(asyncio.sleep(2)) + await asyncio.gather(*tasks_connect) + + # Allow time for graph creation before continuing + # await asyncio.sleep(0.25) + + # Step 2) Subscribe to topics + queues_map = {} + topic_map = obj["topic_map"] + + tasks_topic = [] + tasks_topic_data = [] + for topic in topic_map: + for node_id in topic_map[topic]: + """ + # Subscribe node to topic + q = await pubsub_map[node_id].subscribe(topic) + + # Create topic-queue map for node_id if one does not yet exist + if node_id not in queues_map: + queues_map[node_id] = {} + + # Store queue in topic-queue map for node + queues_map[node_id][topic] = q + """ + tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic))) + tasks_topic_data.append((node_id, topic)) + tasks_topic.append(asyncio.sleep(2)) + + # Gather is like Promise.all + responses = await asyncio.gather(*tasks_topic, return_exceptions=True) + for i in range(len(responses) - 1): + q = responses[i] + node_id, topic = tasks_topic_data[i] + if node_id not in queues_map: + queues_map[node_id] = {} + + # Store queue in topic-queue map for node + queues_map[node_id][topic] = q + + # Allow time for subscribing before continuing + # await asyncio.sleep(0.01) + + # Step 3) Publish messages + topics_in_msgs_ordered = [] + messages = obj["messages"] + tasks_publish = [] + for msg in messages: + topics = msg["topics"] + + data = msg["data"] + node_id = msg["node_id"] + + # Get actual id for sender node (not the id from the test obj) + actual_node_id = str(node_map[node_id].get_id()) + + # Create correctly formatted message + msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id()) + + # Publish message + # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) + tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()))) + + # For each topic in topics, add topic, msg_talk tuple to ordered test list + # TODO: Update message sender to be correct message sender before + # adding msg_talk to this list + for topic in topics: + topics_in_msgs_ordered.append((topic, msg_talk)) + + # Allow time for publishing before continuing + # await asyncio.sleep(0.4) + tasks_publish.append(asyncio.sleep(2)) + await asyncio.gather(*tasks_publish) + + # Step 4) Check that all messages were received correctly. + # TODO: Check message sender too + for i in range(len(topics_in_msgs_ordered)): + topic, actual_msg = topics_in_msgs_ordered[i] + for node_id in topic_map[topic]: + # Get message from subscription queue + msg_on_node_str = await queues_map[node_id][topic].get() + msg_on_node = create_message_talk(msg_on_node_str) + + # Perform checks + assert actual_msg.origin_id == msg_on_node.origin_id + assert actual_msg.topics == msg_on_node.topics + assert actual_msg.data == msg_on_node.data + + # Success, terminate pending tasks. + await cleanup() + +@pytest.mark.asyncio +async def test_simple_two_nodes_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "foo", + "node_id": "A" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_three_nodes_two_topics_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"], + "B": ["C"] + }, + "topic_map": { + "topic1": ["B", "C"], + "topic2": ["B", "C"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "foo", + "node_id": "A" + }, + { + "topics": ["topic2"], + "data": "Alex is tall", + "node_id": "A" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "Alex is tall", + "node_id": "B" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_two_nodes_one_topic_two_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": "Alex is tall", + "node_id": "B" + }, + { + "topics": ["topic1"], + "data": "foo", + "node_id": "A" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_seven_nodes_tree_one_topics_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"] + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_seven_nodes_tree_three_topics_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"] + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"], + "space": ["2", "3", "4", "5", "6", "7"], + "onions": ["2", "3", "4", "5", "6", "7"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["space"], + "data": "foobar", + "node_id": "1" + }, + { + "topics": ["onions"], + "data": "I am allergic", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], + "space": ["1", "2", "3", "4", "5", "6", "7"], + "onions": ["1", "2", "3", "4", "5", "6", "7"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["space"], + "data": "foobar", + "node_id": "4" + }, + { + "topics": ["onions"], + "data": "I am allergic", + "node_id": "7" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3"], + "2": ["3"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3"], + "school": ["1", "2", "3"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj)