diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 9bd072f..17ecdd5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,5 +1,7 @@ import asyncio +from .pb import rpc_pb2_grpc +from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee from .message import MessageSub from .message import create_message_talk, create_message_sub @@ -7,37 +9,6 @@ from. message import generate_message_id class Pubsub(): - """ - For now, because I'm on a plane and don't have access to the go repo/protobuf stuff, - this is going to be the message format for the two types: subscription and talk - subscription indicates subscribing or unsubscribing from a topic - talk is sending a message on topic(s) - subscription format: - subscription - 'from' - :'topicid' - :'topicid' - ... - Ex. - subscription - msg_sender_peer_id - origin_peer_id - sub:topic1 - sub:topic2 - unsub:fav_topic - talk format: - talk - 'from' - 'origin' - [topic_ids comma-delimited] - 'data' - Ex. - talk - msg_sender_peer_id - origin_peer_id - topic1,topics_are_cool,foo - I like tacos - """ # pylint: disable=too-many-instance-attributes def __init__(self, host, router, my_id): @@ -90,34 +61,37 @@ class Pubsub(): """ Generate subscription message with all topics we are subscribed to """ - subs_map = {} - for topic in self.my_topics: - subs_map[topic] = True - sub_msg = MessageSub( - str(self.host.get_id()),\ - str(self.host.get_id()), subs_map, generate_message_id()\ - ) - return sub_msg.to_str() + packet = rpc_pb2.RPC() + message = rpc_pb2.Message( + from_id=str(self.host.get_id()).encode('utf-8'), + seqno=str(generate_message_id()).encode('utf-8') + ) + packet.publish.extend([message]) + for topic_id in self.my_topics: + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, topicid=topic_id)]) + + return packet.SerializeToString() async def continously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process messages from other nodes, which for now are considered MessageTalk and MessageSub messages. - TODO: Handle RPC messages instead of my Aspyn's own custom message format :param stream: stream to continously read from """ while True: - incoming = (await stream.read()).decode() - msg_comps = incoming.split('\n') - msg_type = msg_comps[0] + incoming = (await stream.read()) + rpc_incoming = rpc_pb2.RPC() + rpc_incoming.ParseFromString(incoming) + print (rpc_incoming.publish) - msg_sender = msg_comps[1] + # msg_type = msg_comps[0] + + msg_sender = rpc_incoming.publish.from_id # msg_origin = msg_comps[2] - msg_id = msg_comps[3] - print("HIT ME1") + msg_id = rpc_incoming.publish.seqno if msg_id not in self.seen_messages: - print("HIT ME") # Do stuff with incoming unseen message should_publish = True if msg_type == "subscription": @@ -161,7 +135,7 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.encode()) + await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continously_read_stream(stream)) @@ -187,7 +161,7 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.encode()) + await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continously_read_stream(stream)) @@ -291,4 +265,4 @@ class Pubsub(): stream = self.peers[peer] # Write message to stream - await stream.write(encoded_msg) + await stream.write(encoded_msg) \ No newline at end of file