This commit is contained in:
Stuckinaboot 2019-05-08 20:08:27 -04:00
parent 4c2bf6873a
commit fa292ae7c8
4 changed files with 53 additions and 33 deletions

View File

@ -5,7 +5,7 @@ from tests.pubsub.utils import message_id_generator, generate_RPC_packet
from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from ordered_queue import OrderedQueue
from .ordered_queue import OrderedQueue
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
BEE_MOVIE_TOPIC = "bee_movie"
@ -17,12 +17,15 @@ class MsgOrderingNode():
self.balances = {}
self.next_msg_id_func = message_id_generator(0)
self.priority_queue = OrderedQueue()
# self.last_word_gotten_seqno = 0
self.libp2p_node = None
self.floodsub = None
self.pubsub = None
@classmethod
async def create(cls):
"""
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
Create a new MsgOrderingNode and attach a libp2p node, a floodsub, and a pubsub
instance to this new node
We use create as this serves as a factory function and allows us
@ -44,7 +47,7 @@ class MsgOrderingNode():
Handle all incoming messages on the BEE_MOVIE_TOPIC from peers
"""
while True:
incoming = await self.q.get()
incoming = await self.queue.get()
seqno = int.from_bytes(incoming.seqno, byteorder='big')
word = incoming.data.decode('utf-8')
@ -55,11 +58,12 @@ class MsgOrderingNode():
Subscribe to BEE_MOVIE_TOPIC and perform call to function that handles
all incoming messages on said topic
"""
self.q = await self.pubsub.subscribe(BEE_MOVIE_TOPIC)
self.queue = await self.pubsub.subscribe(BEE_MOVIE_TOPIC)
asyncio.ensure_future(self.handle_incoming_msgs())
async def publish_bee_movie_word(self, word, msg_id=None):
# Publish a bee movie word to all peers
my_id = str(self.libp2p_node.get_id())
if msg_id is None:
msg_id = self.next_msg_id_func()
@ -67,7 +71,7 @@ class MsgOrderingNode():
await self.floodsub.publish(my_id, packet.SerializeToString())
async def handle_bee_movie_word(self, seqno, word):
# print("Handle hit for " + str(seqno) + ", " + word)
# Handle bee movie word received
await self.priority_queue.put((seqno, word))
async def get_next_word_in_bee_movie(self):

View File

@ -1,10 +1,13 @@
import asyncio
"""
NOTE: ISSUE IS THAT THERE task IS BLOCKING SINCE IT IS WAITING ON SAME COROUTINE THAT
WE ARE RUNNING ON
"""
class OrderedQueue():
"""
asyncio.queue wrapper that delivers messages in order of subsequent sequence numbers,
so if message 1 and 3 are received and the following get calls occur:
get(), get(), get()
the queue will deliver message 1, will wait until message 2 is received to deliver message 2,
and then deliver message 3
"""
def __init__(self):
self.last_gotten_seqno = 0
@ -12,6 +15,9 @@ class OrderedQueue():
self.task = None
async def put(self, item):
"""
:param item: put item tuple (seqno, data) onto queue
"""
seqno = item[0]
await self.queue.put(item)
if self.last_gotten_seqno + 1 == seqno and self.task is not None:
@ -19,15 +25,18 @@ class OrderedQueue():
self.task.set()
async def get(self):
"""
Get item with last_gotten_seqno + 1 from the queue
:return: (seqno, data)
"""
if self.queue.qsize() > 0:
front_item = await self.queue.get()
if front_item[0] == self.last_gotten_seqno + 1:
self.last_gotten_seqno += 1
return front_item
else:
# Put element back as it should not be delivered yet
await self.queue.put(front_item)
# Put element back as it should not be delivered yet
await self.queue.put(front_item)
# Wait until item with subsequent seqno is put on queue
self.task = asyncio.Event()

View File

@ -1,19 +1,26 @@
import asyncio
import multiaddr
import pytest
from threading import Thread
import struct
import pytest
import urllib.request
from threading import Thread
from tests.utils import cleanup
from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from msg_ordering_node import MsgOrderingNode
from .msg_ordering_node import MsgOrderingNode
from tests.utils import cleanup
# pylint: disable=too-many-locals
"""
Test-cases demonstrating how to create nodes that continuously stream data
and ensure that data is delivered to each node with pre-determined ordering.
The ordering is such that if a peer A sends a publish 1 and 2 with seqno=1 and with seqno=2,
respectively, even if the publish 2 (with seqno=2) reaches the peers first, it will not
be processed until seqno=1 is received (and then publish 1 with seqno=1 must be
processed before publish 2 with seqno=2 will be).
This concept is demonstrated by streaming the script to the entire bee movie to several nodes
"""
async def connect(node1, node2):
# node1 connects to node2
addr = node2.get_addrs()[0]
@ -37,7 +44,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
# Create nodes
dummy_nodes = []
for i in range(num_nodes):
for _ in range(num_nodes):
dummy_nodes.append(await MsgOrderingNode.create())
# Create network
@ -148,7 +155,7 @@ async def test_simple_two_nodes_two_words_read_then_publish_out_of_order_ids():
asyncio.sleep(0.25), \
dummy_nodes[0].publish_bee_movie_word("word 2", struct.pack('>I', 2)),\
dummy_nodes[0].publish_bee_movie_word("word 1", struct.pack('>I', 1)))
# Store collected words to be checked in assertion func
nonlocal collected
collected = words
@ -185,10 +192,10 @@ async def test_simple_two_nodes_ten_words_out_of_order_ids():
tasks.append(asyncio.sleep(0.25))
for i in range(len(words)):
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
res = await asyncio.gather(*tasks)
nonlocal collected
collected = res[0]
@ -228,10 +235,10 @@ async def test_simple_five_nodes_rings_words_out_of_order_ids():
tasks.append(asyncio.sleep(0.25))
for i in range(len(words)):
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
res = await asyncio.gather(*tasks)
nonlocal collected
for i in range(num_nodes):
collected.append(res[i])
@ -272,10 +279,10 @@ async def test_simple_seven_nodes_tree_words_out_of_order_ids():
tasks.append(asyncio.sleep(0.25))
for i in range(len(words)):
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i], msg_ids[i]))
res = await asyncio.gather(*tasks)
nonlocal collected
for i in range(num_nodes):
collected.append(res[i])
@ -336,11 +343,11 @@ async def test_simple_two_nodes_bee_movie():
print("Add publish")
for i in range(len(words)):
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i]))
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i]))
print("Perform gather")
res = await asyncio.gather(*tasks)
print("Filling collected")
nonlocal collected
for i in range(num_nodes):
@ -398,11 +405,11 @@ async def test_simple_seven_nodes_tree_bee_movie():
print("Add publish")
for i in range(len(words)):
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i]))
tasks.append(dummy_nodes[0].publish_bee_movie_word(words[i]))
print("Perform gather")
res = await asyncio.gather(*tasks, return_exceptions=True)
print("Filling collected")
nonlocal collected
for i in range(num_nodes):