diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 68e542c..1cea17e 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,4 +1,6 @@ from .pubsub_router_interface import IPubsubRouter +from .pb import rpc_pb2 +from .message import MessageSub, MessageTalk from .message import create_message_talk class FloodSub(IPubsubRouter): @@ -56,28 +58,49 @@ class FloodSub(IPubsubRouter): """ # Encode message - encoded_msg = message.encode() + # encoded_msg = message.encode() + + if isinstance(message, str): + msg_talk = create_message_talk(message) + message = rpc_pb2.Message( + from_id=str(msg_talk.origin_id).encode('utf-8'), + seqno=str(msg_talk.message_id).encode('utf-8'), + topicIDs=msg_talk.topics, + data=msg_talk.data.encode() + + ) + packet = rpc_pb2.RPC() + print("YEET") + print(type(message)) + packet.publish.extend([message]) + + # Get message sender, origin, and topics - msg_talk = create_message_talk(message) + # msg_talk = create_message_talk(message) msg_sender = str(sender_peer_id) - msg_origin = msg_talk.origin_id - topics = msg_talk.topics + # msg_origin = msg_talk.origin_id + # topics = msg_talk.topics # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message - if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(message) + if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): + old_format = MessageTalk(sender_peer_id, + message.from_id, + message.topicIDs, + message.data, + message.seqno) + await self.pubsub.handle_talk(old_format) # Deliver to self and peers - for topic in topics: + for topic in message.topicIDs: if topic in self.pubsub.peer_topics: for peer_id_in_topic in self.pubsub.peer_topics[topic]: # Forward to all known peers in the topic that are not the # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, msg_origin): + if peer_id_in_topic not in (msg_sender, message.from_id): stream = self.pubsub.peers[peer_id_in_topic] - await stream.write(encoded_msg) + await stream.write(packet.SerializeToString()) else: # Implies publish did not write print("publish did not write") diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 17ecdd5..b1ee363 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -3,7 +3,7 @@ import asyncio from .pb import rpc_pb2_grpc from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .message import MessageSub +from .message import MessageSub, MessageTalk from .message import create_message_talk, create_message_sub from. message import generate_message_id @@ -61,6 +61,7 @@ class Pubsub(): """ Generate subscription message with all topics we are subscribed to """ + print("MAKING HELLO PACKET") packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=str(self.host.get_id()).encode('utf-8'), @@ -80,43 +81,74 @@ class Pubsub(): and MessageSub messages. :param stream: stream to continously read from """ + + # TODO check on types here + peer_id = stream.mplex_conn.peer_id + while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() + print("JUST GOT STRING") + print(incoming) rpc_incoming.ParseFromString(incoming) - print (rpc_incoming.publish) + print("JUST GOT") + print (rpc_incoming) + + if hasattr(rpc_incoming, 'publish'): + # deal with "talk messages" + for msg in rpc_incoming.publish: + # TODO check what from_id is in go? is it origin or peer + old_format = MessageTalk(peer_id, + msg.from_id, + msg.topicIDs, + msg.data, + msg.seqno) + self.seen_messages.append(msg.seqno) + await self.handle_talk(old_format) + await self.router.publish(peer_id, msg) + + if hasattr(rpc_incoming, 'subscriptions'): + # deal with "subscription messages" + subs_map = {} + for msg in rpc_incoming.subscriptions: + if msg.subscribe: + subs_map[msg.topic_id] = "sub" + else: + subs_map[msg.topic_id] = "unsub" + old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id()) + self.handle_subscription(old_format) # msg_type = msg_comps[0] - msg_sender = rpc_incoming.publish.from_id - # msg_origin = msg_comps[2] - msg_id = rpc_incoming.publish.seqno - if msg_id not in self.seen_messages: - # Do stuff with incoming unseen message - should_publish = True - if msg_type == "subscription": - self.handle_subscription(incoming) + # msg_sender = rpc_incoming.publish.from_id + # # msg_origin = msg_comps[2] + # msg_id = rpc_incoming.publish.seqno + # if msg_id not in self.seen_messages: + # # Do stuff with incoming unseen message + # should_publish = True + # if msg_type == "subscription": + # self.handle_subscription(incoming) - # We don't need to relay the subscription to our - # peers because a given node only needs its peers - # to know that it is subscribed to the topic (doesn't - # need everyone to know) - should_publish = False - elif msg_type == "talk": - await self.handle_talk(incoming) + # # We don't need to relay the subscription to our + # # peers because a given node only needs its peers + # # to know that it is subscribed to the topic (doesn't + # # need everyone to know) + # should_publish = False + # elif msg_type == "talk": + # await self.handle_talk(incoming) # Add message id to seen - self.seen_messages.append(msg_id) + # self.seen_messages.append(msg_id) # Publish message using router's publish - if should_publish: - msg = create_message_talk(incoming) + # if should_publish: + # msg = create_message_talk(incoming) - # Adjust raw_msg to that the message sender - # is now our peer_id - msg.from_id = str(self.host.get_id()) + # # Adjust raw_msg to that the message sender + # # is now our peer_id + # msg.from_id = str(self.host.get_id()) - await self.router.publish(msg_sender, msg.to_str()) + # await self.router.publish(msg_sender, msg.to_str()) # Force context switch await asyncio.sleep(0) @@ -135,6 +167,7 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() + print(hello) await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continously_read_stream(stream)) @@ -176,7 +209,7 @@ class Pubsub(): defined in the subscription message :param subscription: raw data constituting a subscription message """ - sub_msg = create_message_sub(subscription) + sub_msg = subscription if sub_msg.subs_map: print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) for topic_id in sub_msg.subs_map: @@ -198,7 +231,7 @@ class Pubsub(): custom message that is published on a given topic(s) :param talk: raw data constituting a talk message """ - msg = create_message_talk(talk) + msg = talk # Check if this message has any topics that we are subscribed to for topic in msg.topics: