From 93cf5a2c3229e2fe8bbadcecbea65181c9ea704d Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 24 Jul 2019 22:33:32 +0800 Subject: [PATCH] A roughly skeleton of `floodsub.publish` Still need to ensure when to deliver to ourselves --- libp2p/peer/id.py | 3 ++ libp2p/pubsub/floodsub.py | 93 +++++++++++++++++++++++++++------------ 2 files changed, 67 insertions(+), 29 deletions(-) diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 3097e1f..77e2f87 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -16,6 +16,9 @@ class ID: def __init__(self, id_str): self._id_str = id_str + def to_bytes(self) -> bytes: + return self._id_str + def get_raw_id(self): return self._id_str diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 659b672..f1eaa9c 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,3 +1,8 @@ +from typing import ( + Generator, + Sequence, +) + from libp2p.peer.id import ( ID, ) @@ -60,40 +65,70 @@ class FloodSub(IPubsubRouter): :param sender_peer_id: peer_id of message sender :param rpc_message: pubsub message in RPC string format """ + + peers_gen = self._get_peers_to_send( + pubsub_message.topicIDs, + from_peer_id=from_peer, + src_peer_id=ID(pubsub_message.from_id), + ) + rpc_msg = rpc_pb2.RPC( + publish=[pubsub_message], + ) + for peer_id in peers_gen: + stream = self.pubsub.peers[str(peer_id)] + await stream.write(rpc_msg.SerializeToString()) + # packet = rpc_pb2.RPC() # packet.ParseFromString(rpc_message) - from_peer_str = str(from_peer) - for topic in pubsub_message.topicIDs: + # from_peer_str = str(from_peer) + # for topic in pubsub_message.topicIDs: + # if topic not in self.pubsub.topics: + # continue + # peers = self.pubsub.peer_topics[topic] + # # Deliver to self if self was origin + # # Note: handle_talk checks if self is subscribed to topics in message + # for message in packet.publish: + # decoded_from_id = message.from_id.decode('utf-8') + # if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): + # id_in_seen_msgs = (message.seqno, message.from_id) + + # if id_in_seen_msgs not in self.pubsub.seen_messages: + # self.pubsub.seen_messages[id_in_seen_msgs] = 1 + + # await self.pubsub.handle_talk(message) + + # # Deliver to self and peers + # for topic in message.topicIDs: + # if topic in self.pubsub.peer_topics: + # for peer_id_in_topic in self.pubsub.peer_topics[topic]: + # # Forward to all known peers in the topic that are not the + # # message sender and are not the message origin + # if peer_id_in_topic not in (msg_sender, decoded_from_id): + # stream = self.pubsub.peers[peer_id_in_topic] + # # Create new packet with just publish message + # new_packet = rpc_pb2.RPC() + # new_packet.publish.extend([message]) + + # # Publish the packet + # await stream.write(new_packet.SerializeToString()) + + def _get_peers_to_send( + self, + topic_ids: Sequence[str], + from_peer_id: ID, + src_peer_id: ID) -> Generator[ID]: + # TODO: should send to self if `src_peer_id` is ourself? + for topic in topic_ids: if topic not in self.pubsub.topics: continue - peers = self.pubsub.peer_topics[topic] - # Deliver to self if self was origin - # Note: handle_talk checks if self is subscribed to topics in message - for message in packet.publish: - decoded_from_id = message.from_id.decode('utf-8') - if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): - id_in_seen_msgs = (message.seqno, message.from_id) - - if id_in_seen_msgs not in self.pubsub.seen_messages: - self.pubsub.seen_messages[id_in_seen_msgs] = 1 - - await self.pubsub.handle_talk(message) - - # Deliver to self and peers - for topic in message.topicIDs: - if topic in self.pubsub.peer_topics: - for peer_id_in_topic in self.pubsub.peer_topics[topic]: - # Forward to all known peers in the topic that are not the - # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, decoded_from_id): - stream = self.pubsub.peers[peer_id_in_topic] - # Create new packet with just publish message - new_packet = rpc_pb2.RPC() - new_packet.publish.extend([message]) - - # Publish the packet - await stream.write(new_packet.SerializeToString()) + for peer_id in self.pubsub.peer_topics[topic]: + if peer_id in (from_peer_id, src_peer_id): + continue + # FIXME: Should change `self.pubsub.peers` to Dict[PeerID, ...] + if str(peer_id) not in self.pubsub.peers: + continue + yield peer_id async def join(self, topic): """