Add priority queues to handle seqno

This commit is contained in:
Stuckinaboot 2019-04-02 22:05:14 -04:00 committed by zixuanzh
parent 0190481e10
commit f6299c7dee
2 changed files with 22 additions and 6 deletions

View File

@ -2,6 +2,7 @@ import asyncio
from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee
from .read_only_queue import ReadOnlyQueue
class Pubsub():
@ -187,8 +188,9 @@ class Pubsub():
if topic in self.my_topics:
# we are subscribed to a topic this message was sent for,
# so add message to the subscription output queue
# for each topic
await self.my_topics[topic].put(publish_message)
# for each topic with priority being the message's seqno.
# Note: asyncio.PriorityQueue item format is (priority, data)
await self.my_topics[topic].put((publish_message.seqno, publish_message))
async def subscribe(self, topic_id):
"""
@ -196,8 +198,8 @@ class Pubsub():
:param topic_id: topic_id to subscribe to
"""
# Map topic_id to blocking queue
self.my_topics[topic_id] = asyncio.Queue()
# Map topic_id to a priority blocking queue
self.my_topics[topic_id] = asyncio.PriorityQueue()
# Create subscribe message
packet = rpc_pb2.RPC()
@ -212,8 +214,8 @@ class Pubsub():
# Tell router we are joining this topic
self.router.join(topic_id)
# Return the asyncio queue for messages on this topic
return self.my_topics[topic_id]
# Return the readonly queue for messages on this topic
return ReadOnlyQueue(self.my_topics[topic_id])
async def unsubscribe(self, topic_id):
"""

View File

@ -0,0 +1,14 @@
import asyncio
class ReadOnlyQueue():
def __init__(self, queue):
self.queue = queue
async def get(self):
"""
Get the next item from queue, which has items in the format (priority, data)
:return: next item from the queue
"""
return (await self.queue.get())[1]