From 71282678c4a1f79d91e38f17284c7bc6427f316f Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Tue, 2 Apr 2019 22:05:14 -0400 Subject: [PATCH] Add priority queues to handle seqno --- libp2p/pubsub/pubsub.py | 14 ++++++++------ libp2p/pubsub/read_only_queue.py | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) create mode 100644 libp2p/pubsub/read_only_queue.py diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index bfad873..37b8714 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -3,6 +3,7 @@ import uuid from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee +from .read_only_queue import ReadOnlyQueue class Pubsub(): @@ -188,8 +189,9 @@ class Pubsub(): if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue - # for each topic - await self.my_topics[topic].put(publish_message) + # for each topic with priority being the message's seqno. + # Note: asyncio.PriorityQueue item format is (priority, data) + await self.my_topics[topic].put((publish_message.seqno, publish_message)) async def subscribe(self, topic_id): """ @@ -197,8 +199,8 @@ class Pubsub(): :param topic_id: topic_id to subscribe to """ - # Map topic_id to blocking queue - self.my_topics[topic_id] = asyncio.Queue() + # Map topic_id to a priority blocking queue + self.my_topics[topic_id] = asyncio.PriorityQueue() # Create subscribe message packet = rpc_pb2.RPC() @@ -217,8 +219,8 @@ class Pubsub(): # Tell router we are joining this topic self.router.join(topic_id) - # Return the asyncio queue for messages on this topic - return self.my_topics[topic_id] + # Return the readonly queue for messages on this topic + return ReadOnlyQueue(self.my_topics[topic_id]) async def unsubscribe(self, topic_id): """ diff --git a/libp2p/pubsub/read_only_queue.py b/libp2p/pubsub/read_only_queue.py new file mode 100644 index 0000000..1656768 --- /dev/null +++ b/libp2p/pubsub/read_only_queue.py @@ -0,0 +1,14 @@ +import asyncio + +class ReadOnlyQueue(): + + def __init__(self, queue): + self.queue = queue + + async def get(self): + """ + Get the next item from queue, which has items in the format (priority, data) + :return: next item from the queue + """ + return (await self.queue.get())[1] +