Modify pubsub to have seen message check incorporate seqno and node id

This commit is contained in:
Stuckinaboot 2019-04-02 22:34:01 -04:00
parent a47b02dd26
commit 91dd3c0230
3 changed files with 16 additions and 27 deletions

View File

@ -3,7 +3,6 @@ import uuid
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .read_only_queue import ReadOnlyQueue
class Pubsub(): class Pubsub():
@ -90,7 +89,7 @@ class Pubsub():
for message in rpc_incoming.publish: for message in rpc_incoming.publish:
if message.seqno not in self.seen_messages: if message.seqno not in self.seen_messages:
should_publish = True should_publish = True
self.seen_messages.append(message.seqno) self.seen_messages.append((message.seqno, message.from_id))
await self.handle_talk(message) await self.handle_talk(message)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
@ -189,9 +188,8 @@ class Pubsub():
if topic in self.my_topics: if topic in self.my_topics:
# we are subscribed to a topic this message was sent for, # we are subscribed to a topic this message was sent for,
# so add message to the subscription output queue # so add message to the subscription output queue
# for each topic with priority being the message's seqno. # for each topic
# Note: asyncio.PriorityQueue item format is (priority, data) await self.my_topics[topic].put(publish_message)
await self.my_topics[topic].put((publish_message.seqno, publish_message))
async def subscribe(self, topic_id): async def subscribe(self, topic_id):
""" """
@ -199,8 +197,8 @@ class Pubsub():
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
""" """
# Map topic_id to a priority blocking queue # Map topic_id to blocking queue
self.my_topics[topic_id] = asyncio.PriorityQueue() self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message # Create subscribe message
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
@ -219,8 +217,8 @@ class Pubsub():
# Tell router we are joining this topic # Tell router we are joining this topic
self.router.join(topic_id) self.router.join(topic_id)
# Return the readonly queue for messages on this topic # Return the asyncio queue for messages on this topic
return ReadOnlyQueue(self.my_topics[topic_id]) return self.my_topics[topic_id]
async def unsubscribe(self, topic_id): async def unsubscribe(self, topic_id):
""" """

View File

@ -1,14 +0,0 @@
import asyncio
class ReadOnlyQueue():
def __init__(self, queue):
self.queue = queue
async def get(self):
"""
Get the next item from queue, which has items in the format (priority, data)
:return: next item from the queue
"""
return (await self.queue.get())[1]

View File

@ -1,7 +1,8 @@
import asyncio import asyncio
import multiaddr import multiaddr
import uuid
from utils import generate_message_id, generate_RPC_packet from utils import message_id_generator, generate_RPC_packet
from libp2p import new_node from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
@ -25,6 +26,8 @@ class DummyAccountNode():
def __init__(self): def __init__(self):
self.balances = {} self.balances = {}
self.next_msg_id_func = message_id_generator(0)
self.node_id = str(uuid.uuid1())
@classmethod @classmethod
async def create(cls): async def create(cls):
@ -51,7 +54,7 @@ class DummyAccountNode():
Handle all incoming messages on the CRYPTO_TOPIC from peers Handle all incoming messages on the CRYPTO_TOPIC from peers
""" """
while True: while True:
incoming = await self.q.get() incoming = await self.q.get()
msg_comps = incoming.data.decode('utf-8').split(",") msg_comps = incoming.data.decode('utf-8').split(",")
if msg_comps[0] == "send": if msg_comps[0] == "send":
@ -77,7 +80,7 @@ class DummyAccountNode():
""" """
my_id = str(self.libp2p_node.get_id()) my_id = str(self.libp2p_node.get_id())
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func())
await self.floodsub.publish(my_id, packet.SerializeToString()) await self.floodsub.publish(my_id, packet.SerializeToString())
async def publish_set_crypto(self, user, amount): async def publish_set_crypto(self, user, amount):
@ -88,7 +91,7 @@ class DummyAccountNode():
""" """
my_id = str(self.libp2p_node.get_id()) my_id = str(self.libp2p_node.get_id())
msg_contents = "set," + user + "," + str(amount) msg_contents = "set," + user + "," + str(amount)
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func())
await self.floodsub.publish(my_id, packet.SerializeToString()) await self.floodsub.publish(my_id, packet.SerializeToString())
@ -99,6 +102,7 @@ class DummyAccountNode():
:param dest_user: user to send crypto to :param dest_user: user to send crypto to
:param amount: amount of crypto to send :param amount: amount of crypto to send
""" """
print("handle send " + self.node_id)
if source_user in self.balances: if source_user in self.balances:
self.balances[source_user] -= amount self.balances[source_user] -= amount
else: else:
@ -115,6 +119,7 @@ class DummyAccountNode():
:param dest_user: user to set crypto for :param dest_user: user to set crypto for
:param amount: amount of crypto :param amount: amount of crypto
""" """
print("handle set " + self.node_id)
self.balances[dest_user] = amount self.balances[dest_user] = amount
def get_balance(self, user): def get_balance(self, user):