diff --git a/demos/bee_movie/msg_ordering_node.py b/demos/bee_movie/msg_ordering_node.py index 0870326..dbe8b9c 100644 --- a/demos/bee_movie/msg_ordering_node.py +++ b/demos/bee_movie/msg_ordering_node.py @@ -71,16 +71,16 @@ class MsgOrderingNode(): asyncio.ensure_future(self.handle_incoming_msgs()) async def publish_bee_movie_word(self, word, msg_id=None): - print("Publish hit for " + word) + # print("Publish hit for " + word) my_id = str(self.libp2p_node.get_id()) if msg_id is None: msg_id = self.next_msg_id_func() packet = generate_RPC_packet(my_id, [BEE_MOVIE_TOPIC], word, msg_id) - print("Packet generated") + # print("Packet generated") await self.floodsub.publish(my_id, packet.SerializeToString()) async def handle_bee_movie_word(self, seqno, word): - print("Handle hit for " + str(seqno) + ", " + word) + # print("Handle hit for " + str(seqno) + ", " + word) await self.priority_queue.put((seqno, word)) async def get_next_word_in_bee_movie(self): diff --git a/demos/bee_movie/ordered_queue.py b/demos/bee_movie/ordered_queue.py index 5150fbe..a540297 100644 --- a/demos/bee_movie/ordered_queue.py +++ b/demos/bee_movie/ordered_queue.py @@ -12,41 +12,39 @@ class OrderedQueue(): self.task = None async def put(self, item): - print("Put " + str(item)) + # print("Put " + str(item)) seqno = item[0] await self.queue.put(item) - print("Added to queue " + str(item)) + # print("Added to queue " + str(item)) if self.last_gotten_seqno + 1 == seqno and self.task is not None: # Allow get future to return the most recent item that is put - print("Set called") + # print("Set called") self.task.set() async def get(self): - print("get") - print(str(self.last_gotten_seqno)) if self.queue.qsize() > 0: front_item = await self.queue.get() if front_item[0] == self.last_gotten_seqno + 1: - print("Trivial get " + str(front_item)) + # print("Trivial get " + str(front_item)) self.last_gotten_seqno += 1 return front_item else: # Put element back as it should not be delivered yet - print("Front item put back 1") - print(str(self.last_gotten_seqno)) - print(front_item[1]) + # print("Front item put back 1") + # print(str(self.last_gotten_seqno)) + # print(front_item[1]) await self.queue.put(front_item) - print("Front item put back 2") + # print("Front item put back 2") - print("get 2") + # print("get 2") # Wait until self.task = asyncio.Event() - print("get 3") + # print("get 3") await self.task.wait() - print("get 4") + # print("get 4") item = await self.queue.get() - print("get 5") + # print("get 5") # print(str(item) + " got from queue") # Remove task diff --git a/demos/bee_movie/test_bee_movie.py b/demos/bee_movie/test_bee_movie.py index 39eb350..a305aa3 100644 --- a/demos/bee_movie/test_bee_movie.py +++ b/demos/bee_movie/test_bee_movie.py @@ -2,6 +2,7 @@ import asyncio import multiaddr import pytest import struct +import urllib.request from threading import Thread from tests.utils import cleanup @@ -287,3 +288,134 @@ async def test_simple_seven_nodes_tree_words_out_of_order_ids(): assert len(collected[i]) == len(correct_words) await perform_test(num_nodes, adj_map, action_func, assertion_func) + +def download_bee_movie(): + url = "https://gist.githubusercontent.com/stuckinaboot/c531823814af1f6785f75ed7eedf60cb/raw/5107c5e6c2fda2ff54cfcc9803bbb297a53db71b/bee_movie.txt" + response = urllib.request.urlopen(url) + data = response.read() # a `bytes` object + text = data.decode('utf-8') # a `str`; this step can't be used if data is binary + return text + +@pytest.mark.asyncio +async def test_simple_two_nodes_bee_movie(): + print("Downloading Bee Movie") + bee_movie_script = download_bee_movie() + print("Downloaded Bee Movie") + bee_movie_words = bee_movie_script.split(" ") + print("Bee Movie Script Split on Spaces, # spaces = " + str(len(bee_movie_words))) + + num_nodes = 2 + adj_map = {0: [1]} + collected = [] + + async def collect_all_words(expected_len, dummy_node, log_nodes=[]): + collected_words = [] + while True: + word = await dummy_node.get_next_word_in_bee_movie() + collected_words.append(word) + + # Log if needed + if dummy_node in log_nodes: + print(word + "| " + str(len(collected_words)) + "/" + str(expected_len)) + + if len(collected_words) == expected_len: + print("Returned collected words") + return collected_words + + async def action_func(dummy_nodes): + print("Start action function") + words = bee_movie_words + tasks = [] + + print("Add collect all words") + log_nodes = [dummy_nodes[0]] + for i in range(num_nodes): + tasks.append(collect_all_words(len(words), dummy_nodes[i], log_nodes)) + + print("Add sleep") + tasks.append(asyncio.sleep(0.25)) + + print("Add publish") + for i in range(len(words)): + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i])) + + print("Perform gather") + res = await asyncio.gather(*tasks) + + print("Filling collected") + nonlocal collected + for i in range(num_nodes): + collected.append(res[i]) + print("Filled collected") + + async def assertion_func(dummy_node): + print("Perform assertion") + correct_words = bee_movie_words + for i in range(num_nodes): + assert collected[i] == correct_words + assert len(collected[i]) == len(correct_words) + print("Assertion performed") + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_seven_nodes_tree_bee_movie(): + print("Downloading Bee Movie") + bee_movie_script = download_bee_movie() + print("Downloaded Bee Movie") + bee_movie_words = bee_movie_script.split(" ") + print("Bee Movie Script Split on Spaces, # spaces = " + str(len(bee_movie_words))) + + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} + collected = [] + + async def collect_all_words(expected_len, dummy_node, log_nodes=[]): + collected_words = [] + while True: + word = await dummy_node.get_next_word_in_bee_movie() + collected_words.append(word) + + # Log if needed + if dummy_node in log_nodes: + print(word + "| " + str(len(collected_words)) + "/" + str(expected_len)) + + if len(collected_words) == expected_len: + print("Returned collected words") + return collected_words + + async def action_func(dummy_nodes): + print("Start action function") + words = bee_movie_words + tasks = [] + + print("Add collect all words") + log_nodes = [dummy_nodes[0]] + for i in range(num_nodes): + tasks.append(collect_all_words(len(words), dummy_nodes[i], log_nodes)) + + print("Add sleep") + tasks.append(asyncio.sleep(0.25)) + + print("Add publish") + for i in range(len(words)): + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i])) + + print("Perform gather") + res = await asyncio.gather(*tasks, return_exceptions=True) + + print("Filling collected") + nonlocal collected + for i in range(num_nodes): + collected.append(res[i]) + print("Filled collected") + + async def assertion_func(dummy_node): + print("Perform assertion") + correct_words = bee_movie_words + for i in range(num_nodes): + assert collected[i] == correct_words + assert len(collected[i]) == len(correct_words) + print("Assertion performed") + + await perform_test(num_nodes, adj_map, action_func, assertion_func)