diff --git a/demos/bee_movie/bee_movie.py b/demos/bee_movie/bee_movie.py new file mode 100644 index 0000000..e69de29 diff --git a/demos/bee_movie/msg_ordering_node.py b/demos/bee_movie/msg_ordering_node.py new file mode 100644 index 0000000..0870326 --- /dev/null +++ b/demos/bee_movie/msg_ordering_node.py @@ -0,0 +1,95 @@ +import asyncio +import multiaddr + +from tests.pubsub.utils import message_id_generator, generate_RPC_packet +from libp2p import new_node +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub +from ordered_queue import OrderedQueue + +SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +BEE_MOVIE_TOPIC = "bee_movie" + +# Message format: +# Sending crypto: ,, +# Ex. send,aspyn,alex,5 +# Set crypto: , +# Ex. set,rob,5 +# Determine message type by looking at first item before first comma + +class MsgOrderingNode(): + """ + Node which has an internal balance mapping, meant to serve as + a dummy crypto blockchain. There is no actual blockchain, just a simple + map indicating how much crypto each user in the mappings holds + """ + + def __init__(self): + self.balances = {} + self.next_msg_id_func = message_id_generator(0) + self.priority_queue = OrderedQueue() + # self.last_word_gotten_seqno = 0 + + @classmethod + async def create(cls): + """ + Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub + instance to this new node + + We use create as this serves as a factory function and allows us + to use async await, unlike the init function + """ + self = MsgOrderingNode() + + libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + self.libp2p_node = libp2p_node + + self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) + self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") + return self + + async def handle_incoming_msgs(self): + """ + Handle all incoming messages on the BEE_MOVIE_TOPIC from peers + """ + while True: + incoming = await self.q.get() + seqno = int.from_bytes(incoming.seqno, byteorder='big') + word = incoming.data.decode('utf-8') + + await self.handle_bee_movie_word(seqno, word) + + async def setup_crypto_networking(self): + """ + Subscribe to BEE_MOVIE_TOPIC and perform call to function that handles + all incoming messages on said topic + """ + self.q = await self.pubsub.subscribe(BEE_MOVIE_TOPIC) + + asyncio.ensure_future(self.handle_incoming_msgs()) + + async def publish_bee_movie_word(self, word, msg_id=None): + print("Publish hit for " + word) + my_id = str(self.libp2p_node.get_id()) + if msg_id is None: + msg_id = self.next_msg_id_func() + packet = generate_RPC_packet(my_id, [BEE_MOVIE_TOPIC], word, msg_id) + print("Packet generated") + await self.floodsub.publish(my_id, packet.SerializeToString()) + + async def handle_bee_movie_word(self, seqno, word): + print("Handle hit for " + str(seqno) + ", " + word) + await self.priority_queue.put((seqno, word)) + + async def get_next_word_in_bee_movie(self): + # Get just the word (and not the seqno) and return the word + next_word = (await self.priority_queue.get())[1] + return next_word + # if (self.priority_queue.qsize() > 0 and \ + # self.priority_queue[0][0] == self.last_word_gotten_seqno + 1): + # return await self.priority_queue.get() + # else: + # # Wait until a word is put with next_word_seqno == self.last_word_gotten_seqno + 1 + # # Then return first element of priority queue diff --git a/demos/bee_movie/ordered_queue.py b/demos/bee_movie/ordered_queue.py new file mode 100644 index 0000000..5150fbe --- /dev/null +++ b/demos/bee_movie/ordered_queue.py @@ -0,0 +1,57 @@ +import asyncio + +""" +NOTE: ISSUE IS THAT THERE task IS BLOCKING SINCE IT IS WAITING ON SAME COROUTINE THAT +WE ARE RUNNING ON +""" +class OrderedQueue(): + + def __init__(self): + self.last_gotten_seqno = 0 + self.queue = asyncio.PriorityQueue() + self.task = None + + async def put(self, item): + print("Put " + str(item)) + seqno = item[0] + await self.queue.put(item) + print("Added to queue " + str(item)) + if self.last_gotten_seqno + 1 == seqno and self.task is not None: + # Allow get future to return the most recent item that is put + print("Set called") + self.task.set() + + async def get(self): + print("get") + print(str(self.last_gotten_seqno)) + if self.queue.qsize() > 0: + front_item = await self.queue.get() + + if front_item[0] == self.last_gotten_seqno + 1: + print("Trivial get " + str(front_item)) + self.last_gotten_seqno += 1 + return front_item + else: + # Put element back as it should not be delivered yet + print("Front item put back 1") + print(str(self.last_gotten_seqno)) + print(front_item[1]) + await self.queue.put(front_item) + print("Front item put back 2") + + print("get 2") + # Wait until + self.task = asyncio.Event() + print("get 3") + await self.task.wait() + print("get 4") + item = await self.queue.get() + print("get 5") + # print(str(item) + " got from queue") + + # Remove task + self.task = None + + self.last_gotten_seqno += 1 + + return item \ No newline at end of file diff --git a/demos/bee_movie/test_bee_movie.py b/demos/bee_movie/test_bee_movie.py new file mode 100644 index 0000000..6cba89e --- /dev/null +++ b/demos/bee_movie/test_bee_movie.py @@ -0,0 +1,201 @@ +import asyncio +import multiaddr +import pytest +import struct + +from threading import Thread +from tests.utils import cleanup +from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub +from msg_ordering_node import MsgOrderingNode + +# pylint: disable=too-many-locals + +async def connect(node1, node2): + # node1 connects to node2 + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) + +def create_setup_in_new_thread_func(dummy_node): + def setup_in_new_thread(): + asyncio.ensure_future(dummy_node.setup_crypto_networking()) + return setup_in_new_thread + +async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): + """ + Helper function to allow for easy construction of custom tests for dummy account nodes + in various network topologies + :param num_nodes: number of nodes in the test + :param adjacency_map: adjacency map defining each node and its list of neighbors + :param action_func: function to execute that includes actions by the nodes, + such as send crypto and set crypto + :param assertion_func: assertions for testing the results of the actions are correct + """ + + # Create nodes + dummy_nodes = [] + for i in range(num_nodes): + dummy_nodes.append(await MsgOrderingNode.create()) + + # Create network + for source_num in adjacency_map: + target_nums = adjacency_map[source_num] + for target_num in target_nums: + await connect(dummy_nodes[source_num].libp2p_node, \ + dummy_nodes[target_num].libp2p_node) + + # Allow time for network creation to take place + await asyncio.sleep(0.25) + + # Start a thread for each node so that each node can listen and respond + # to messages on its own thread, which will avoid waiting indefinitely + # on the main thread. On this thread, call the setup func for the node, + # which subscribes the node to the CRYPTO_TOPIC topic + for dummy_node in dummy_nodes: + thread = Thread(target=create_setup_in_new_thread_func(dummy_node)) + thread.run() + + # Allow time for nodes to subscribe to CRYPTO_TOPIC topic + await asyncio.sleep(0.25) + + # Perform action function + await action_func(dummy_nodes) + + # Allow time for action function to be performed (i.e. messages to propogate) + await asyncio.sleep(1) + + # Perform assertion function + for dummy_node in dummy_nodes: + await assertion_func(dummy_node) + + # Success, terminate pending tasks. + await cleanup() + +@pytest.mark.asyncio +async def test_simple_two_nodes_one_word(): + num_nodes = 2 + adj_map = {0: [1]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_bee_movie_word("aspyn") + # await asyncio.sleep(0.25) + await dummy_nodes[0].publish_bee_movie_word("hello") + # await asyncio.sleep(0.25) + + async def assertion_func(dummy_node): + next_word = await dummy_node.get_next_word_in_bee_movie() + assert next_word == "aspyn" + next_word = await dummy_node.get_next_word_in_bee_movie() + assert next_word == "hello" + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_two_nodes_ten_words(): + num_nodes = 2 + adj_map = {0: [1]} + + words = ["aspyn", "is", "so", "good", "at", "writing", "code", "XD", ":)", "foobar"] + + async def action_func(dummy_nodes): + for word in words: + await dummy_nodes[0].publish_bee_movie_word(word) + # await asyncio.sleep(0.25) + + async def assertion_func(dummy_node): + for word in words: + assert await dummy_node.get_next_word_in_bee_movie() == word + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_two_nodes_two_words_out_of_order_ids(): + num_nodes = 2 + adj_map = {0: [1]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_bee_movie_word("word 2", struct.pack('>I', 2)) + word, _, _ = await asyncio.gather(dummy_nodes[0].get_next_word_in_bee_movie(),\ + asyncio.sleep(0.25), \ + dummy_nodes[0].publish_bee_movie_word("word 1", struct.pack('>I', 1))) + assert word == "word 1" + assert await dummy_nodes[0].get_next_word_in_bee_movie() == "word 2" + + async def assertion_func(dummy_node): + pass + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_two_nodes_two_words_read_then_publish_out_of_order_ids(): + num_nodes = 2 + adj_map = {0: [1]} + collected = None + + async def collect_all_words(expected_len, dummy_node): + collected_words = [] + while True: + word = await dummy_node.get_next_word_in_bee_movie() + collected_words.append(word) + if len(collected_words) == expected_len: + return collected_words + + async def action_func(dummy_nodes): + words, _, _, _ = await asyncio.gather(collect_all_words(2, dummy_nodes[0]),\ + asyncio.sleep(0.25), \ + dummy_nodes[0].publish_bee_movie_word("word 2", struct.pack('>I', 2)),\ + dummy_nodes[0].publish_bee_movie_word("word 1", struct.pack('>I', 1))) + + # Store collected words to be checked in assertion func + nonlocal collected + collected = words + + async def assertion_func(dummy_node): + assert collected[0] == "word 1" + assert collected[1] == "word 2" + + await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_simple_two_nodes_ten_words_out_of_order_ids(): + num_nodes = 2 + adj_map = {0: [1]} + collected = None + + async def collect_all_words(expected_len, dummy_node): + collected_words = [] + while True: + word = await dummy_node.get_next_word_in_bee_movie() + collected_words.append(word) + if len(collected_words) == expected_len: + return collected_words + + async def action_func(dummy_nodes): + words = ["e", "b", "d", "i", "a", "h", "c", "f", "g", "j"] + msg_id_nums = [5, 2, 4, 9, 1, 8, 3, 6, 7, 10] + msg_ids = [] + tasks = [] + for msg_id_num in msg_id_nums: + msg_ids.append(struct.pack('>I', msg_id_num)) + + tasks.append(collect_all_words(len(words), dummy_nodes[0])) + tasks.append(asyncio.sleep(0.25)) + + for i in range(len(words)): + tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i])) + + res = await asyncio.gather(*tasks) + + nonlocal collected + collected = res[0] + + async def assertion_func(dummy_node): + correct_words = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"] + for i in range(len(correct_words)): + assert collected[i] == correct_words[i] + assert len(collected) == len(correct_words) + + await perform_test(num_nodes, adj_map, action_func, assertion_func)