diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5f018d7..aec04de 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -2,7 +2,6 @@ import asyncio from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .read_only_queue import ReadOnlyQueue class Pubsub(): @@ -89,7 +88,7 @@ class Pubsub(): for message in rpc_incoming.publish: if message.seqno not in self.seen_messages: should_publish = True - self.seen_messages.append(message.seqno) + self.seen_messages.append((message.seqno, message.from_id)) await self.handle_talk(message) if rpc_incoming.subscriptions: @@ -188,9 +187,8 @@ class Pubsub(): if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue - # for each topic with priority being the message's seqno. - # Note: asyncio.PriorityQueue item format is (priority, data) - await self.my_topics[topic].put((publish_message.seqno, publish_message)) + # for each topic + await self.my_topics[topic].put(publish_message) async def subscribe(self, topic_id): """ @@ -198,8 +196,8 @@ class Pubsub(): :param topic_id: topic_id to subscribe to """ - # Map topic_id to a priority blocking queue - self.my_topics[topic_id] = asyncio.PriorityQueue() + # Map topic_id to blocking queue + self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message packet = rpc_pb2.RPC() @@ -214,8 +212,8 @@ class Pubsub(): # Tell router we are joining this topic self.router.join(topic_id) - # Return the readonly queue for messages on this topic - return ReadOnlyQueue(self.my_topics[topic_id]) + # Return the asyncio queue for messages on this topic + return self.my_topics[topic_id] async def unsubscribe(self, topic_id): """ diff --git a/libp2p/pubsub/read_only_queue.py b/libp2p/pubsub/read_only_queue.py deleted file mode 100644 index 1656768..0000000 --- a/libp2p/pubsub/read_only_queue.py +++ /dev/null @@ -1,14 +0,0 @@ -import asyncio - -class ReadOnlyQueue(): - - def __init__(self, queue): - self.queue = queue - - async def get(self): - """ - Get the next item from queue, which has items in the format (priority, data) - :return: next item from the queue - """ - return (await self.queue.get())[1] - diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 05a02bf..8fb7310 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,7 +1,8 @@ import asyncio import multiaddr +import uuid -from utils import generate_message_id, generate_RPC_packet +from utils import message_id_generator, generate_RPC_packet from libp2p import new_node from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub @@ -25,6 +26,8 @@ class DummyAccountNode(): def __init__(self): self.balances = {} + self.next_msg_id_func = message_id_generator(0) + self.node_id = str(uuid.uuid1()) @classmethod async def create(cls): @@ -51,7 +54,7 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - incoming = await self.q.get() + incoming = await self.q.get() msg_comps = incoming.data.decode('utf-8').split(",") if msg_comps[0] == "send": @@ -77,7 +80,7 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) async def publish_set_crypto(self, user, amount): @@ -88,7 +91,7 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) @@ -99,6 +102,7 @@ class DummyAccountNode(): :param dest_user: user to send crypto to :param amount: amount of crypto to send """ + print("handle send " + self.node_id) if source_user in self.balances: self.balances[source_user] -= amount else: @@ -115,6 +119,7 @@ class DummyAccountNode(): :param dest_user: user to set crypto for :param amount: amount of crypto """ + print("handle set " + self.node_id) self.balances[dest_user] = amount def get_balance(self, user):