diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index b1ee363..85abacb 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -61,7 +61,7 @@ class Pubsub(): """ Generate subscription message with all topics we are subscribed to """ - print("MAKING HELLO PACKET") + packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=str(self.host.get_id()).encode('utf-8'), @@ -74,7 +74,7 @@ class Pubsub(): return packet.SerializeToString() - async def continously_read_stream(self, stream): + async def continuously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process messages from other nodes, which for now are considered MessageTalk @@ -88,16 +88,12 @@ class Pubsub(): while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() - print("JUST GOT STRING") - print(incoming) rpc_incoming.ParseFromString(incoming) - print("JUST GOT") + print ("CONTINUOUSLY") print (rpc_incoming) - - if hasattr(rpc_incoming, 'publish'): + if rpc_incoming.publish: # deal with "talk messages" for msg in rpc_incoming.publish: - # TODO check what from_id is in go? is it origin or peer old_format = MessageTalk(peer_id, msg.from_id, msg.topicIDs, @@ -107,48 +103,16 @@ class Pubsub(): await self.handle_talk(old_format) await self.router.publish(peer_id, msg) - if hasattr(rpc_incoming, 'subscriptions'): + if rpc_incoming.subscriptions: # deal with "subscription messages" subs_map = {} for msg in rpc_incoming.subscriptions: if msg.subscribe: - subs_map[msg.topic_id] = "sub" + subs_map[msg.topicid] = "sub" else: - subs_map[msg.topic_id] = "unsub" - old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id()) - self.handle_subscription(old_format) + subs_map[msg.topicid] = "unsub" - # msg_type = msg_comps[0] - - # msg_sender = rpc_incoming.publish.from_id - # # msg_origin = msg_comps[2] - # msg_id = rpc_incoming.publish.seqno - # if msg_id not in self.seen_messages: - # # Do stuff with incoming unseen message - # should_publish = True - # if msg_type == "subscription": - # self.handle_subscription(incoming) - - # # We don't need to relay the subscription to our - # # peers because a given node only needs its peers - # # to know that it is subscribed to the topic (doesn't - # # need everyone to know) - # should_publish = False - # elif msg_type == "talk": - # await self.handle_talk(incoming) - - # Add message id to seen - # self.seen_messages.append(msg_id) - - # Publish message using router's publish - # if should_publish: - # msg = create_message_talk(incoming) - - # # Adjust raw_msg to that the message sender - # # is now our peer_id - # msg.from_id = str(self.host.get_id()) - - # await self.router.publish(msg_sender, msg.to_str()) + self.handle_subscription(rpc_incoming) # Force context switch await asyncio.sleep(0) @@ -167,10 +131,10 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - print(hello) + await stream.write(hello) # Pass stream off to stream reader - asyncio.ensure_future(self.continously_read_stream(stream)) + asyncio.ensure_future(self.continuously_read_stream(stream)) async def handle_peer_queue(self): """ @@ -197,30 +161,30 @@ class Pubsub(): await stream.write(hello) # Pass stream off to stream reader - asyncio.ensure_future(self.continously_read_stream(stream)) + asyncio.ensure_future(self.continuously_read_stream(stream)) # Force context switch await asyncio.sleep(0) - def handle_subscription(self, subscription): + def handle_subscription(self, rpc_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message :param subscription: raw data constituting a subscription message """ - sub_msg = subscription - if sub_msg.subs_map: - print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) - for topic_id in sub_msg.subs_map: + for sub_msg in rpc_message.subscriptions: + # Look at each subscription in the msg individually - if sub_msg.subs_map[topic_id]: - if topic_id not in self.peer_topics: + if sub_msg.subscribe: + origin_id = rpc_message.publish[0].from_id + + if sub_msg.topicid not in self.peer_topics: # Create topic list if it did not yet exist - self.peer_topics[topic_id] = [sub_msg.origin_id] - elif sub_msg.origin_id not in self.peer_topics[topic_id]: + self.peer_topics[sub_msg.topicid] = origin_id + elif orgin_id not in self.peer_topics[sub_msg.topicid]: # Add peer to topic - self.peer_topics[topic_id].append(sub_msg.origin_id) + self.peer_topics[sub_msg.topicid].append(origin_id) else: # TODO: Remove peer from topic pass @@ -250,13 +214,18 @@ class Pubsub(): self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message - sub_msg = MessageSub( - str(self.host.get_id()),\ - str(self.host.get_id()), {topic_id: True}, generate_message_id()\ - ) + packet = rpc_pb2.RPC() + packet.publish.extend([rpc_pb2.Message( + from_id=str(self.host.get_id()).encode('utf-8'), + seqno=str(generate_message_id()).encode('utf-8') + )]) + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe = True, + topicid = topic_id.encode('utf-8') + )]) # Send out subscribe message to all peers - await self.message_all_peers(sub_msg.to_str()) + await self.message_all_peers(packet.SerializeToString()) # Tell router we are joining this topic self.router.join(topic_id) @@ -275,27 +244,31 @@ class Pubsub(): del self.my_topics[topic_id] # Create unsubscribe message - unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\ - {topic_id: False}, generate_message_id()) + packet = rpc_pb2.RPC() + packet.publish.extend([rpc_pb2.Message( + from_id=str(self.host.get_id()).encode('utf-8'), + seqno=str(generate_message_id()).encode('utf-8') + )]) + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe = False, + topicid = topic_id.encode('utf-8') + )]) # Send out unsubscribe message to all peers - await self.message_all_peers(unsub_msg.to_str()) + await self.message_all_peers(packet.SerializeToString()) # Tell router we are leaving this topic self.router.leave(topic_id) - async def message_all_peers(self, raw_msg): + async def message_all_peers(self, rpc_msg): """ Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ - # Encode message for sending - encoded_msg = raw_msg.encode() - # Broadcast message for peer in self.peers: stream = self.peers[peer] # Write message to stream - await stream.write(encoded_msg) \ No newline at end of file + await stream.write(rpc_msg)