reworked floodsub logic

zixuanzh 2019-03-31 22:16:28 -04:00 committed by Stuckinaboot
parent 1cbd909dc2
commit 87269a9524
2 changed files with 74 additions and 55 deletions


@@ -41,7 +41,7 @@ class FloodSub(IPubsubRouter):
         :param rpc: rpc message
         """

-    async def publish(self, sender_peer_id, message):
+    async def publish(self, sender_peer_id, rpc_message):
         """
         Invoked to forward a new message that has been validated.
         This is where the "flooding" part of floodsub happens
@@ -53,33 +53,41 @@
         It also never forwards a message back to the source
         or the peer that forwarded the message.
         :param sender_peer_id: peer_id of message sender
-        :param message: message to forward
+        :param rpc_message: pubsub message in RPC string format
         """
         packet = rpc_pb2.RPC()
+        packet.ParseFromString(rpc_message)
         print ("IN FLOOODSUB PUBLISH")
-        print (message)
+        print (packet)
         print ("++++++++++++++++")
-        packet.publish.extend([message])
         msg_sender = str(sender_peer_id)
         # Deliver to self if self was origin
         # Note: handle_talk checks if self is subscribed to topics in message
-        if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()):
-            await self.pubsub.handle_talk(sender_peer_id, message)
+        for message in packet.publish:
+            if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()):
+                await self.pubsub.handle_talk(sender_peer_id, message)

-        # Deliver to self and peers
-        for topic in message.topicIDs:
-            if topic in self.pubsub.peer_topics:
-                for peer_id_in_topic in self.pubsub.peer_topics[topic]:
-                    # Forward to all known peers in the topic that are not the
-                    # message sender and are not the message origin
-                    if peer_id_in_topic not in (msg_sender, message.from_id):
-                        stream = self.pubsub.peers[peer_id_in_topic]
-                        await stream.write(packet.SerializeToString())
-                    else:
-                        # Implies publish did not write
-                        print("publish did not write")
+            print ("OHOHOHOH")
+            print (self.pubsub.peer_topics)
+            print ("UUUJUJUJ")
+            print (self.pubsub.peers)
+            print ("********")
+            # Deliver to self and peers
+            for topic in message.topicIDs:
+                if topic in self.pubsub.peer_topics:
+                    for peer_id_in_topic in self.pubsub.peer_topics[topic]:
+                        # Forward to all known peers in the topic that are not the
+                        # message sender and are not the message origin
+                        print ("PEERID")
+                        print (peer_id_in_topic)
+                        if peer_id_in_topic not in (msg_sender, message.from_id):
+                            stream = self.pubsub.peers[peer_id_in_topic]
+                            await stream.write(packet.SerializeToString())
+                        else:
+                            # Implies publish did not write
+                            print("publish did not write")

     def join(self, topic):
         """


@@ -80,31 +80,43 @@ class Pubsub():
         """
         # TODO check on types here
-        peer_id = stream.mplex_conn.peer_id
+        peer_id = str(stream.mplex_conn.peer_id)

         while True:
             incoming = (await stream.read())
             rpc_incoming = rpc_pb2.RPC()
             rpc_incoming.ParseFromString(incoming)
-            print ("CONTINUOUSLY")
+            print ("IN PUBSUB CONTINUOUSLY READ")
             print (rpc_incoming)
+            print ("###########################")
+
+            should_publish = True

             if rpc_incoming.publish:
-                # deal with "talk messages"
-                for msg in rpc_incoming.publish:
-                    self.seen_messages.append(msg.seqno)
-                    await self.handle_talk(peer_id, msg)
-                    await self.router.publish(peer_id, msg)
+                # deal with RPC.publish
+                for message in rpc_incoming.publish:
+                    self.seen_messages.append(message.seqno)
+                    await self.handle_talk(peer_id, message)

             if rpc_incoming.subscriptions:
-                # deal with "subscription messages"
-                subs_map = {}
-                for msg in rpc_incoming.subscriptions:
-                    if msg.subscribe:
-                        subs_map[msg.topicid] = "sub"
-                    else:
-                        subs_map[msg.topicid] = "unsub"
-                self.handle_subscription(rpc_incoming)
+                # deal with RPC.subscriptions
+                # We don't need to relay the subscription to our
+                # peers because a given node only needs its peers
+                # to know that it is subscribed to the topic (doesn't
+                # need everyone to know)
+                should_publish = False
+
+                # TODO check that peer_id is the same as origin_id
+                from_id = str(rpc_incoming.publish[0].from_id.decode('utf-8'))
+                for message in rpc_incoming.subscriptions:
+                    if message.subscribe:
+                        self.handle_subscription(from_id, message)
+
+            if should_publish:
+                # relay message to peers with router
+                await self.router.publish(peer_id, incoming)

             # Force context switch
             await asyncio.sleep(0)
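
The reworked read loop records each publish message's seqno in seen_messages, delivers it locally through handle_talk, applies subscriptions via handle_subscription, and only relays the raw RPC through the router when it was not a pure subscription update (should_publish). Below is a rough standalone sketch of that decision flow; the SimpleNamespace objects and function names only mimic the protobuf RPC shape and are not the py-libp2p API.

# Rough sketch of the read-loop decision above: handle publish messages locally,
# record their seqnos, suppress relaying for pure subscription RPCs, and
# otherwise hand the RPC back to the router.
from types import SimpleNamespace

def process_rpc(rpc, seen_messages, handle_talk, handle_subscription, relay):
    should_publish = True
    for message in rpc.publish:
        seen_messages.append(message.seqno)
        handle_talk(message)
    if rpc.subscriptions:
        # Subscription updates only need to reach direct peers, so don't relay them.
        should_publish = False
        for sub in rpc.subscriptions:
            if sub.subscribe:
                handle_subscription(sub)
    if should_publish:
        relay(rpc)

seen = []
rpc = SimpleNamespace(publish=[SimpleNamespace(seqno=b"1", data=b"hi")], subscriptions=[])
process_rpc(rpc, seen, print, print, lambda r: print("relayed", r.publish[0].data))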
@@ -136,7 +148,10 @@ class Pubsub():
         pubsub protocols we support
         """
         while True:
+            print ("PUBSUB HANDLE PEER QUEUE")
             peer_id = await self.peer_queue.get()
+            print (peer_id)
+            print ("++++++++++++++++++++++++")

             # Open a stream to peer on existing connection
             # (we know connection exists since that's the only way
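
For context, handle_peer_queue blocks on an asyncio queue of newly discovered peers and opens a stream to each one over its existing connection. A minimal sketch of that consume-and-handle pattern, assuming a plain asyncio.Queue; the peer id and print calls are placeholders for the real stream setup.

# Hedged sketch of consuming the peer queue; opening the actual pubsub stream
# is represented only by a comment.
import asyncio

async def handle_peer_queue(peer_queue):
    while True:
        peer_id = await peer_queue.get()
        print("new pubsub peer:", peer_id)
        # opening a stream to peer_id would go here in the real handler
        await asyncio.sleep(0)  # force context switch, as in the loop above

async def main():
    queue = asyncio.Queue()
    await queue.put("QmPeerA")  # illustrative peer id
    task = asyncio.ensure_future(handle_peer_queue(queue))
    await asyncio.sleep(0.1)  # let the consumer run once
    task.cancel()

asyncio.run(main())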
@ -158,34 +173,30 @@ class Pubsub():
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
def handle_subscription(self, rpc_message): def handle_subscription(self, peer_id, sub_message):
""" """
Handle an incoming subscription message from a peer. Update internal Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as mapping to mark the peer as subscribed or unsubscribed to topics as
defined in the subscription message defined in the subscription message
:param subscription: raw data constituting a subscription message :param origin_id: id of the peer who subscribe to the message
:param sub_message: RPC.SubOpts
""" """
for sub_msg in rpc_message.subscriptions: # TODO verify logic here
if sub_message.subscribe:
# Look at each subscription in the msg individually if sub_message.topicid not in self.peer_topics:
if sub_msg.subscribe: self.peer_topics[sub_message.topicid] = [peer_id]
origin_id = rpc_message.publish[0].from_id elif peer_id not in self.peer_topics[sub_message.topicid]:
# Add peer to topic
if sub_msg.topicid not in self.peer_topics: self.peer_topics[sub_message.topicid].append(peer_id)
# Create topic list if it did not yet exist else:
self.peer_topics[sub_msg.topicid] = origin_id # TODO: Remove peer from topic
elif orgin_id not in self.peer_topics[sub_msg.topicid]: pass
# Add peer to topic
self.peer_topics[sub_msg.topicid].append(origin_id)
else:
# TODO: Remove peer from topic
pass
async def handle_talk(self, peer_id, publish_message): async def handle_talk(self, peer_id, publish_message):
""" """
Handle incoming Talk message from a peer. A Talk message contains some Put incoming message from a peer onto my blocking queue
custom message that is published on a given topic(s) :param peer_id: peer id whom forwarded this message
:param talk: raw data constituting a talk message :param talk: RPC.Message format
""" """
# Check if this message has any topics that we are subscribed to # Check if this message has any topics that we are subscribed to
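
With this change, peer_topics is a mapping from topic id to a list of subscribed peer ids: the list is created on the first subscription and a peer is appended only if it is not already present (the unsubscribe branch remains a TODO in the commit). A self-contained sketch of that bookkeeping, with illustrative names:

# Standalone sketch of the peer_topics bookkeeping done by the reworked
# handle_subscription: topic id -> list of subscribed peer ids.
def record_subscription(peer_topics, peer_id, topicid, subscribe=True):
    if subscribe:
        if topicid not in peer_topics:
            peer_topics[topicid] = [peer_id]      # first subscriber creates the list
        elif peer_id not in peer_topics[topicid]:
            peer_topics[topicid].append(peer_id)  # add peer once, no duplicates
    else:
        pass  # unsubscribe handling is still a TODO in the commit

peer_topics = {}
record_subscription(peer_topics, "QmPeerA", "chat")
record_subscription(peer_topics, "QmPeerA", "chat")  # duplicate, ignored
record_subscription(peer_topics, "QmPeerB", "chat")
print(peer_topics)  # {'chat': ['QmPeerA', 'QmPeerB']}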
@@ -269,4 +280,4 @@ def generate_message_id():
     Generate a unique message id
     :return: messgae id
     """
-    return str(uuid.uuid1())
+    return str(uuid.uuid1())
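
generate_message_id relies on uuid1 for unique ids, and the read loop above appends each incoming message's seqno to seen_messages. The commit itself only records seqnos; the membership check in the sketch below illustrates how such a seen-list could be used to drop duplicates and is not code from this change.

# Illustrative sketch: uuid1-based message ids plus a "seen" list used to
# detect duplicates.
import uuid

def generate_message_id():
    return str(uuid.uuid1())

seen_messages = []

def is_new(message_id):
    if message_id in seen_messages:
        return False
    seen_messages.append(message_id)
    return True

msg_id = generate_message_id()
print(is_new(msg_id))  # True, first time seen
print(is_new(msg_id))  # False, duplicate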