Add basic out of order tests
This commit is contained in:
parent
f4fb71e0cf
commit
7b67c9cb2f
0
demos/bee_movie/bee_movie.py
Normal file
0
demos/bee_movie/bee_movie.py
Normal file
95
demos/bee_movie/msg_ordering_node.py
Normal file
95
demos/bee_movie/msg_ordering_node.py
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
import asyncio
|
||||||
|
import multiaddr
|
||||||
|
|
||||||
|
from tests.pubsub.utils import message_id_generator, generate_RPC_packet
|
||||||
|
from libp2p import new_node
|
||||||
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
|
from libp2p.pubsub.floodsub import FloodSub
|
||||||
|
from ordered_queue import OrderedQueue
|
||||||
|
|
||||||
|
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||||
|
BEE_MOVIE_TOPIC = "bee_movie"
|
||||||
|
|
||||||
|
# Message format:
|
||||||
|
# Sending crypto: <source>,<dest>,<amount as integer>
|
||||||
|
# Ex. send,aspyn,alex,5
|
||||||
|
# Set crypto: <dest>,<amount as integer>
|
||||||
|
# Ex. set,rob,5
|
||||||
|
# Determine message type by looking at first item before first comma
|
||||||
|
|
||||||
|
class MsgOrderingNode():
|
||||||
|
"""
|
||||||
|
Node which has an internal balance mapping, meant to serve as
|
||||||
|
a dummy crypto blockchain. There is no actual blockchain, just a simple
|
||||||
|
map indicating how much crypto each user in the mappings holds
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.balances = {}
|
||||||
|
self.next_msg_id_func = message_id_generator(0)
|
||||||
|
self.priority_queue = OrderedQueue()
|
||||||
|
# self.last_word_gotten_seqno = 0
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def create(cls):
|
||||||
|
"""
|
||||||
|
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
|
||||||
|
instance to this new node
|
||||||
|
|
||||||
|
We use create as this serves as a factory function and allows us
|
||||||
|
to use async await, unlike the init function
|
||||||
|
"""
|
||||||
|
self = MsgOrderingNode()
|
||||||
|
|
||||||
|
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||||
|
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||||
|
|
||||||
|
self.libp2p_node = libp2p_node
|
||||||
|
|
||||||
|
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
|
||||||
|
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def handle_incoming_msgs(self):
|
||||||
|
"""
|
||||||
|
Handle all incoming messages on the BEE_MOVIE_TOPIC from peers
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
incoming = await self.q.get()
|
||||||
|
seqno = int.from_bytes(incoming.seqno, byteorder='big')
|
||||||
|
word = incoming.data.decode('utf-8')
|
||||||
|
|
||||||
|
await self.handle_bee_movie_word(seqno, word)
|
||||||
|
|
||||||
|
async def setup_crypto_networking(self):
|
||||||
|
"""
|
||||||
|
Subscribe to BEE_MOVIE_TOPIC and perform call to function that handles
|
||||||
|
all incoming messages on said topic
|
||||||
|
"""
|
||||||
|
self.q = await self.pubsub.subscribe(BEE_MOVIE_TOPIC)
|
||||||
|
|
||||||
|
asyncio.ensure_future(self.handle_incoming_msgs())
|
||||||
|
|
||||||
|
async def publish_bee_movie_word(self, word, msg_id=None):
|
||||||
|
print("Publish hit for " + word)
|
||||||
|
my_id = str(self.libp2p_node.get_id())
|
||||||
|
if msg_id is None:
|
||||||
|
msg_id = self.next_msg_id_func()
|
||||||
|
packet = generate_RPC_packet(my_id, [BEE_MOVIE_TOPIC], word, msg_id)
|
||||||
|
print("Packet generated")
|
||||||
|
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||||
|
|
||||||
|
async def handle_bee_movie_word(self, seqno, word):
|
||||||
|
print("Handle hit for " + str(seqno) + ", " + word)
|
||||||
|
await self.priority_queue.put((seqno, word))
|
||||||
|
|
||||||
|
async def get_next_word_in_bee_movie(self):
|
||||||
|
# Get just the word (and not the seqno) and return the word
|
||||||
|
next_word = (await self.priority_queue.get())[1]
|
||||||
|
return next_word
|
||||||
|
# if (self.priority_queue.qsize() > 0 and \
|
||||||
|
# self.priority_queue[0][0] == self.last_word_gotten_seqno + 1):
|
||||||
|
# return await self.priority_queue.get()
|
||||||
|
# else:
|
||||||
|
# # Wait until a word is put with next_word_seqno == self.last_word_gotten_seqno + 1
|
||||||
|
# # Then return first element of priority queue
|
57
demos/bee_movie/ordered_queue.py
Normal file
57
demos/bee_movie/ordered_queue.py
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
"""
|
||||||
|
NOTE: ISSUE IS THAT THERE task IS BLOCKING SINCE IT IS WAITING ON SAME COROUTINE THAT
|
||||||
|
WE ARE RUNNING ON
|
||||||
|
"""
|
||||||
|
class OrderedQueue():
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.last_gotten_seqno = 0
|
||||||
|
self.queue = asyncio.PriorityQueue()
|
||||||
|
self.task = None
|
||||||
|
|
||||||
|
async def put(self, item):
|
||||||
|
print("Put " + str(item))
|
||||||
|
seqno = item[0]
|
||||||
|
await self.queue.put(item)
|
||||||
|
print("Added to queue " + str(item))
|
||||||
|
if self.last_gotten_seqno + 1 == seqno and self.task is not None:
|
||||||
|
# Allow get future to return the most recent item that is put
|
||||||
|
print("Set called")
|
||||||
|
self.task.set()
|
||||||
|
|
||||||
|
async def get(self):
|
||||||
|
print("get")
|
||||||
|
print(str(self.last_gotten_seqno))
|
||||||
|
if self.queue.qsize() > 0:
|
||||||
|
front_item = await self.queue.get()
|
||||||
|
|
||||||
|
if front_item[0] == self.last_gotten_seqno + 1:
|
||||||
|
print("Trivial get " + str(front_item))
|
||||||
|
self.last_gotten_seqno += 1
|
||||||
|
return front_item
|
||||||
|
else:
|
||||||
|
# Put element back as it should not be delivered yet
|
||||||
|
print("Front item put back 1")
|
||||||
|
print(str(self.last_gotten_seqno))
|
||||||
|
print(front_item[1])
|
||||||
|
await self.queue.put(front_item)
|
||||||
|
print("Front item put back 2")
|
||||||
|
|
||||||
|
print("get 2")
|
||||||
|
# Wait until
|
||||||
|
self.task = asyncio.Event()
|
||||||
|
print("get 3")
|
||||||
|
await self.task.wait()
|
||||||
|
print("get 4")
|
||||||
|
item = await self.queue.get()
|
||||||
|
print("get 5")
|
||||||
|
# print(str(item) + " got from queue")
|
||||||
|
|
||||||
|
# Remove task
|
||||||
|
self.task = None
|
||||||
|
|
||||||
|
self.last_gotten_seqno += 1
|
||||||
|
|
||||||
|
return item
|
201
demos/bee_movie/test_bee_movie.py
Normal file
201
demos/bee_movie/test_bee_movie.py
Normal file
|
@ -0,0 +1,201 @@
|
||||||
|
import asyncio
|
||||||
|
import multiaddr
|
||||||
|
import pytest
|
||||||
|
import struct
|
||||||
|
|
||||||
|
from threading import Thread
|
||||||
|
from tests.utils import cleanup
|
||||||
|
from libp2p import new_node
|
||||||
|
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||||
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
|
from libp2p.pubsub.floodsub import FloodSub
|
||||||
|
from msg_ordering_node import MsgOrderingNode
|
||||||
|
|
||||||
|
# pylint: disable=too-many-locals
|
||||||
|
|
||||||
|
async def connect(node1, node2):
|
||||||
|
# node1 connects to node2
|
||||||
|
addr = node2.get_addrs()[0]
|
||||||
|
info = info_from_p2p_addr(addr)
|
||||||
|
await node1.connect(info)
|
||||||
|
|
||||||
|
def create_setup_in_new_thread_func(dummy_node):
|
||||||
|
def setup_in_new_thread():
|
||||||
|
asyncio.ensure_future(dummy_node.setup_crypto_networking())
|
||||||
|
return setup_in_new_thread
|
||||||
|
|
||||||
|
async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
|
||||||
|
"""
|
||||||
|
Helper function to allow for easy construction of custom tests for dummy account nodes
|
||||||
|
in various network topologies
|
||||||
|
:param num_nodes: number of nodes in the test
|
||||||
|
:param adjacency_map: adjacency map defining each node and its list of neighbors
|
||||||
|
:param action_func: function to execute that includes actions by the nodes,
|
||||||
|
such as send crypto and set crypto
|
||||||
|
:param assertion_func: assertions for testing the results of the actions are correct
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create nodes
|
||||||
|
dummy_nodes = []
|
||||||
|
for i in range(num_nodes):
|
||||||
|
dummy_nodes.append(await MsgOrderingNode.create())
|
||||||
|
|
||||||
|
# Create network
|
||||||
|
for source_num in adjacency_map:
|
||||||
|
target_nums = adjacency_map[source_num]
|
||||||
|
for target_num in target_nums:
|
||||||
|
await connect(dummy_nodes[source_num].libp2p_node, \
|
||||||
|
dummy_nodes[target_num].libp2p_node)
|
||||||
|
|
||||||
|
# Allow time for network creation to take place
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
# Start a thread for each node so that each node can listen and respond
|
||||||
|
# to messages on its own thread, which will avoid waiting indefinitely
|
||||||
|
# on the main thread. On this thread, call the setup func for the node,
|
||||||
|
# which subscribes the node to the CRYPTO_TOPIC topic
|
||||||
|
for dummy_node in dummy_nodes:
|
||||||
|
thread = Thread(target=create_setup_in_new_thread_func(dummy_node))
|
||||||
|
thread.run()
|
||||||
|
|
||||||
|
# Allow time for nodes to subscribe to CRYPTO_TOPIC topic
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
# Perform action function
|
||||||
|
await action_func(dummy_nodes)
|
||||||
|
|
||||||
|
# Allow time for action function to be performed (i.e. messages to propogate)
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Perform assertion function
|
||||||
|
for dummy_node in dummy_nodes:
|
||||||
|
await assertion_func(dummy_node)
|
||||||
|
|
||||||
|
# Success, terminate pending tasks.
|
||||||
|
await cleanup()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes_one_word():
|
||||||
|
num_nodes = 2
|
||||||
|
adj_map = {0: [1]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_bee_movie_word("aspyn")
|
||||||
|
# await asyncio.sleep(0.25)
|
||||||
|
await dummy_nodes[0].publish_bee_movie_word("hello")
|
||||||
|
# await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
async def assertion_func(dummy_node):
|
||||||
|
next_word = await dummy_node.get_next_word_in_bee_movie()
|
||||||
|
assert next_word == "aspyn"
|
||||||
|
next_word = await dummy_node.get_next_word_in_bee_movie()
|
||||||
|
assert next_word == "hello"
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes_ten_words():
|
||||||
|
num_nodes = 2
|
||||||
|
adj_map = {0: [1]}
|
||||||
|
|
||||||
|
words = ["aspyn", "is", "so", "good", "at", "writing", "code", "XD", ":)", "foobar"]
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
for word in words:
|
||||||
|
await dummy_nodes[0].publish_bee_movie_word(word)
|
||||||
|
# await asyncio.sleep(0.25)
|
||||||
|
|
||||||
|
async def assertion_func(dummy_node):
|
||||||
|
for word in words:
|
||||||
|
assert await dummy_node.get_next_word_in_bee_movie() == word
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes_two_words_out_of_order_ids():
|
||||||
|
num_nodes = 2
|
||||||
|
adj_map = {0: [1]}
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
await dummy_nodes[0].publish_bee_movie_word("word 2", struct.pack('>I', 2))
|
||||||
|
word, _, _ = await asyncio.gather(dummy_nodes[0].get_next_word_in_bee_movie(),\
|
||||||
|
asyncio.sleep(0.25), \
|
||||||
|
dummy_nodes[0].publish_bee_movie_word("word 1", struct.pack('>I', 1)))
|
||||||
|
assert word == "word 1"
|
||||||
|
assert await dummy_nodes[0].get_next_word_in_bee_movie() == "word 2"
|
||||||
|
|
||||||
|
async def assertion_func(dummy_node):
|
||||||
|
pass
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes_two_words_read_then_publish_out_of_order_ids():
|
||||||
|
num_nodes = 2
|
||||||
|
adj_map = {0: [1]}
|
||||||
|
collected = None
|
||||||
|
|
||||||
|
async def collect_all_words(expected_len, dummy_node):
|
||||||
|
collected_words = []
|
||||||
|
while True:
|
||||||
|
word = await dummy_node.get_next_word_in_bee_movie()
|
||||||
|
collected_words.append(word)
|
||||||
|
if len(collected_words) == expected_len:
|
||||||
|
return collected_words
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
words, _, _, _ = await asyncio.gather(collect_all_words(2, dummy_nodes[0]),\
|
||||||
|
asyncio.sleep(0.25), \
|
||||||
|
dummy_nodes[0].publish_bee_movie_word("word 2", struct.pack('>I', 2)),\
|
||||||
|
dummy_nodes[0].publish_bee_movie_word("word 1", struct.pack('>I', 1)))
|
||||||
|
|
||||||
|
# Store collected words to be checked in assertion func
|
||||||
|
nonlocal collected
|
||||||
|
collected = words
|
||||||
|
|
||||||
|
async def assertion_func(dummy_node):
|
||||||
|
assert collected[0] == "word 1"
|
||||||
|
assert collected[1] == "word 2"
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_simple_two_nodes_ten_words_out_of_order_ids():
|
||||||
|
num_nodes = 2
|
||||||
|
adj_map = {0: [1]}
|
||||||
|
collected = None
|
||||||
|
|
||||||
|
async def collect_all_words(expected_len, dummy_node):
|
||||||
|
collected_words = []
|
||||||
|
while True:
|
||||||
|
word = await dummy_node.get_next_word_in_bee_movie()
|
||||||
|
collected_words.append(word)
|
||||||
|
if len(collected_words) == expected_len:
|
||||||
|
return collected_words
|
||||||
|
|
||||||
|
async def action_func(dummy_nodes):
|
||||||
|
words = ["e", "b", "d", "i", "a", "h", "c", "f", "g", "j"]
|
||||||
|
msg_id_nums = [5, 2, 4, 9, 1, 8, 3, 6, 7, 10]
|
||||||
|
msg_ids = []
|
||||||
|
tasks = []
|
||||||
|
for msg_id_num in msg_id_nums:
|
||||||
|
msg_ids.append(struct.pack('>I', msg_id_num))
|
||||||
|
|
||||||
|
tasks.append(collect_all_words(len(words), dummy_nodes[0]))
|
||||||
|
tasks.append(asyncio.sleep(0.25))
|
||||||
|
|
||||||
|
for i in range(len(words)):
|
||||||
|
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
|
||||||
|
|
||||||
|
res = await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
nonlocal collected
|
||||||
|
collected = res[0]
|
||||||
|
|
||||||
|
async def assertion_func(dummy_node):
|
||||||
|
correct_words = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
|
||||||
|
for i in range(len(correct_words)):
|
||||||
|
assert collected[i] == correct_words[i]
|
||||||
|
assert len(collected) == len(correct_words)
|
||||||
|
|
||||||
|
await perform_test(num_nodes, adj_map, action_func, assertion_func)
|
Loading…
Reference in New Issue
Block a user