Modify pubsub to have seen message check incorporate seqno and node id
This commit is contained in:
parent
c2b5389362
commit
9d16aa834d
|
@ -2,7 +2,6 @@ import asyncio
|
||||||
|
|
||||||
from .pb import rpc_pb2
|
from .pb import rpc_pb2
|
||||||
from .pubsub_notifee import PubsubNotifee
|
from .pubsub_notifee import PubsubNotifee
|
||||||
from .read_only_queue import ReadOnlyQueue
|
|
||||||
|
|
||||||
|
|
||||||
class Pubsub():
|
class Pubsub():
|
||||||
|
@ -89,7 +88,7 @@ class Pubsub():
|
||||||
for message in rpc_incoming.publish:
|
for message in rpc_incoming.publish:
|
||||||
if message.seqno not in self.seen_messages:
|
if message.seqno not in self.seen_messages:
|
||||||
should_publish = True
|
should_publish = True
|
||||||
self.seen_messages.append(message.seqno)
|
self.seen_messages.append((message.seqno, message.from_id))
|
||||||
await self.handle_talk(message)
|
await self.handle_talk(message)
|
||||||
|
|
||||||
if rpc_incoming.subscriptions:
|
if rpc_incoming.subscriptions:
|
||||||
|
@ -188,9 +187,8 @@ class Pubsub():
|
||||||
if topic in self.my_topics:
|
if topic in self.my_topics:
|
||||||
# we are subscribed to a topic this message was sent for,
|
# we are subscribed to a topic this message was sent for,
|
||||||
# so add message to the subscription output queue
|
# so add message to the subscription output queue
|
||||||
# for each topic with priority being the message's seqno.
|
# for each topic
|
||||||
# Note: asyncio.PriorityQueue item format is (priority, data)
|
await self.my_topics[topic].put(publish_message)
|
||||||
await self.my_topics[topic].put((publish_message.seqno, publish_message))
|
|
||||||
|
|
||||||
async def subscribe(self, topic_id):
|
async def subscribe(self, topic_id):
|
||||||
"""
|
"""
|
||||||
|
@ -198,8 +196,8 @@ class Pubsub():
|
||||||
:param topic_id: topic_id to subscribe to
|
:param topic_id: topic_id to subscribe to
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Map topic_id to a priority blocking queue
|
# Map topic_id to blocking queue
|
||||||
self.my_topics[topic_id] = asyncio.PriorityQueue()
|
self.my_topics[topic_id] = asyncio.Queue()
|
||||||
|
|
||||||
# Create subscribe message
|
# Create subscribe message
|
||||||
packet = rpc_pb2.RPC()
|
packet = rpc_pb2.RPC()
|
||||||
|
@ -214,8 +212,8 @@ class Pubsub():
|
||||||
# Tell router we are joining this topic
|
# Tell router we are joining this topic
|
||||||
self.router.join(topic_id)
|
self.router.join(topic_id)
|
||||||
|
|
||||||
# Return the readonly queue for messages on this topic
|
# Return the asyncio queue for messages on this topic
|
||||||
return ReadOnlyQueue(self.my_topics[topic_id])
|
return self.my_topics[topic_id]
|
||||||
|
|
||||||
async def unsubscribe(self, topic_id):
|
async def unsubscribe(self, topic_id):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -1,14 +0,0 @@
|
||||||
import asyncio
|
|
||||||
|
|
||||||
class ReadOnlyQueue():
|
|
||||||
|
|
||||||
def __init__(self, queue):
|
|
||||||
self.queue = queue
|
|
||||||
|
|
||||||
async def get(self):
|
|
||||||
"""
|
|
||||||
Get the next item from queue, which has items in the format (priority, data)
|
|
||||||
:return: next item from the queue
|
|
||||||
"""
|
|
||||||
return (await self.queue.get())[1]
|
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import multiaddr
|
import multiaddr
|
||||||
|
import uuid
|
||||||
|
|
||||||
from utils import generate_message_id, generate_RPC_packet
|
from utils import message_id_generator, generate_RPC_packet
|
||||||
from libp2p import new_node
|
from libp2p import new_node
|
||||||
from libp2p.pubsub.pubsub import Pubsub
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
from libp2p.pubsub.floodsub import FloodSub
|
from libp2p.pubsub.floodsub import FloodSub
|
||||||
|
@ -25,6 +26,8 @@ class DummyAccountNode():
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.balances = {}
|
self.balances = {}
|
||||||
|
self.next_msg_id_func = message_id_generator(0)
|
||||||
|
self.node_id = str(uuid.uuid1())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(cls):
|
async def create(cls):
|
||||||
|
@ -51,7 +54,7 @@ class DummyAccountNode():
|
||||||
Handle all incoming messages on the CRYPTO_TOPIC from peers
|
Handle all incoming messages on the CRYPTO_TOPIC from peers
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
incoming = await self.q.get()
|
incoming = await self.q.get()
|
||||||
msg_comps = incoming.data.decode('utf-8').split(",")
|
msg_comps = incoming.data.decode('utf-8').split(",")
|
||||||
|
|
||||||
if msg_comps[0] == "send":
|
if msg_comps[0] == "send":
|
||||||
|
@ -77,7 +80,7 @@ class DummyAccountNode():
|
||||||
"""
|
"""
|
||||||
my_id = str(self.libp2p_node.get_id())
|
my_id = str(self.libp2p_node.get_id())
|
||||||
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
|
msg_contents = "send," + source_user + "," + dest_user + "," + str(amount)
|
||||||
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
|
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func())
|
||||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||||
|
|
||||||
async def publish_set_crypto(self, user, amount):
|
async def publish_set_crypto(self, user, amount):
|
||||||
|
@ -88,7 +91,7 @@ class DummyAccountNode():
|
||||||
"""
|
"""
|
||||||
my_id = str(self.libp2p_node.get_id())
|
my_id = str(self.libp2p_node.get_id())
|
||||||
msg_contents = "set," + user + "," + str(amount)
|
msg_contents = "set," + user + "," + str(amount)
|
||||||
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id())
|
packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func())
|
||||||
|
|
||||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||||
|
|
||||||
|
@ -99,6 +102,7 @@ class DummyAccountNode():
|
||||||
:param dest_user: user to send crypto to
|
:param dest_user: user to send crypto to
|
||||||
:param amount: amount of crypto to send
|
:param amount: amount of crypto to send
|
||||||
"""
|
"""
|
||||||
|
print("handle send " + self.node_id)
|
||||||
if source_user in self.balances:
|
if source_user in self.balances:
|
||||||
self.balances[source_user] -= amount
|
self.balances[source_user] -= amount
|
||||||
else:
|
else:
|
||||||
|
@ -115,6 +119,7 @@ class DummyAccountNode():
|
||||||
:param dest_user: user to set crypto for
|
:param dest_user: user to set crypto for
|
||||||
:param amount: amount of crypto
|
:param amount: amount of crypto
|
||||||
"""
|
"""
|
||||||
|
print("handle set " + self.node_id)
|
||||||
self.balances[dest_user] = amount
|
self.balances[dest_user] = amount
|
||||||
|
|
||||||
def get_balance(self, user):
|
def get_balance(self, user):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user