Add priority queues to handle seqno

This commit is contained in:
Stuckinaboot 2019-04-02 22:05:14 -04:00
parent 41d1aae55b
commit 71282678c4
2 changed files with 22 additions and 6 deletions

View File

@ -3,6 +3,7 @@ import uuid
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .read_only_queue import ReadOnlyQueue
class Pubsub(): class Pubsub():
@ -188,8 +189,9 @@ class Pubsub():
if topic in self.my_topics: if topic in self.my_topics:
# we are subscribed to a topic this message was sent for, # we are subscribed to a topic this message was sent for,
# so add message to the subscription output queue # so add message to the subscription output queue
# for each topic # for each topic with priority being the message's seqno.
await self.my_topics[topic].put(publish_message) # Note: asyncio.PriorityQueue item format is (priority, data)
await self.my_topics[topic].put((publish_message.seqno, publish_message))
async def subscribe(self, topic_id): async def subscribe(self, topic_id):
""" """
@ -197,8 +199,8 @@ class Pubsub():
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
""" """
# Map topic_id to blocking queue # Map topic_id to a priority blocking queue
self.my_topics[topic_id] = asyncio.Queue() self.my_topics[topic_id] = asyncio.PriorityQueue()
# Create subscribe message # Create subscribe message
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
@ -217,8 +219,8 @@ class Pubsub():
# Tell router we are joining this topic # Tell router we are joining this topic
self.router.join(topic_id) self.router.join(topic_id)
# Return the asyncio queue for messages on this topic # Return the readonly queue for messages on this topic
return self.my_topics[topic_id] return ReadOnlyQueue(self.my_topics[topic_id])
async def unsubscribe(self, topic_id): async def unsubscribe(self, topic_id):
""" """

View File

@ -0,0 +1,14 @@
import asyncio
class ReadOnlyQueue():
def __init__(self, queue):
self.queue = queue
async def get(self):
"""
Get the next item from queue, which has items in the format (priority, data)
:return: next item from the queue
"""
return (await self.queue.get())[1]