diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 8b9ba1c..d4736d0 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -41,7 +41,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, sender_peer_id, message): + async def publish(self, sender_peer_id, rpc_message): """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens @@ -53,33 +53,41 @@ class FloodSub(IPubsubRouter): It also never forwards a message back to the source or the peer that forwarded the message. :param sender_peer_id: peer_id of message sender - :param message: message to forward + :param rpc_message: pubsub message in RPC string format """ packet = rpc_pb2.RPC() + packet.ParseFromString(rpc_message) print ("IN FLOOODSUB PUBLISH") - print (message) + print (packet) print ("++++++++++++++++") - packet.publish.extend([message]) msg_sender = str(sender_peer_id) - # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message - if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(sender_peer_id, message) + for message in packet.publish: + if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): + await self.pubsub.handle_talk(sender_peer_id, message) - # Deliver to self and peers - for topic in message.topicIDs: - if topic in self.pubsub.peer_topics: - for peer_id_in_topic in self.pubsub.peer_topics[topic]: - # Forward to all known peers in the topic that are not the - # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, message.from_id): - stream = self.pubsub.peers[peer_id_in_topic] - await stream.write(packet.SerializeToString()) - else: - # Implies publish did not write - print("publish did not write") + + print ("OHOHOHOH") + print (self.pubsub.peer_topics) + print ("UUUJUJUJ") + print (self.pubsub.peers) + print ("********") + # Deliver to self and peers + for topic in message.topicIDs: + if topic in self.pubsub.peer_topics: + for peer_id_in_topic in self.pubsub.peer_topics[topic]: + # Forward to all known peers in the topic that are not the + # message sender and are not the message origin + print ("PEERID") + print (peer_id_in_topic) + if peer_id_in_topic not in (msg_sender, message.from_id): + stream = self.pubsub.peers[peer_id_in_topic] + await stream.write(packet.SerializeToString()) + else: + # Implies publish did not write + print("publish did not write") def join(self, topic): """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 976e6b4..7705dfb 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -80,31 +80,43 @@ class Pubsub(): """ # TODO check on types here - peer_id = stream.mplex_conn.peer_id + peer_id = str(stream.mplex_conn.peer_id) while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - print ("CONTINUOUSLY") + + print ("IN PUBSUB CONTINUOUSLY READ") print (rpc_incoming) + print ("###########################") + + should_publish = True + if rpc_incoming.publish: - # deal with "talk messages" - for msg in rpc_incoming.publish: - self.seen_messages.append(msg.seqno) - await self.handle_talk(peer_id, msg) - await self.router.publish(peer_id, msg) + # deal with RPC.publish + for message in rpc_incoming.publish: + self.seen_messages.append(message.seqno) + await self.handle_talk(peer_id, message) + if rpc_incoming.subscriptions: - # deal with "subscription messages" - subs_map = {} - for msg in rpc_incoming.subscriptions: - if msg.subscribe: - subs_map[msg.topicid] = "sub" - else: - subs_map[msg.topicid] = "unsub" + # deal with RPC.subscriptions + # We don't need to relay the subscription to our + # peers because a given node only needs its peers + # to know that it is subscribed to the topic (doesn't + # need everyone to know) + should_publish = False - self.handle_subscription(rpc_incoming) + # TODO check that peer_id is the same as origin_id + from_id = str(rpc_incoming.publish[0].from_id.decode('utf-8')) + for message in rpc_incoming.subscriptions: + if message.subscribe: + self.handle_subscription(from_id, message) + + if should_publish: + # relay message to peers with router + await self.router.publish(peer_id, incoming) # Force context switch await asyncio.sleep(0) @@ -136,7 +148,10 @@ class Pubsub(): pubsub protocols we support """ while True: + print ("PUBSUB HANDLE PEER QUEUE") peer_id = await self.peer_queue.get() + print (peer_id) + print ("++++++++++++++++++++++++") # Open a stream to peer on existing connection # (we know connection exists since that's the only way @@ -158,34 +173,30 @@ class Pubsub(): # Force context switch await asyncio.sleep(0) - def handle_subscription(self, rpc_message): + def handle_subscription(self, peer_id, sub_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message - :param subscription: raw data constituting a subscription message + :param origin_id: id of the peer who subscribe to the message + :param sub_message: RPC.SubOpts """ - for sub_msg in rpc_message.subscriptions: - - # Look at each subscription in the msg individually - if sub_msg.subscribe: - origin_id = rpc_message.publish[0].from_id - - if sub_msg.topicid not in self.peer_topics: - # Create topic list if it did not yet exist - self.peer_topics[sub_msg.topicid] = origin_id - elif orgin_id not in self.peer_topics[sub_msg.topicid]: - # Add peer to topic - self.peer_topics[sub_msg.topicid].append(origin_id) - else: - # TODO: Remove peer from topic - pass + # TODO verify logic here + if sub_message.subscribe: + if sub_message.topicid not in self.peer_topics: + self.peer_topics[sub_message.topicid] = [peer_id] + elif peer_id not in self.peer_topics[sub_message.topicid]: + # Add peer to topic + self.peer_topics[sub_message.topicid].append(peer_id) + else: + # TODO: Remove peer from topic + pass async def handle_talk(self, peer_id, publish_message): """ - Handle incoming Talk message from a peer. A Talk message contains some - custom message that is published on a given topic(s) - :param talk: raw data constituting a talk message + Put incoming message from a peer onto my blocking queue + :param peer_id: peer id whom forwarded this message + :param talk: RPC.Message format """ # Check if this message has any topics that we are subscribed to @@ -269,4 +280,4 @@ def generate_message_id(): Generate a unique message id :return: messgae id """ - return str(uuid.uuid1()) \ No newline at end of file + return str(uuid.uuid1())