From f5af4b9016d791599ff61465b7e699d82774679a Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sat, 30 Mar 2019 18:49:50 -0400 Subject: [PATCH] remove message.py --- libp2p/pubsub/floodsub.py | 31 +--------- libp2p/pubsub/message.py | 118 -------------------------------------- libp2p/pubsub/pubsub.py | 28 +++++---- 3 files changed, 15 insertions(+), 162 deletions(-) delete mode 100644 libp2p/pubsub/message.py diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 1cea17e..dd4dfc1 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,7 +1,6 @@ from .pubsub_router_interface import IPubsubRouter from .pb import rpc_pb2 -from .message import MessageSub, MessageTalk -from .message import create_message_talk + class FloodSub(IPubsubRouter): @@ -57,40 +56,14 @@ class FloodSub(IPubsubRouter): :param message: message to forward """ - # Encode message - # encoded_msg = message.encode() - - if isinstance(message, str): - msg_talk = create_message_talk(message) - message = rpc_pb2.Message( - from_id=str(msg_talk.origin_id).encode('utf-8'), - seqno=str(msg_talk.message_id).encode('utf-8'), - topicIDs=msg_talk.topics, - data=msg_talk.data.encode() - - ) packet = rpc_pb2.RPC() - print("YEET") - print(type(message)) packet.publish.extend([message]) - - - - # Get message sender, origin, and topics - # msg_talk = create_message_talk(message) msg_sender = str(sender_peer_id) - # msg_origin = msg_talk.origin_id - # topics = msg_talk.topics # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): - old_format = MessageTalk(sender_peer_id, - message.from_id, - message.topicIDs, - message.data, - message.seqno) - await self.pubsub.handle_talk(old_format) + await self.pubsub.handle_talk(sender_peer_id, message) # Deliver to self and peers for topic in message.topicIDs: diff --git a/libp2p/pubsub/message.py b/libp2p/pubsub/message.py deleted file mode 100644 index 2f839dc..0000000 --- a/libp2p/pubsub/message.py +++ /dev/null @@ -1,118 +0,0 @@ -import uuid - - -class MessageTalk(): - - """ - Object to make parsing talk messages easier, where talk messages are - defined as custom messages published to a set of topics - """ - # pylint: disable=too-few-public-methods - def __init__(self, from_id, origin_id, topics, data, message_id): - # pylint: disable=too-many-arguments - self.msg_type = "talk" - self.from_id = from_id - self.origin_id = origin_id - self.topics = topics - self.data = data - self.message_id = message_id - - def to_str(self): - """ - Convert to string - :return: MessageTalk object in string representation - """ - out = self.msg_type + '\n' - out += self.from_id + '\n' - out += self.origin_id + '\n' - out += self.message_id + '\n' - for i in range(len(self.topics)): - out += self.topics[i] - if i < len(self.topics) - 1: - out += ',' - out += '\n' + self.data - return out - - -class MessageSub(): - """ - Object to make parsing subscription messages easier, where subscription - messages are defined as indicating the topics a node wishes to subscribe to - or unsubscribe from - """ - # pylint: disable=too-few-public-methods - def __init__(self, from_id, origin_id, subs_map, message_id): - self.msg_type = "subscription" - self.from_id = from_id - self.origin_id = origin_id - self.subs_map = subs_map - self.message_id = message_id - - def to_str(self): - """ - Convert to string - :return: MessageSub object in string representation - """ - out = self.msg_type + '\n' - out += self.from_id + '\n' - out += self.origin_id + '\n' - out += self.message_id - - if self.subs_map: - out += '\n' - - keys = list(self.subs_map) - - for i, topic in enumerate(keys): - sub = self.subs_map[topic] - if sub: - out += "sub:" - else: - out += "unsub:" - out += topic - if i < len(keys) - 1: - out += '\n' - - return out - -def create_message_talk(msg_talk_as_str): - """ - Create a MessageTalk object from a MessageTalk string representation - :param msg_talk_as_str: a MessageTalk object in its string representation - :return: MessageTalk object - """ - msg_comps = msg_talk_as_str.split('\n') - from_id = msg_comps[1] - origin_id = msg_comps[2] - message_id = msg_comps[3] - topics = msg_comps[4].split(',') - data = msg_comps[5] - return MessageTalk(from_id, origin_id, topics, data, message_id) - -def create_message_sub(msg_sub_as_str): - """ - Create a MessageSub object from a MessageSub string representation - :param msg_talk_as_str: a MessageSub object in its string representation - :return: MessageSub object - """ - msg_comps = msg_sub_as_str.split('\n') - from_id = msg_comps[1] - origin_id = msg_comps[2] - message_id = msg_comps[3] - - subs_map = {} - for i in range(4, len(msg_comps)): - sub_comps = msg_comps[i].split(":") - topic = sub_comps[1] - if sub_comps[0] == "sub": - subs_map[topic] = True - else: - subs_map[topic] = False - return MessageSub(from_id, origin_id, subs_map, message_id) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 85abacb..976e6b4 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,11 +1,9 @@ import asyncio +import uuid from .pb import rpc_pb2_grpc from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .message import MessageSub, MessageTalk -from .message import create_message_talk, create_message_sub -from. message import generate_message_id class Pubsub(): @@ -77,8 +75,7 @@ class Pubsub(): async def continuously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process - messages from other nodes, which for now are considered MessageTalk - and MessageSub messages. + messages from other nodes :param stream: stream to continously read from """ @@ -94,13 +91,8 @@ class Pubsub(): if rpc_incoming.publish: # deal with "talk messages" for msg in rpc_incoming.publish: - old_format = MessageTalk(peer_id, - msg.from_id, - msg.topicIDs, - msg.data, - msg.seqno) self.seen_messages.append(msg.seqno) - await self.handle_talk(old_format) + await self.handle_talk(peer_id, msg) await self.router.publish(peer_id, msg) if rpc_incoming.subscriptions: @@ -189,21 +181,20 @@ class Pubsub(): # TODO: Remove peer from topic pass - async def handle_talk(self, talk): + async def handle_talk(self, peer_id, publish_message): """ Handle incoming Talk message from a peer. A Talk message contains some custom message that is published on a given topic(s) :param talk: raw data constituting a talk message """ - msg = talk # Check if this message has any topics that we are subscribed to - for topic in msg.topics: + for topic in publish_message.topicIDs: if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue # for each topic - await self.my_topics[topic].put(talk) + await self.my_topics[topic].put(publish_message) async def subscribe(self, topic_id): """ @@ -272,3 +263,10 @@ class Pubsub(): # Write message to stream await stream.write(rpc_msg) + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) \ No newline at end of file