remove message.py

This commit is contained in:
zixuanzh 2019-03-30 18:49:50 -04:00
parent aec783b843
commit f5af4b9016
3 changed files with 15 additions and 162 deletions

View File

@ -1,7 +1,6 @@
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
from .pb import rpc_pb2 from .pb import rpc_pb2
from .message import MessageSub, MessageTalk
from .message import create_message_talk
class FloodSub(IPubsubRouter): class FloodSub(IPubsubRouter):
@ -57,40 +56,14 @@ class FloodSub(IPubsubRouter):
:param message: message to forward :param message: message to forward
""" """
# Encode message
# encoded_msg = message.encode()
if isinstance(message, str):
msg_talk = create_message_talk(message)
message = rpc_pb2.Message(
from_id=str(msg_talk.origin_id).encode('utf-8'),
seqno=str(msg_talk.message_id).encode('utf-8'),
topicIDs=msg_talk.topics,
data=msg_talk.data.encode()
)
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
print("YEET")
print(type(message))
packet.publish.extend([message]) packet.publish.extend([message])
# Get message sender, origin, and topics
# msg_talk = create_message_talk(message)
msg_sender = str(sender_peer_id) msg_sender = str(sender_peer_id)
# msg_origin = msg_talk.origin_id
# topics = msg_talk.topics
# Deliver to self if self was origin # Deliver to self if self was origin
# Note: handle_talk checks if self is subscribed to topics in message # Note: handle_talk checks if self is subscribed to topics in message
if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()):
old_format = MessageTalk(sender_peer_id, await self.pubsub.handle_talk(sender_peer_id, message)
message.from_id,
message.topicIDs,
message.data,
message.seqno)
await self.pubsub.handle_talk(old_format)
# Deliver to self and peers # Deliver to self and peers
for topic in message.topicIDs: for topic in message.topicIDs:

View File

@ -1,118 +0,0 @@
import uuid
class MessageTalk():
"""
Object to make parsing talk messages easier, where talk messages are
defined as custom messages published to a set of topics
"""
# pylint: disable=too-few-public-methods
def __init__(self, from_id, origin_id, topics, data, message_id):
# pylint: disable=too-many-arguments
self.msg_type = "talk"
self.from_id = from_id
self.origin_id = origin_id
self.topics = topics
self.data = data
self.message_id = message_id
def to_str(self):
"""
Convert to string
:return: MessageTalk object in string representation
"""
out = self.msg_type + '\n'
out += self.from_id + '\n'
out += self.origin_id + '\n'
out += self.message_id + '\n'
for i in range(len(self.topics)):
out += self.topics[i]
if i < len(self.topics) - 1:
out += ','
out += '\n' + self.data
return out
class MessageSub():
"""
Object to make parsing subscription messages easier, where subscription
messages are defined as indicating the topics a node wishes to subscribe to
or unsubscribe from
"""
# pylint: disable=too-few-public-methods
def __init__(self, from_id, origin_id, subs_map, message_id):
self.msg_type = "subscription"
self.from_id = from_id
self.origin_id = origin_id
self.subs_map = subs_map
self.message_id = message_id
def to_str(self):
"""
Convert to string
:return: MessageSub object in string representation
"""
out = self.msg_type + '\n'
out += self.from_id + '\n'
out += self.origin_id + '\n'
out += self.message_id
if self.subs_map:
out += '\n'
keys = list(self.subs_map)
for i, topic in enumerate(keys):
sub = self.subs_map[topic]
if sub:
out += "sub:"
else:
out += "unsub:"
out += topic
if i < len(keys) - 1:
out += '\n'
return out
def create_message_talk(msg_talk_as_str):
"""
Create a MessageTalk object from a MessageTalk string representation
:param msg_talk_as_str: a MessageTalk object in its string representation
:return: MessageTalk object
"""
msg_comps = msg_talk_as_str.split('\n')
from_id = msg_comps[1]
origin_id = msg_comps[2]
message_id = msg_comps[3]
topics = msg_comps[4].split(',')
data = msg_comps[5]
return MessageTalk(from_id, origin_id, topics, data, message_id)
def create_message_sub(msg_sub_as_str):
"""
Create a MessageSub object from a MessageSub string representation
:param msg_talk_as_str: a MessageSub object in its string representation
:return: MessageSub object
"""
msg_comps = msg_sub_as_str.split('\n')
from_id = msg_comps[1]
origin_id = msg_comps[2]
message_id = msg_comps[3]
subs_map = {}
for i in range(4, len(msg_comps)):
sub_comps = msg_comps[i].split(":")
topic = sub_comps[1]
if sub_comps[0] == "sub":
subs_map[topic] = True
else:
subs_map[topic] = False
return MessageSub(from_id, origin_id, subs_map, message_id)
def generate_message_id():
"""
Generate a unique message id
:return: messgae id
"""
return str(uuid.uuid1())

View File

@ -1,11 +1,9 @@
import asyncio import asyncio
import uuid
from .pb import rpc_pb2_grpc from .pb import rpc_pb2_grpc
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .message import MessageSub, MessageTalk
from .message import create_message_talk, create_message_sub
from. message import generate_message_id
class Pubsub(): class Pubsub():
@ -77,8 +75,7 @@ class Pubsub():
async def continuously_read_stream(self, stream): async def continuously_read_stream(self, stream):
""" """
Read from input stream in an infinite loop. Process Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk messages from other nodes
and MessageSub messages.
:param stream: stream to continously read from :param stream: stream to continously read from
""" """
@ -94,13 +91,8 @@ class Pubsub():
if rpc_incoming.publish: if rpc_incoming.publish:
# deal with "talk messages" # deal with "talk messages"
for msg in rpc_incoming.publish: for msg in rpc_incoming.publish:
old_format = MessageTalk(peer_id,
msg.from_id,
msg.topicIDs,
msg.data,
msg.seqno)
self.seen_messages.append(msg.seqno) self.seen_messages.append(msg.seqno)
await self.handle_talk(old_format) await self.handle_talk(peer_id, msg)
await self.router.publish(peer_id, msg) await self.router.publish(peer_id, msg)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
@ -189,21 +181,20 @@ class Pubsub():
# TODO: Remove peer from topic # TODO: Remove peer from topic
pass pass
async def handle_talk(self, talk): async def handle_talk(self, peer_id, publish_message):
""" """
Handle incoming Talk message from a peer. A Talk message contains some Handle incoming Talk message from a peer. A Talk message contains some
custom message that is published on a given topic(s) custom message that is published on a given topic(s)
:param talk: raw data constituting a talk message :param talk: raw data constituting a talk message
""" """
msg = talk
# Check if this message has any topics that we are subscribed to # Check if this message has any topics that we are subscribed to
for topic in msg.topics: for topic in publish_message.topicIDs:
if topic in self.my_topics: if topic in self.my_topics:
# we are subscribed to a topic this message was sent for, # we are subscribed to a topic this message was sent for,
# so add message to the subscription output queue # so add message to the subscription output queue
# for each topic # for each topic
await self.my_topics[topic].put(talk) await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id): async def subscribe(self, topic_id):
""" """
@ -272,3 +263,10 @@ class Pubsub():
# Write message to stream # Write message to stream
await stream.write(rpc_msg) await stream.write(rpc_msg)
def generate_message_id():
"""
Generate a unique message id
:return: messgae id
"""
return str(uuid.uuid1())