From fa292ae7c8ffb92ff4526e47cd978512f692da64 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Wed, 8 May 2019 20:08:27 -0400 Subject: [PATCH] Cleanup --- .../bee_movie/{bee_movie.py => __init__.py} | 0 examples/bee_movie/msg_ordering_node.py | 16 ++++--- examples/bee_movie/ordered_queue.py | 23 ++++++--- examples/bee_movie/test_bee_movie.py | 47 +++++++++++-------- 4 files changed, 53 insertions(+), 33 deletions(-) rename examples/bee_movie/{bee_movie.py => __init__.py} (100%) diff --git a/examples/bee_movie/bee_movie.py b/examples/bee_movie/__init__.py similarity index 100% rename from examples/bee_movie/bee_movie.py rename to examples/bee_movie/__init__.py diff --git a/examples/bee_movie/msg_ordering_node.py b/examples/bee_movie/msg_ordering_node.py index e0fa82a..6605d35 100644 --- a/examples/bee_movie/msg_ordering_node.py +++ b/examples/bee_movie/msg_ordering_node.py @@ -5,7 +5,7 @@ from tests.pubsub.utils import message_id_generator, generate_RPC_packet from libp2p import new_node from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from ordered_queue import OrderedQueue +from .ordered_queue import OrderedQueue SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] BEE_MOVIE_TOPIC = "bee_movie" @@ -17,12 +17,15 @@ class MsgOrderingNode(): self.balances = {} self.next_msg_id_func = message_id_generator(0) self.priority_queue = OrderedQueue() - # self.last_word_gotten_seqno = 0 + + self.libp2p_node = None + self.floodsub = None + self.pubsub = None @classmethod async def create(cls): """ - Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub + Create a new MsgOrderingNode and attach a libp2p node, a floodsub, and a pubsub instance to this new node We use create as this serves as a factory function and allows us @@ -44,7 +47,7 @@ class MsgOrderingNode(): Handle all incoming messages on the BEE_MOVIE_TOPIC from peers """ while True: - incoming = await self.q.get() + incoming = await self.queue.get() seqno = int.from_bytes(incoming.seqno, byteorder='big') word = incoming.data.decode('utf-8') @@ -55,11 +58,12 @@ class MsgOrderingNode(): Subscribe to BEE_MOVIE_TOPIC and perform call to function that handles all incoming messages on said topic """ - self.q = await self.pubsub.subscribe(BEE_MOVIE_TOPIC) + self.queue = await self.pubsub.subscribe(BEE_MOVIE_TOPIC) asyncio.ensure_future(self.handle_incoming_msgs()) async def publish_bee_movie_word(self, word, msg_id=None): + # Publish a bee movie word to all peers my_id = str(self.libp2p_node.get_id()) if msg_id is None: msg_id = self.next_msg_id_func() @@ -67,7 +71,7 @@ class MsgOrderingNode(): await self.floodsub.publish(my_id, packet.SerializeToString()) async def handle_bee_movie_word(self, seqno, word): - # print("Handle hit for " + str(seqno) + ", " + word) + # Handle bee movie word received await self.priority_queue.put((seqno, word)) async def get_next_word_in_bee_movie(self): diff --git a/examples/bee_movie/ordered_queue.py b/examples/bee_movie/ordered_queue.py index 831744d..9dbc79e 100644 --- a/examples/bee_movie/ordered_queue.py +++ b/examples/bee_movie/ordered_queue.py @@ -1,10 +1,13 @@ import asyncio -""" -NOTE: ISSUE IS THAT THERE task IS BLOCKING SINCE IT IS WAITING ON SAME COROUTINE THAT -WE ARE RUNNING ON -""" class OrderedQueue(): + """ + asyncio.queue wrapper that delivers messages in order of subsequent sequence numbers, + so if message 1 and 3 are received and the following get calls occur: + get(), get(), get() + the queue will deliver message 1, will wait until message 2 is received to deliver message 2, + and then deliver message 3 + """ def __init__(self): self.last_gotten_seqno = 0 @@ -12,6 +15,9 @@ class OrderedQueue(): self.task = None async def put(self, item): + """ + :param item: put item tuple (seqno, data) onto queue + """ seqno = item[0] await self.queue.put(item) if self.last_gotten_seqno + 1 == seqno and self.task is not None: @@ -19,15 +25,18 @@ class OrderedQueue(): self.task.set() async def get(self): + """ + Get item with last_gotten_seqno + 1 from the queue + :return: (seqno, data) + """ if self.queue.qsize() > 0: front_item = await self.queue.get() if front_item[0] == self.last_gotten_seqno + 1: self.last_gotten_seqno += 1 return front_item - else: - # Put element back as it should not be delivered yet - await self.queue.put(front_item) + # Put element back as it should not be delivered yet + await self.queue.put(front_item) # Wait until item with subsequent seqno is put on queue self.task = asyncio.Event() diff --git a/examples/bee_movie/test_bee_movie.py b/examples/bee_movie/test_bee_movie.py index 14b6dfe..ac0beaa 100644 --- a/examples/bee_movie/test_bee_movie.py +++ b/examples/bee_movie/test_bee_movie.py @@ -1,19 +1,26 @@ import asyncio -import multiaddr -import pytest +from threading import Thread import struct +import pytest import urllib.request -from threading import Thread -from tests.utils import cleanup -from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.pubsub.pubsub import Pubsub -from libp2p.pubsub.floodsub import FloodSub -from msg_ordering_node import MsgOrderingNode +from .msg_ordering_node import MsgOrderingNode +from tests.utils import cleanup # pylint: disable=too-many-locals +""" +Test-cases demonstrating how to create nodes that continuously stream data +and ensure that data is delivered to each node with pre-determined ordering. +The ordering is such that if a peer A sends a publish 1 and 2 with seqno=1 and with seqno=2, +respectively, even if the publish 2 (with seqno=2) reaches the peers first, it will not +be processed until seqno=1 is received (and then publish 1 with seqno=1 must be +processed before publish 2 with seqno=2 will be). + +This concept is demonstrated by streaming the script to the entire bee movie to several nodes +""" + async def connect(node1, node2): # node1 connects to node2 addr = node2.get_addrs()[0] @@ -37,7 +44,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): # Create nodes dummy_nodes = [] - for i in range(num_nodes): + for _ in range(num_nodes): dummy_nodes.append(await MsgOrderingNode.create()) # Create network @@ -148,7 +155,7 @@ async def test_simple_two_nodes_two_words_read_then_publish_out_of_order_ids(): asyncio.sleep(0.25), \ dummy_nodes[0].publish_bee_movie_word("word 2", struct.pack('>I', 2)),\ dummy_nodes[0].publish_bee_movie_word("word 1", struct.pack('>I', 1))) - + # Store collected words to be checked in assertion func nonlocal collected collected = words @@ -185,10 +192,10 @@ async def test_simple_two_nodes_ten_words_out_of_order_ids(): tasks.append(asyncio.sleep(0.25)) for i in range(len(words)): - tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) res = await asyncio.gather(*tasks) - + nonlocal collected collected = res[0] @@ -228,10 +235,10 @@ async def test_simple_five_nodes_rings_words_out_of_order_ids(): tasks.append(asyncio.sleep(0.25)) for i in range(len(words)): - tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) res = await asyncio.gather(*tasks) - + nonlocal collected for i in range(num_nodes): collected.append(res[i]) @@ -272,10 +279,10 @@ async def test_simple_seven_nodes_tree_words_out_of_order_ids(): tasks.append(asyncio.sleep(0.25)) for i in range(len(words)): - tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) res = await asyncio.gather(*tasks) - + nonlocal collected for i in range(num_nodes): collected.append(res[i]) @@ -336,11 +343,11 @@ async def test_simple_two_nodes_bee_movie(): print("Add publish") for i in range(len(words)): - tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i])) + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i])) print("Perform gather") res = await asyncio.gather(*tasks) - + print("Filling collected") nonlocal collected for i in range(num_nodes): @@ -398,11 +405,11 @@ async def test_simple_seven_nodes_tree_bee_movie(): print("Add publish") for i in range(len(words)): - tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i])) + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i])) print("Perform gather") res = await asyncio.gather(*tasks, return_exceptions=True) - + print("Filling collected") nonlocal collected for i in range(num_nodes):