diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 7d05991..e3d037e 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -195,6 +195,15 @@ class Swarm(INetwork): def add_router(self, router): self.router = router + # TODO: `tear_down` + async def tear_down(self) -> None: + # pylint: disable=line-too-long + # Reference: https://github.com/libp2p/go-libp2p-swarm/blob/8be680aef8dea0a4497283f2f98470c2aeae6b65/swarm.go#L118 # noqa: E501 + pass + + # TODO: `disconnect`? + + def create_generic_protocol_handler(swarm): """ Create a generic protocol handler from the given swarm. We use swarm diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 3097e1f..77e2f87 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -16,6 +16,9 @@ class ID: def __init__(self, id_str): self._id_str = id_str + def to_bytes(self) -> bytes: + return self._id_str + def get_raw_id(self): return self._id_str diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index c056214..0404021 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,5 +1,10 @@ +from typing import ( + Iterable, +) + from libp2p.peer.id import ( ID, + id_b58_decode, ) from .pb import rpc_pb2 @@ -46,7 +51,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, sender_peer_id: ID, rpc_message: rpc_pb2.Message) -> None: + async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens @@ -57,38 +62,23 @@ class FloodSub(IPubsubRouter): so that seen messages are not further forwarded. It also never forwards a message back to the source or the peer that forwarded the message. - :param sender_peer_id: peer_id of message sender - :param rpc_message: pubsub message in RPC string format + :param msg_forwarder: peer ID of the peer who forwards the message to us + :param pubsub_msg: pubsub message in protobuf. """ - packet = rpc_pb2.RPC() - packet.ParseFromString(rpc_message) - msg_sender = str(sender_peer_id) - # Deliver to self if self was origin - # Note: handle_talk checks if self is subscribed to topics in message - for message in packet.publish: - decoded_from_id = message.from_id.decode('utf-8') - if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): - id_in_seen_msgs = (message.seqno, message.from_id) - if id_in_seen_msgs not in self.pubsub.seen_messages: - self.pubsub.seen_messages[id_in_seen_msgs] = 1 - - await self.pubsub.handle_talk(message) - - # Deliver to self and peers - for topic in message.topicIDs: - if topic in self.pubsub.peer_topics: - for peer_id_in_topic in self.pubsub.peer_topics[topic]: - # Forward to all known peers in the topic that are not the - # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, decoded_from_id): - stream = self.pubsub.peers[peer_id_in_topic] - # Create new packet with just publish message - new_packet = rpc_pb2.RPC() - new_packet.publish.extend([message]) - - # Publish the packet - await stream.write(new_packet.SerializeToString()) + peers_gen = self._get_peers_to_send( + pubsub_msg.topicIDs, + msg_forwarder=msg_forwarder, + origin=ID(pubsub_msg.from_id), + ) + rpc_msg = rpc_pb2.RPC( + publish=[pubsub_msg], + ) + for peer_id in peers_gen: + stream = self.pubsub.peers[str(peer_id)] + # FIXME: We should add a `WriteMsg` similar to write delimited messages. + # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + await stream.write(rpc_msg.SerializeToString()) async def join(self, topic): """ @@ -104,3 +94,26 @@ class FloodSub(IPubsubRouter): It is invoked after the unsubscription announcement. :param topic: topic to leave """ + + def _get_peers_to_send( + self, + topic_ids: Iterable[str], + msg_forwarder: ID, + origin: ID) -> Iterable[ID]: + """ + Get the eligible peers to send the data to. + :param msg_forwarder: peer ID of the peer who forwards the message to us. + :param origin: peer id of the peer the message originate from. + :return: a generator of the peer ids who we send data to. + """ + for topic in topic_ids: + if topic not in self.pubsub.peer_topics: + continue + for peer_id_str in self.pubsub.peer_topics[topic]: + peer_id = id_b58_decode(peer_id_str) + if peer_id in (msg_forwarder, origin): + continue + # FIXME: Should change `self.pubsub.peers` to Dict[PeerID, ...] + if str(peer_id) not in self.pubsub.peers: + continue + yield peer_id diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 31ab606..0af5e5f 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -1,10 +1,22 @@ -import random import asyncio +import random +from typing import ( + Iterable, + List, + MutableSet, + Sequence, +) from ast import literal_eval + +from libp2p.peer.id import ( + ID, + id_b58_decode, +) + +from .mcache import MessageCache from .pb import rpc_pb2 from .pubsub_router_interface import IPubsubRouter -from .mcache import MessageCache class GossipSub(IPubsubRouter): @@ -20,8 +32,8 @@ class GossipSub(IPubsubRouter): # Store target degree, upper degree bound, and lower degree bound self.degree = degree - self.degree_high = degree_high self.degree_low = degree_low + self.degree_high = degree_high # Store time to live (for topics in fanout) self.time_to_live = time_to_live @@ -91,6 +103,7 @@ class GossipSub(IPubsubRouter): :param rpc: rpc message """ control_message = rpc.control + sender_peer_id = str(sender_peer_id) # Relay each rpc control to the appropriate handler if control_message.ihave: @@ -106,70 +119,77 @@ class GossipSub(IPubsubRouter): for prune in control_message.prune: await self.handle_prune(prune, sender_peer_id) - async def publish(self, sender_peer_id, rpc_message): + async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: # pylint: disable=too-many-locals """ Invoked to forward a new message that has been validated. """ + self.mcache.put(pubsub_msg) - packet = rpc_pb2.RPC() - packet.ParseFromString(rpc_message) - msg_sender = str(sender_peer_id) + peers_gen = self._get_peers_to_send( + pubsub_msg.topicIDs, + msg_forwarder=msg_forwarder, + origin=ID(pubsub_msg.from_id), + ) + rpc_msg = rpc_pb2.RPC( + publish=[pubsub_msg], + ) + for peer_id in peers_gen: + stream = self.pubsub.peers[str(peer_id)] + # FIXME: We should add a `WriteMsg` similar to write delimited messages. + # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 + # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. + await stream.write(rpc_msg.SerializeToString()) - # Deliver to self if self was origin - # Note: handle_talk checks if self is subscribed to topics in message - for message in packet.publish: - # Add RPC message to cache - self.mcache.put(message) + def _get_peers_to_send( + self, + topic_ids: Iterable[str], + msg_forwarder: ID, + origin: ID) -> Iterable[ID]: + """ + Get the eligible peers to send the data to. + :param msg_forwarder: the peer id of the peer who forwards the message to me. + :param origin: peer id of the peer the message originate from. + :return: a generator of the peer ids who we send data to. + """ + send_to: MutableSet[ID] = set() + for topic in topic_ids: + if topic not in self.pubsub.peer_topics: + continue - decoded_from_id = message.from_id.decode('utf-8') - new_packet = rpc_pb2.RPC() - new_packet.publish.extend([message]) - new_packet_serialized = new_packet.SerializeToString() + # floodsub peers + for peer_id_str in self.pubsub.peer_topics[topic]: + # FIXME: `gossipsub.peers_floodsub` can be changed to `gossipsub.peers` in go. + # This will improve the efficiency when searching for a peer's protocol id. + if peer_id_str in self.peers_floodsub: + peer_id = id_b58_decode(peer_id_str) + send_to.add(peer_id) - # Deliver to self if needed - if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): - id_in_seen_msgs = (message.seqno, message.from_id) + # gossipsub peers + # FIXME: Change `str` to `ID` + in_topic_gossipsub_peers: List[str] = None + # TODO: Do we need to check `topic in self.pubsub.my_topics`? + if topic in self.mesh: + in_topic_gossipsub_peers = self.mesh[topic] + else: + # TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not + # subscribed? + # I assume there could be short periods between heartbeats where topic may not + # be but we should check that this path gets hit appropriately - if id_in_seen_msgs not in self.pubsub.seen_messages: - self.pubsub.seen_messages[id_in_seen_msgs] = 1 - - await self.pubsub.handle_talk(message) - - # Deliver to peers - for topic in message.topicIDs: - # If topic has floodsub peers, deliver to floodsub peers - # TODO: This can be done more efficiently. Do it more efficiently. - floodsub_peers_in_topic = [] - if topic in self.pubsub.peer_topics: - for peer in self.pubsub.peer_topics[topic]: - if str(peer) in self.peers_floodsub: - floodsub_peers_in_topic.append(peer) - - await self.deliver_messages_to_peers(floodsub_peers_in_topic, msg_sender, - decoded_from_id, new_packet_serialized) - - # If you are subscribed to topic, send to mesh, otherwise send to fanout - if topic in self.pubsub.my_topics and topic in self.mesh: - await self.deliver_messages_to_peers(self.mesh[topic], msg_sender, - decoded_from_id, new_packet_serialized) - else: - # Send to fanout peers - if topic not in self.fanout: - # If no peers in fanout, choose some peers from gossipsub peers in topic - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - - selected = \ - GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) - self.fanout[topic] = selected - - # TODO: Is topic DEFINITELY supposed to be in fanout if we are not subscribed? - # I assume there could be short periods between heartbeats where topic may not - # be but we should check that this path gets hit appropriately - - await self.deliver_messages_to_peers(self.fanout[topic], msg_sender, - decoded_from_id, new_packet_serialized) + # pylint: disable=len-as-condition + if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): + # If no peers in fanout, choose some peers from gossipsub peers in topic. + self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree, + [], + ) + in_topic_gossipsub_peers = self.fanout[topic] + for peer_id_str in in_topic_gossipsub_peers: + send_to.add(id_b58_decode(peer_id_str)) + # Excludes `msg_forwarder` and `origin` + yield from send_to.difference([msg_forwarder, origin]) async def join(self, topic): # Note: the comments here are the near-exact algorithm description from the spec @@ -192,13 +212,11 @@ class GossipSub(IPubsubRouter): # in the fanout for a topic (or the topic is not in the fanout). # Selects the remaining number of peers (D-x) from peers.gossipsub[topic]. if topic in self.pubsub.peer_topics: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - selected_peers = \ - GossipSub.select_from_minus(self.degree - fanout_size, - gossipsub_peers_in_topic, - fanout_peers) - + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree - fanout_size, + fanout_peers, + ) # Combine fanout peers with selected peers fanout_peers += selected_peers @@ -272,14 +290,11 @@ class GossipSub(IPubsubRouter): num_mesh_peers_in_topic = len(self.mesh[topic]) if num_mesh_peers_in_topic < self.degree_low: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - # Select D - |mesh[topic]| peers from peers.gossipsub[topic] - mesh[topic] - selected_peers = GossipSub.select_from_minus( + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, self.degree - num_mesh_peers_in_topic, - gossipsub_peers_in_topic, - self.mesh[topic] + self.mesh[topic], ) fanout_peers_not_in_mesh = [ @@ -320,12 +335,11 @@ class GossipSub(IPubsubRouter): # If |fanout[topic]| < D if num_fanout_peers_in_topic < self.degree: # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - selected_peers = \ - GossipSub.select_from_minus(self.degree - num_fanout_peers_in_topic, - gossipsub_peers_in_topic, self.fanout[topic]) - + selected_peers = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree - num_fanout_peers_in_topic, + self.fanout[topic], + ) # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) @@ -337,12 +351,12 @@ class GossipSub(IPubsubRouter): # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in a topic and only add them if they are gossipsub peers too if topic in self.pubsub.peer_topics: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = \ - GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree, + [], + ) for peer in peers_to_emit_ihave_to: # TODO: this line is a monster, can hopefully be simplified @@ -351,6 +365,7 @@ class GossipSub(IPubsubRouter): msg_ids = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_ids, peer) + # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh for topic in self.fanout: if topic not in self.mesh: @@ -359,12 +374,12 @@ class GossipSub(IPubsubRouter): # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in topic and only add if they are gossipsub peers also if topic in self.pubsub.peer_topics: - gossipsub_peers_in_topic = [peer for peer in self.pubsub.peer_topics[topic] - if peer in self.peers_gossipsub] - # Select D peers from peers.gossipsub[topic] - peers_to_emit_ihave_to = \ - GossipSub.select_from_minus(self.degree, gossipsub_peers_in_topic, []) + peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus( + topic, + self.degree, + [], + ) for peer in peers_to_emit_ihave_to: if peer not in self.mesh[topic] and peer not in self.fanout[topic]: @@ -400,6 +415,22 @@ class GossipSub(IPubsubRouter): return selection + def _get_in_topic_gossipsub_peers_from_minus( + self, + topic: str, + num_to_select: int, + minus: Sequence[ID]) -> List[ID]: + gossipsub_peers_in_topic = [ + peer_str + for peer_str in self.pubsub.peer_topics[topic] + if peer_str in self.peers_gossipsub + ] + return self.select_from_minus( + num_to_select, + gossipsub_peers_in_topic, + list(minus), + ) + # RPC handlers async def handle_ihave(self, ihave_msg, sender_peer_id): diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7afdc2a..2af4e24 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -45,7 +45,8 @@ class Pubsub: outgoing_messages: asyncio.Queue() seen_messages: LRU my_topics: Dict[str, asyncio.Queue] - peer_topics: Dict[str, List[ID]] + # FIXME: Should be changed to `Dict[str, List[ID]]` + peer_topics: Dict[str, List[str]] # FIXME: Should be changed to `Dict[ID, INetStream]` peers: Dict[str, INetStream] # NOTE: Be sure it is increased atomically everytime. @@ -127,26 +128,21 @@ class Pubsub: messages from other nodes :param stream: stream to continously read from """ - - # TODO check on types here - peer_id = str(stream.mplex_conn.peer_id) + peer_id = stream.mplex_conn.peer_id while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - should_publish = False - if rpc_incoming.publish: # deal with RPC.publish - for message in rpc_incoming.publish: - id_in_seen_msgs = (message.seqno, message.from_id) - if id_in_seen_msgs not in self.seen_messages: - should_publish = True - self.seen_messages[id_in_seen_msgs] = 1 - - await self.handle_talk(message) + for msg in rpc_incoming.publish: + if not self._is_subscribed_to_msg(msg): + continue + # TODO(mhchia): This will block this read_stream loop until all data are pushed. + # Should investigate further if this is an issue. + await self.push_msg(msg_forwarder=peer_id, msg=msg) if rpc_incoming.subscriptions: # deal with RPC.subscriptions @@ -157,10 +153,6 @@ class Pubsub: for message in rpc_incoming.subscriptions: self.handle_subscription(peer_id, message) - if should_publish: - # relay message to peers with router - await self.router.publish(peer_id, incoming) - if rpc_incoming.control: # Pass rpc to router so router could perform custom logic await self.router.handle_rpc(rpc_incoming, peer_id) @@ -227,6 +219,7 @@ class Pubsub: :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ + origin_id = str(origin_id) if sub_message.subscribe: if sub_message.topicid not in self.peer_topics: self.peer_topics[sub_message.topicid] = [origin_id] @@ -319,3 +312,64 @@ class Pubsub: for _, stream in self.peers.items(): # Write message to stream await stream.write(rpc_msg) + + async def publish(self, topic_id: str, data: bytes) -> None: + """ + Publish data to a topic + :param topic_id: topic which we are going to publish the data to + :param data: data which we are publishing + """ + msg = rpc_pb2.Message( + data=data, + topicIDs=[topic_id], + # Origin is ourself. + from_id=self.host.get_id().to_bytes(), + seqno=self._next_seqno(), + ) + + # TODO: Sign with our signing key + + await self.push_msg(self.host.get_id(), msg) + + async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None: + """ + Push a pubsub message to others. + :param msg_forwarder: the peer who forward us the message. + :param msg: the message we are going to push out. + """ + # TODO: - Check if the `source` is in the blacklist. If yes, reject. + + # TODO: - Check if the `from` is in the blacklist. If yes, reject. + + # TODO: - Check if signing is required and if so signature should be attached. + + if self._is_msg_seen(msg): + return + + # TODO: - Validate the message. If failed, reject it. + + self._mark_msg_seen(msg) + await self.handle_talk(msg) + await self.router.publish(msg_forwarder, msg) + + def _next_seqno(self) -> bytes: + """ + Make the next message sequence id. + """ + self.counter += 1 + return self.counter.to_bytes(8, 'big') + + def _is_msg_seen(self, msg: rpc_pb2.Message) -> bool: + msg_id = get_msg_id(msg) + return msg_id in self.seen_messages + + def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None: + msg_id = get_msg_id(msg) + # FIXME: Mapping `msg_id` to `1` is quite awkward. Should investigate if there is a + # more appropriate way. + self.seen_messages[msg_id] = 1 + + def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: + if len(self.my_topics) == 0: + return False + return all([topic in self.my_topics for topic in msg.topicIDs]) diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index e581570..8819e5f 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -42,11 +42,11 @@ class IPubsubRouter(ABC): """ @abstractmethod - def publish(self, sender_peer_id, rpc_message): + async def publish(self, msg_forwarder, pubsub_msg): """ Invoked to forward a new message that has been validated - :param sender_peer_id: peer_id of message sender - :param rpc_message: message to forward + :param msg_forwarder: peer_id of message sender + :param pubsub_msg: pubsub message to forward """ @abstractmethod diff --git a/tests/pubsub/__init__.py b/tests/pubsub/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py new file mode 100644 index 0000000..99295e4 --- /dev/null +++ b/tests/pubsub/configs.py @@ -0,0 +1,7 @@ +import multiaddr + + +FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" +GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0" + +LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index a640997..42d17ca 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,13 +1,18 @@ import asyncio -import multiaddr import uuid -from utils import message_id_generator, generate_RPC_packet -from libp2p import new_node -from libp2p.pubsub.pubsub import Pubsub -from libp2p.pubsub.floodsub import FloodSub +import multiaddr -SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +from libp2p import new_node +from libp2p.host.host_interface import IHost +from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.pubsub import Pubsub + +from .configs import FLOODSUB_PROTOCOL_ID +from .utils import message_id_generator + + +SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] CRYPTO_TOPIC = "ethereum" # Message format: @@ -17,14 +22,25 @@ CRYPTO_TOPIC = "ethereum" # Ex. set,rob,5 # Determine message type by looking at first item before first comma -class DummyAccountNode(): + +class DummyAccountNode: """ - Node which has an internal balance mapping, meant to serve as + Node which has an internal balance mapping, meant to serve as a dummy crypto blockchain. There is no actual blockchain, just a simple map indicating how much crypto each user in the mappings holds """ + libp2p_node: IHost + pubsub: Pubsub + floodsub: FloodSub - def __init__(self): + def __init__( + self, + libp2p_node: IHost, + pubsub: Pubsub, + floodsub: FloodSub): + self.libp2p_node = libp2p_node + self.pubsub = pubsub + self.floodsub = floodsub self.balances = {} self.next_msg_id_func = message_id_generator(0) self.node_id = str(uuid.uuid1()) @@ -38,16 +54,21 @@ class DummyAccountNode(): We use create as this serves as a factory function and allows us to use async await, unlike the init function """ - self = DummyAccountNode() libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - self.libp2p_node = libp2p_node - - self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) - self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") - return self + floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) + pubsub = Pubsub( + libp2p_node, + floodsub, + "a", + ) + return cls( + libp2p_node=libp2p_node, + pubsub=pubsub, + floodsub=floodsub, + ) async def handle_incoming_msgs(self): """ @@ -78,10 +99,8 @@ class DummyAccountNode(): :param dest_user: user to send crypto to :param amount: amount of crypto to send """ - my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) - await self.floodsub.publish(my_id, packet.SerializeToString()) + await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode()) async def publish_set_crypto(self, user, amount): """ @@ -89,18 +108,15 @@ class DummyAccountNode(): :param user: user to set crypto for :param amount: amount of crypto """ - my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) - - await self.floodsub.publish(my_id, packet.SerializeToString()) + await self.pubsub.publish(CRYPTO_TOPIC, msg_contents.encode()) def handle_send_crypto(self, source_user, dest_user, amount): """ Handle incoming send_crypto message :param source_user: user to send crypto from :param dest_user: user to send crypto to - :param amount: amount of crypto to send + :param amount: amount of crypto to send """ if source_user in self.balances: self.balances[source_user] -= amount @@ -130,4 +146,3 @@ class DummyAccountNode(): return self.balances[user] else: return -1 - diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py new file mode 100644 index 0000000..77d96a7 --- /dev/null +++ b/tests/pubsub/floodsub_integration_test_settings.py @@ -0,0 +1,448 @@ +import asyncio + +import pytest + +from libp2p import new_node +from libp2p.peer.id import ID +from libp2p.pubsub.pubsub import Pubsub + +from tests.utils import ( + cleanup, + connect, +) + +from .configs import ( + FLOODSUB_PROTOCOL_ID, + LISTEN_MADDR, +) + + +SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] + +FLOODSUB_PROTOCOL_TEST_CASES = [ + { + "name": "simple_two_nodes", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "A": ["B"] + }, + "topic_map": { + "topic1": ["B"] + }, + "messages": [ + { + "topics": ["topic1"], + "data": b"foo", + "node_id": "A" + } + ] + }, + { + "name": "three_nodes_two_topics", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "A": ["B"], + "B": ["C"], + }, + "topic_map": { + "topic1": ["B", "C"], + "topic2": ["B", "C"], + }, + "messages": [ + { + "topics": ["topic1"], + "data": b"foo", + "node_id": "A", + }, + { + "topics": ["topic2"], + "data": b"Alex is tall", + "node_id": "A", + } + ] + }, + { + "name": "two_nodes_one_topic_single_subscriber_is_sender", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "A": ["B"], + }, + "topic_map": { + "topic1": ["B"], + }, + "messages": [ + { + "topics": ["topic1"], + "data": b"Alex is tall", + "node_id": "B", + } + ] + }, + { + "name": "two_nodes_one_topic_two_msgs", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "A": ["B"], + }, + "topic_map": { + "topic1": ["B"], + }, + "messages": [ + { + "topics": ["topic1"], + "data": b"Alex is tall", + "node_id": "B", + }, + { + "topics": ["topic1"], + "data": b"foo", + "node_id": "A", + } + ] + }, + { + "name": "seven_nodes_tree_one_topics", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"], + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + } + ] + }, + { + "name": "seven_nodes_tree_three_topics", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"], + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"], + "space": ["2", "3", "4", "5", "6", "7"], + "onions": ["2", "3", "4", "5", "6", "7"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["space"], + "data": b"foobar", + "node_id": "1", + }, + { + "topics": ["onions"], + "data": b"I am allergic", + "node_id": "1", + } + ] + }, + { + "name": "seven_nodes_tree_three_topics_diff_origin", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], + "space": ["1", "2", "3", "4", "5", "6", "7"], + "onions": ["1", "2", "3", "4", "5", "6", "7"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["space"], + "data": b"foobar", + "node_id": "4", + }, + { + "topics": ["onions"], + "data": b"I am allergic", + "node_id": "7", + } + ] + }, + { + "name": "three_nodes_clique_two_topic_diff_origin", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["3"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3"], + "school": ["1", "2", "3"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic", + "node_id": "1", + } + ] + }, + { + "name": "four_nodes_clique_two_topic_diff_origin_many_msgs", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3", "4"], + "2": ["1", "3", "4"], + "3": ["1", "2", "4"], + "4": ["1", "2", "3"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4"], + "school": ["1", "2", "3", "4"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar2", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar3", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic3", + "node_id": "1", + } + ] + }, + { + "name": "five_nodes_ring_two_topic_diff_origin_many_msgs", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2"], + "2": ["3"], + "3": ["4"], + "4": ["5"], + "5": ["1"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5"], + "school": ["1", "2", "3", "4", "5"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar2", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar3", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic3", + "node_id": "1", + } + ] + } +] + +# pylint: disable=invalid-name +floodsub_protocol_pytest_params = [ + pytest.param(test_case, id=test_case["name"]) + for test_case in FLOODSUB_PROTOCOL_TEST_CASES +] + + +# pylint: disable=too-many-locals +async def perform_test_from_obj(obj, router_factory): + """ + Perform pubsub tests from a test obj. + test obj are composed as follows: + + { + "supported_protocols": ["supported/protocol/1.0.0",...], + "adj_list": { + "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], + "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], + ... + }, + "topic_map": { + "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] + }, + "messages": [ + { + "topics": ["topic1_for_message", "topic2_for_message", ...], + "data": b"some contents of the message (newlines are not supported)", + "node_id": "message sender node id" + }, + ... + ] + } + NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A + or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior + is undefined (even if it may work) + """ + + # Step 1) Create graph + adj_list = obj["adj_list"] + node_map = {} + pubsub_map = {} + + async def add_node(node_id: str) -> None: + node = await new_node(transport_opt=[str(LISTEN_MADDR)]) + await node.get_network().listen(LISTEN_MADDR) + node_map[node_id] = node + pubsub_router = router_factory(protocols=obj["supported_protocols"]) + pubsub = Pubsub(node, pubsub_router, ID(node_id.encode())) + pubsub_map[node_id] = pubsub + + tasks_connect = [] + for start_node_id in adj_list: + # Create node if node does not yet exist + if start_node_id not in node_map: + await add_node(start_node_id) + + # For each neighbor of start_node, create if does not yet exist, + # then connect start_node to neighbor + for neighbor_id in adj_list[start_node_id]: + # Create neighbor if neighbor does not yet exist + if neighbor_id not in node_map: + await add_node(neighbor_id) + tasks_connect.append( + connect(node_map[start_node_id], node_map[neighbor_id]) + ) + # Connect nodes and wait at least for 2 seconds + await asyncio.gather(*tasks_connect, asyncio.sleep(2)) + + # Step 2) Subscribe to topics + queues_map = {} + topic_map = obj["topic_map"] + + tasks_topic = [] + tasks_topic_data = [] + for topic, node_ids in topic_map.items(): + for node_id in node_ids: + tasks_topic.append(pubsub_map[node_id].subscribe(topic)) + tasks_topic_data.append((node_id, topic)) + tasks_topic.append(asyncio.sleep(2)) + + # Gather is like Promise.all + responses = await asyncio.gather(*tasks_topic, return_exceptions=True) + for i in range(len(responses) - 1): + node_id, topic = tasks_topic_data[i] + if node_id not in queues_map: + queues_map[node_id] = {} + # Store queue in topic-queue map for node + queues_map[node_id][topic] = responses[i] + + # Allow time for subscribing before continuing + await asyncio.sleep(0.01) + + # Step 3) Publish messages + topics_in_msgs_ordered = [] + messages = obj["messages"] + tasks_publish = [] + + for msg in messages: + topics = msg["topics"] + data = msg["data"] + node_id = msg["node_id"] + + # Publish message + # TODO: Should be single RPC package with several topics + for topic in topics: + tasks_publish.append( + pubsub_map[node_id].publish( + topic, + data, + ) + ) + + # For each topic in topics, add (topic, node_id, data) tuple to ordered test list + for topic in topics: + topics_in_msgs_ordered.append((topic, node_id, data)) + + # Allow time for publishing before continuing + await asyncio.gather(*tasks_publish, asyncio.sleep(2)) + + # Step 4) Check that all messages were received correctly. + for topic, origin_node_id, data in topics_in_msgs_ordered: + # Look at each node in each topic + for node_id in topic_map[topic]: + # Get message from subscription queue + msg = await queues_map[node_id][topic].get() + assert data == msg.data + # Check the message origin + assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id + + # Success, terminate pending tasks. + await cleanup() diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index 9fa2aa7..1efa579 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -1,35 +1,31 @@ import asyncio -import multiaddr +from threading import Thread + import pytest -from threading import Thread -from tests.utils import cleanup -from libp2p import new_node -from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.pubsub.pubsub import Pubsub -from libp2p.pubsub.floodsub import FloodSub -from dummy_account_node import DummyAccountNode +from tests.utils import ( + cleanup, + connect, +) + +from .dummy_account_node import DummyAccountNode # pylint: disable=too-many-locals -async def connect(node1, node2): - # node1 connects to node2 - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) def create_setup_in_new_thread_func(dummy_node): def setup_in_new_thread(): asyncio.ensure_future(dummy_node.setup_crypto_networking()) return setup_in_new_thread + async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): """ Helper function to allow for easy construction of custom tests for dummy account nodes in various network topologies :param num_nodes: number of nodes in the test :param adjacency_map: adjacency map defining each node and its list of neighbors - :param action_func: function to execute that includes actions by the nodes, + :param action_func: function to execute that includes actions by the nodes, such as send crypto and set crypto :param assertion_func: assertions for testing the results of the actions are correct """ @@ -73,6 +69,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_simple_two_nodes(): num_nodes = 2 @@ -86,6 +83,7 @@ async def test_simple_two_nodes(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_simple_three_nodes_line_topography(): num_nodes = 3 @@ -99,6 +97,7 @@ async def test_simple_three_nodes_line_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_simple_three_nodes_triangle_topography(): num_nodes = 3 @@ -112,6 +111,7 @@ async def test_simple_three_nodes_triangle_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_simple_seven_nodes_tree_topography(): num_nodes = 7 @@ -125,6 +125,7 @@ async def test_simple_seven_nodes_tree_topography(): await perform_test(num_nodes, adj_map, action_func, assertion_func) + @pytest.mark.asyncio async def test_set_then_send_from_root_seven_nodes_tree_topography(): num_nodes = 7 diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 2c67185..c3d3e24 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -2,615 +2,132 @@ import asyncio import multiaddr import pytest -from tests.utils import cleanup from libp2p import new_node -from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.pubsub.pb import rpc_pb2 -from libp2p.pubsub.pubsub import Pubsub +from libp2p.peer.id import ID from libp2p.pubsub.floodsub import FloodSub -from utils import message_id_generator, generate_RPC_packet +from libp2p.pubsub.pubsub import Pubsub +from tests.utils import ( + cleanup, + connect, +) + +from .configs import ( + FLOODSUB_PROTOCOL_ID, + LISTEN_MADDR, +) +from .floodsub_integration_test_settings import ( + perform_test_from_obj, + floodsub_protocol_pytest_params, +) + + +SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] # pylint: disable=too-many-locals -async def connect(node1, node2): - """ - Connect node1 to node2 - """ - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) @pytest.mark.asyncio async def test_simple_two_nodes(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_a = await new_node(transport_opt=[str(LISTEN_MADDR)]) + node_b = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node_a.get_network().listen(LISTEN_MADDR) + await node_b.get_network().listen(LISTEN_MADDR) - supported_protocols = ["/floodsub/1.0.0"] + supported_protocols = [FLOODSUB_PROTOCOL_ID] + topic = "my_topic" + data = b"some data" floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, "a") + pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32)) floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, "b") + pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32)) await connect(node_a, node_b) - - await asyncio.sleep(0.25) - qb = await pubsub_b.subscribe("my_topic") - await asyncio.sleep(0.25) - node_a_id = str(node_a.get_id()) - - next_msg_id_func = message_id_generator(0) - msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", next_msg_id_func()) - await floodsub_a.publish(node_a_id, msg.SerializeToString()) + sub_b = await pubsub_b.subscribe(topic) + # Sleep to let a know of b's subscription await asyncio.sleep(0.25) - res_b = await qb.get() + await pubsub_a.publish(topic, data) + + res_b = await sub_b.get() # Check that the msg received by node_b is the same # as the message sent by node_a - assert res_b.SerializeToString() == msg.publish[0].SerializeToString() + assert ID(res_b.from_id) == node_a.get_id() + assert res_b.data == data + assert res_b.topicIDs == [topic] # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio -async def test_lru_cache_two_nodes(): +async def test_lru_cache_two_nodes(monkeypatch): # two nodes with cache_size of 4 - # node_a send the following messages to node_b - # [1, 1, 2, 1, 3, 1, 4, 1, 5, 1] - # node_b should only receive the following - # [1, 2, 3, 4, 5, 1] - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + # `node_a` send the following messages to node_b + message_indices = [1, 1, 2, 1, 3, 1, 4, 1, 5, 1] + # `node_b` should only receive the following + expected_received_indices = [1, 2, 3, 4, 5, 1] - await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + node_a = await new_node(transport_opt=[str(LISTEN_MADDR)]) + node_b = await new_node(transport_opt=[str(LISTEN_MADDR)]) - supported_protocols = ["/floodsub/1.0.0"] + await node_a.get_network().listen(LISTEN_MADDR) + await node_b.get_network().listen(LISTEN_MADDR) - # initialize PubSub with a cache_size of 4 + supported_protocols = SUPPORTED_PROTOCOLS + topic = "my_topic" + + # Mock `get_msg_id` to make us easier to manipulate `msg_id` by `data`. + def get_msg_id(msg): + # Originally it is `(msg.seqno, msg.from_id)` + return (msg.data, msg.from_id) + import libp2p.pubsub.pubsub + monkeypatch.setattr(libp2p.pubsub.pubsub, "get_msg_id", get_msg_id) + + # Initialize Pubsub with a cache_size of 4 + cache_size = 4 floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, "a", 4) + pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32), cache_size) + floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, "b", 4) + pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32), cache_size) await connect(node_a, node_b) - - await asyncio.sleep(0.25) - qb = await pubsub_b.subscribe("my_topic") - await asyncio.sleep(0.25) - node_a_id = str(node_a.get_id()) - - # initialize message_id_generator - # store first message - next_msg_id_func = message_id_generator(0) - first_message = generate_RPC_packet(node_a_id, ["my_topic"], "some data 1", next_msg_id_func()) - - await floodsub_a.publish(node_a_id, first_message.SerializeToString()) - await asyncio.sleep(0.25) - print (first_message) - - messages = [first_message] - # for the next 5 messages - for i in range(2, 6): - # write first message - await floodsub_a.publish(node_a_id, first_message.SerializeToString()) - await asyncio.sleep(0.25) - - # generate and write next message - msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data " + str(i), next_msg_id_func()) - messages.append(msg) - - await floodsub_a.publish(node_a_id, msg.SerializeToString()) - await asyncio.sleep(0.25) - - # write first message again - await floodsub_a.publish(node_a_id, first_message.SerializeToString()) + sub_b = await pubsub_b.subscribe(topic) await asyncio.sleep(0.25) - # check the first five messages in queue - # should only see 1 first_message - for i in range(5): - # Check that the msg received by node_b is the same - # as the message sent by node_a - res_b = await qb.get() - assert res_b.SerializeToString() == messages[i].publish[0].SerializeToString() + def _make_testing_data(i: int) -> bytes: + num_int_bytes = 4 + if i >= 2**(num_int_bytes * 8): + raise ValueError("integer is too large to be serialized") + return b"data" + i.to_bytes(num_int_bytes, "big") + + for index in message_indices: + await pubsub_a.publish(topic, _make_testing_data(index)) + await asyncio.sleep(0.25) + + for index in expected_received_indices: + res_b = await sub_b.get() + assert res_b.data == _make_testing_data(index) + assert sub_b.empty() - # the 6th message should be first_message - res_b = await qb.get() - assert res_b.SerializeToString() == first_message.publish[0].SerializeToString() - assert qb.empty() - # Success, terminate pending tasks. await cleanup() -async def perform_test_from_obj(obj): - """ - Perform a floodsub test from a test obj. - test obj are composed as follows: - - { - "supported_protocols": ["supported/protocol/1.0.0",...], - "adj_list": { - "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], - "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], - ... - }, - "topic_map": { - "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] - }, - "messages": [ - { - "topics": ["topic1_for_message", "topic2_for_message", ...], - "data": "some contents of the message (newlines are not supported)", - "node_id": "message sender node id" - }, - ... - ] - } - NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A - or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior - is undefined (even if it may work) - """ - - # Step 1) Create graph - adj_list = obj["adj_list"] - node_map = {} - floodsub_map = {} - pubsub_map = {} - - supported_protocols = obj["supported_protocols"] - - tasks_connect = [] - for start_node_id in adj_list: - # Create node if node does not yet exist - if start_node_id not in node_map: - node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[start_node_id] = node - - floodsub = FloodSub(supported_protocols) - floodsub_map[start_node_id] = floodsub - pubsub = Pubsub(node, floodsub, start_node_id) - pubsub_map[start_node_id] = pubsub - - # For each neighbor of start_node, create if does not yet exist, - # then connect start_node to neighbor - for neighbor_id in adj_list[start_node_id]: - # Create neighbor if neighbor does not yet exist - if neighbor_id not in node_map: - neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[neighbor_id] = neighbor_node - - floodsub = FloodSub(supported_protocols) - floodsub_map[neighbor_id] = floodsub - pubsub = Pubsub(neighbor_node, floodsub, neighbor_id) - pubsub_map[neighbor_id] = pubsub - - # Connect node and neighbor - # await connect(node_map[start_node_id], node_map[neighbor_id]) - tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id]))) - tasks_connect.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_connect) - - # Allow time for graph creation before continuing - # await asyncio.sleep(0.25) - - # Step 2) Subscribe to topics - queues_map = {} - topic_map = obj["topic_map"] - - tasks_topic = [] - tasks_topic_data = [] - for topic in topic_map: - for node_id in topic_map[topic]: - """ - # Subscribe node to topic - q = await pubsub_map[node_id].subscribe(topic) - - # Create topic-queue map for node_id if one does not yet exist - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - """ - tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic))) - tasks_topic_data.append((node_id, topic)) - tasks_topic.append(asyncio.sleep(2)) - - # Gather is like Promise.all - responses = await asyncio.gather(*tasks_topic, return_exceptions=True) - for i in range(len(responses) - 1): - q = responses[i] - node_id, topic = tasks_topic_data[i] - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - - # Allow time for subscribing before continuing - # await asyncio.sleep(0.01) - - # Step 3) Publish messages - topics_in_msgs_ordered = [] - messages = obj["messages"] - tasks_publish = [] - next_msg_id_func = message_id_generator(0) - - for msg in messages: - topics = msg["topics"] - - data = msg["data"] - node_id = msg["node_id"] - - # Get actual id for sender node (not the id from the test obj) - actual_node_id = str(node_map[node_id].get_id()) - - # Create correctly formatted message - msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) - - # Publish message - # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) - tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\ - actual_node_id, msg_talk.SerializeToString()))) - - # For each topic in topics, add topic, msg_talk tuple to ordered test list - # TODO: Update message sender to be correct message sender before - # adding msg_talk to this list - for topic in topics: - topics_in_msgs_ordered.append((topic, msg_talk)) - - # Allow time for publishing before continuing - # await asyncio.sleep(0.4) - tasks_publish.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_publish) - - # Step 4) Check that all messages were received correctly. - # TODO: Check message sender too - for i in range(len(topics_in_msgs_ordered)): - topic, actual_msg = topics_in_msgs_ordered[i] - - # Look at each node in each topic - for node_id in topic_map[topic]: - # Get message from subscription queue - msg_on_node_str = await queues_map[node_id][topic].get() - assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() - - # Success, terminate pending tasks. - await cleanup() - +@pytest.mark.parametrize( + "test_case_obj", + floodsub_protocol_pytest_params, +) @pytest.mark.asyncio -async def test_simple_two_nodes_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_two_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"], - "B": ["C"] - }, - "topic_map": { - "topic1": ["B", "C"], - "topic2": ["B", "C"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - }, - { - "topics": ["topic2"], - "data": "Alex is tall", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_two_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" - }, - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_one_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"], - "space": ["2", "3", "4", "5", "6", "7"], - "onions": ["2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["space"], - "data": "foobar", - "node_id": "1" - }, - { - "topics": ["onions"], - "data": "I am allergic", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], - "space": ["1", "2", "3", "4", "5", "6", "7"], - "onions": ["1", "2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["space"], - "data": "foobar", - "node_id": "4" - }, - { - "topics": ["onions"], - "data": "I am allergic", - "node_id": "7" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["3"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3"], - "school": ["1", "2", "3"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3", "4"], - "2": ["1", "3", "4"], - "3": ["1", "2", "4"], - "4": ["1", "2", "3"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4"], - "school": ["1", "2", "3", "4"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar2", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar3", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2"], - "2": ["3"], - "3": ["4"], - "4": ["5"], - "5": ["1"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5"], - "school": ["1", "2", "3", "4", "5"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar2", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar3", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) +async def test_gossipsub_run_with_floodsub_tests(test_case_obj): + await perform_test_from_obj( + test_case_obj, + FloodSub, + ) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index bb47135..e0fed92 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -1,13 +1,23 @@ import asyncio -import pytest import random -from utils import message_id_generator, generate_RPC_packet, \ - create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \ - connect, one_to_all_connect -from tests.utils import cleanup +import pytest -SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] +from tests.utils import ( + cleanup, + connect, +) + +from .configs import GOSSIPSUB_PROTOCOL_ID +from .utils import ( + create_libp2p_hosts, + create_pubsub_and_gossipsub_instances, + dense_connect, + one_to_all_connect, +) + + +SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID] @pytest.mark.asyncio @@ -41,13 +51,8 @@ async def test_join(): # Central node publish to the topic so that this topic # is added to central node's fanout - next_msg_id_func = message_id_generator(0) - msg_content = "" - host_id = str(libp2p_hosts[central_node_index].get_id()) - # Generate message packet - packet = generate_RPC_packet(host_id, [topic], msg_content, next_msg_id_func()) # publish from the randomly chosen host - await gossipsubs[central_node_index].publish(host_id, packet.SerializeToString()) + await pubsubs[central_node_index].publish(topic, b"data") # Check that the gossipsub of central node has fanout for the topic assert topic in gossipsubs[central_node_index].fanout @@ -86,6 +91,8 @@ async def test_leave(): gossipsub = gossipsubs[0] topic = "test_leave" + assert topic not in gossipsub.mesh + await gossipsub.join(topic) assert topic in gossipsub.mesh @@ -205,14 +212,12 @@ async def test_handle_prune(): @pytest.mark.asyncio async def test_dense(): # Create libp2p hosts - next_msg_id_func = message_id_generator(0) - num_hosts = 10 num_msgs = 5 libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ SUPPORTED_PROTOCOLS, \ 10, 9, 11, 30, 3, 5, 0.5) @@ -231,41 +236,35 @@ async def test_dense(): await asyncio.sleep(2) for i in range(num_msgs): - msg_content = "foo " + str(i) + msg_content = b"foo " + i.to_bytes(1, 'big') # randomly pick a message origin origin_idx = random.randint(0, num_hosts - 1) - origin_host = libp2p_hosts[origin_idx] - host_id = str(origin_host.get_id()) - - # Generate message packet - packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) # publish from the randomly chosen host - await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + await pubsubs[origin_idx].publish("foobar", msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() - assert msg.data == packet.publish[0].data + assert msg.data == msg_content await cleanup() + @pytest.mark.asyncio async def test_fanout(): # Create libp2p hosts - next_msg_id_func = message_id_generator(0) - num_hosts = 10 num_msgs = 5 libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ SUPPORTED_PROTOCOLS, \ 10, 9, 11, 30, 3, 5, 0.5) - # All pubsub subscribe to foobar + # All pubsub subscribe to foobar except for `pubsubs[0]` queues = [] for i in range(1, len(pubsubs)): q = await pubsubs[i].subscribe("foobar") @@ -279,71 +278,61 @@ async def test_fanout(): # Wait 2 seconds for heartbeat to allow mesh to connect await asyncio.sleep(2) + topic = "foobar" # Send messages with origin not subscribed for i in range(num_msgs): - msg_content = "foo " + str(i) + msg_content = b"foo " + i.to_bytes(1, "big") # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 - origin_host = libp2p_hosts[origin_idx] - host_id = str(origin_host.get_id()) - - # Generate message packet - packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) # publish from the randomly chosen host - await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + await pubsubs[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() - assert msg.SerializeToString() == packet.publish[0].SerializeToString() + assert msg.data == msg_content # Subscribe message origin - queues.append(await pubsubs[0].subscribe("foobar")) + queues.insert(0, await pubsubs[0].subscribe(topic)) # Send messages again for i in range(num_msgs): - msg_content = "foo " + str(i) + msg_content = b"bar " + i.to_bytes(1, 'big') # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 - origin_host = libp2p_hosts[origin_idx] - host_id = str(origin_host.get_id()) - - # Generate message packet - packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) # publish from the randomly chosen host - await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + await pubsubs[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() - assert msg.SerializeToString() == packet.publish[0].SerializeToString() + assert msg.data == msg_content await cleanup() @pytest.mark.asyncio async def test_fanout_maintenance(): # Create libp2p hosts - next_msg_id_func = message_id_generator(0) - num_hosts = 10 num_msgs = 5 libp2p_hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ + pubsubs, _ = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ SUPPORTED_PROTOCOLS, \ 10, 9, 11, 30, 3, 5, 0.5) # All pubsub subscribe to foobar queues = [] + topic = "foobar" for i in range(1, len(pubsubs)): - q = await pubsubs[i].subscribe("foobar") + q = await pubsubs[i].subscribe(topic) # Add each blocking queue to an array of blocking queues queues.append(q) @@ -356,27 +345,22 @@ async def test_fanout_maintenance(): # Send messages with origin not subscribed for i in range(num_msgs): - msg_content = "foo " + str(i) + msg_content = b"foo " + i.to_bytes(1, 'big') # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 - origin_host = libp2p_hosts[origin_idx] - host_id = str(origin_host.get_id()) - - # Generate message packet - packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) # publish from the randomly chosen host - await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + await pubsubs[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() - assert msg.SerializeToString() == packet.publish[0].SerializeToString() + assert msg.data == msg_content for sub in pubsubs: - await sub.unsubscribe('foobar') + await sub.unsubscribe(topic) queues = [] @@ -384,7 +368,7 @@ async def test_fanout_maintenance(): # Resub and repeat for i in range(1, len(pubsubs)): - q = await pubsubs[i].subscribe("foobar") + q = await pubsubs[i].subscribe(topic) # Add each blocking queue to an array of blocking queues queues.append(q) @@ -393,65 +377,61 @@ async def test_fanout_maintenance(): # Check messages can still be sent for i in range(num_msgs): - msg_content = "foo " + str(i) + msg_content = b"bar " + i.to_bytes(1, 'big') # Pick the message origin to the node that is not subscribed to 'foobar' origin_idx = 0 - origin_host = libp2p_hosts[origin_idx] - host_id = str(origin_host.get_id()) - - # Generate message packet - packet = generate_RPC_packet(host_id, ["foobar"], msg_content, next_msg_id_func()) # publish from the randomly chosen host - await gossipsubs[origin_idx].publish(host_id, packet.SerializeToString()) + await pubsubs[origin_idx].publish(topic, msg_content) await asyncio.sleep(0.5) # Assert that all blocking queues receive the message for queue in queues: msg = await queue.get() - assert msg.SerializeToString() == packet.publish[0].SerializeToString() + assert msg.data == msg_content await cleanup() + @pytest.mark.asyncio async def test_gossip_propagation(): # Create libp2p hosts - next_msg_id_func = message_id_generator(0) - num_hosts = 2 - libp2p_hosts = await create_libp2p_hosts(num_hosts) + hosts = await create_libp2p_hosts(num_hosts) # Create pubsub, gossipsub instances - pubsubs, gossipsubs = create_pubsub_and_gossipsub_instances(libp2p_hosts, \ - SUPPORTED_PROTOCOLS, \ - 1, 0, 2, 30, 50, 100, 0.5) - node1, node2 = libp2p_hosts[0], libp2p_hosts[1] - sub1, sub2 = pubsubs[0], pubsubs[1] - gsub1, gsub2 = gossipsubs[0], gossipsubs[1] + pubsubs, _ = create_pubsub_and_gossipsub_instances( + hosts, + SUPPORTED_PROTOCOLS, + 1, + 0, + 2, + 30, + 50, + 100, + 0.5, + ) - node1_queue = await sub1.subscribe('foo') + topic = "foo" + await pubsubs[0].subscribe(topic) - # node 1 publish to topic - msg_content = 'foo_msg' - node1_id = str(node1.get_id()) - - # Generate message packet - packet = generate_RPC_packet(node1_id, ["foo"], msg_content, next_msg_id_func()) + # node 0 publish to topic + msg_content = b'foo_msg' # publish from the randomly chosen host - await gsub1.publish(node1_id, packet.SerializeToString()) + await pubsubs[0].publish(topic, msg_content) - # now node 2 subscribes - node2_queue = await sub2.subscribe('foo') + # now node 1 subscribes + queue_1 = await pubsubs[1].subscribe(topic) - await connect(node2, node1) + await connect(hosts[0], hosts[1]) # wait for gossip heartbeat await asyncio.sleep(2) # should be able to read message - msg = await node2_queue.get() - assert msg.SerializeToString() == packet.publish[0].SerializeToString() + msg = await queue_1.get() + assert msg.data == msg_content await cleanup() diff --git a/tests/pubsub/test_gossipsub_backward_compatibility.py b/tests/pubsub/test_gossipsub_backward_compatibility.py index 468e25f..2a3167b 100644 --- a/tests/pubsub/test_gossipsub_backward_compatibility.py +++ b/tests/pubsub/test_gossipsub_backward_compatibility.py @@ -1,35 +1,31 @@ -import asyncio -import multiaddr +import functools + import pytest from libp2p import new_node -from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.gossipsub import GossipSub -from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub -from utils import message_id_generator, generate_RPC_packet + from tests.utils import cleanup +from .configs import ( + FLOODSUB_PROTOCOL_ID, + LISTEN_MADDR, +) +from .floodsub_integration_test_settings import ( + perform_test_from_obj, + floodsub_protocol_pytest_params, +) + + # pylint: disable=too-many-locals - -async def connect(node1, node2): - """ - Connect node1 to node2 - """ - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) - @pytest.mark.asyncio -async def test_init(): - node = await new_node(transport_opt=["/ip4/127.1/tcp/0"]) +async def test_gossipsub_initialize_with_floodsub_protocol(): + node = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.1/tcp/0")) + await node.get_network().listen(LISTEN_MADDR) - supported_protocols = ["/gossipsub/1.0.0"] - - gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) + gossipsub = GossipSub([FLOODSUB_PROTOCOL_ID], 3, 2, 4, 30) pubsub = Pubsub(node, gossipsub, "a") # Did it work? @@ -37,483 +33,20 @@ async def test_init(): await cleanup() -async def perform_test_from_obj(obj): - """ - Perform a floodsub test from a test obj. - test obj are composed as follows: - - { - "supported_protocols": ["supported/protocol/1.0.0",...], - "adj_list": { - "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], - "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], - ... - }, - "topic_map": { - "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] - }, - "messages": [ - { - "topics": ["topic1_for_message", "topic2_for_message", ...], - "data": "some contents of the message (newlines are not supported)", - "node_id": "message sender node id" - }, - ... - ] - } - NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A - or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior - is undefined (even if it may work) - """ - - # Step 1) Create graph - adj_list = obj["adj_list"] - node_map = {} - gossipsub_map = {} - pubsub_map = {} - - supported_protocols = obj["supported_protocols"] - - tasks_connect = [] - for start_node_id in adj_list: - # Create node if node does not yet exist - if start_node_id not in node_map: - node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[start_node_id] = node - - gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) - gossipsub_map[start_node_id] = gossipsub - pubsub = Pubsub(node, gossipsub, start_node_id) - pubsub_map[start_node_id] = pubsub - - # For each neighbor of start_node, create if does not yet exist, - # then connect start_node to neighbor - for neighbor_id in adj_list[start_node_id]: - # Create neighbor if neighbor does not yet exist - if neighbor_id not in node_map: - neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[neighbor_id] = neighbor_node - - gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) - gossipsub_map[neighbor_id] = gossipsub - pubsub = Pubsub(neighbor_node, gossipsub, neighbor_id) - pubsub_map[neighbor_id] = pubsub - - # Connect node and neighbor - tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id]))) - tasks_connect.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_connect) - - # Allow time for graph creation before continuing - # await asyncio.sleep(0.25) - - # Step 2) Subscribe to topics - queues_map = {} - topic_map = obj["topic_map"] - - tasks_topic = [] - tasks_topic_data = [] - for topic in topic_map: - for node_id in topic_map[topic]: - """ - # Subscribe node to topic - q = await pubsub_map[node_id].subscribe(topic) - - # Create topic-queue map for node_id if one does not yet exist - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - """ - tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic))) - tasks_topic_data.append((node_id, topic)) - tasks_topic.append(asyncio.sleep(2)) - - # Gather is like Promise.all - responses = await asyncio.gather(*tasks_topic, return_exceptions=True) - for i in range(len(responses) - 1): - q = responses[i] - node_id, topic = tasks_topic_data[i] - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - - # Allow time for subscribing before continuing - # await asyncio.sleep(0.01) - - # Step 3) Publish messages - topics_in_msgs_ordered = [] - messages = obj["messages"] - tasks_publish = [] - next_msg_id_func = message_id_generator(0) - - for msg in messages: - topics = msg["topics"] - - data = msg["data"] - node_id = msg["node_id"] - - # Get actual id for sender node (not the id from the test obj) - actual_node_id = str(node_map[node_id].get_id()) - - # Create correctly formatted message - msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) - - # Publish message - tasks_publish.append(asyncio.ensure_future(gossipsub_map[node_id].publish(\ - actual_node_id, msg_talk.SerializeToString()))) - - # For each topic in topics, add topic, msg_talk tuple to ordered test list - # TODO: Update message sender to be correct message sender before - # adding msg_talk to this list - for topic in topics: - topics_in_msgs_ordered.append((topic, msg_talk)) - - # Allow time for publishing before continuing - # await asyncio.sleep(0.4) - tasks_publish.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_publish) - - # Step 4) Check that all messages were received correctly. - # TODO: Check message sender too - for i in range(len(topics_in_msgs_ordered)): - topic, actual_msg = topics_in_msgs_ordered[i] - - # Look at each node in each topic - for node_id in topic_map[topic]: - # Get message from subscription queue - msg_on_node = await queues_map[node_id][topic].get() - assert actual_msg.publish[0].SerializeToString() == msg_on_node.SerializeToString() - - # Success, terminate pending tasks. - await cleanup() +@pytest.mark.parametrize( + "test_case_obj", + floodsub_protocol_pytest_params, +) @pytest.mark.asyncio -async def test_simple_two_nodes_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_two_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"], - "B": ["C"] - }, - "topic_map": { - "topic1": ["B", "C"], - "topic2": ["B", "C"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - }, - { - "topics": ["topic2"], - "data": "Alex is tall", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_two_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" - }, - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_one_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"], - "space": ["2", "3", "4", "5", "6", "7"], - "onions": ["2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["space"], - "data": "foobar", - "node_id": "1" - }, - { - "topics": ["onions"], - "data": "I am allergic", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], - "space": ["1", "2", "3", "4", "5", "6", "7"], - "onions": ["1", "2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["space"], - "data": "foobar", - "node_id": "4" - }, - { - "topics": ["onions"], - "data": "I am allergic", - "node_id": "7" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["3"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3"], - "school": ["1", "2", "3"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3", "4"], - "2": ["1", "3", "4"], - "3": ["1", "2", "4"], - "4": ["1", "2", "3"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4"], - "school": ["1", "2", "3", "4"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar2", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar3", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2"], - "2": ["3"], - "3": ["4"], - "4": ["5"], - "5": ["1"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5"], - "school": ["1", "2", "3", "4", "5"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar2", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar3", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) +async def test_gossipsub_run_with_floodsub_tests(test_case_obj): + await perform_test_from_obj( + test_case_obj, + functools.partial( + GossipSub, + degree=3, + degree_low=2, + degree_high=4, + time_to_live=30, + ) + ) diff --git a/tests/pubsub/test_mcache.py b/tests/pubsub/test_mcache.py index 5446f74..0e73222 100644 --- a/tests/pubsub/test_mcache.py +++ b/tests/pubsub/test_mcache.py @@ -1,14 +1,17 @@ import pytest + from libp2p.pubsub.mcache import MessageCache class Msg: def __init__(self, topicIDs, seqno, from_id): + # pylint: disable=invalid-name self.topicIDs = topicIDs - self.seqno = seqno, + self.seqno = seqno self.from_id = from_id + @pytest.mark.asyncio async def test_mcache(): # Ported from: diff --git a/tests/pubsub/test_subscription.py b/tests/pubsub/test_subscription.py index c8f46f5..0dfdc4d 100644 --- a/tests/pubsub/test_subscription.py +++ b/tests/pubsub/test_subscription.py @@ -5,7 +5,10 @@ from libp2p import new_node from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] +from .configs import FLOODSUB_PROTOCOL_ID + + +SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] TESTING_TOPIC = "TEST_SUBSCRIBE" diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index e406536..93fc235 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,13 +1,18 @@ import asyncio -import multiaddr -import uuid -import random import struct +from typing import ( + Sequence, +) + +import multiaddr + from libp2p import new_node -from libp2p.pubsub.pb import rpc_pb2 -from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.pubsub.pubsub import Pubsub +from libp2p.peer.id import ID from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.pubsub.pubsub import Pubsub + +from tests.utils import connect def message_id_generator(start_val): @@ -17,6 +22,7 @@ def message_id_generator(start_val): :return: message id """ val = start_val + def generator(): # Allow manipulation of val within closure nonlocal val @@ -29,6 +35,20 @@ def message_id_generator(start_val): return generator + +def make_pubsub_msg( + origin_id: ID, + topic_ids: Sequence[str], + data: bytes, + seqno: bytes) -> rpc_pb2.Message: + return rpc_pb2.Message( + from_id=origin_id.to_bytes(), + seqno=seqno, + data=data, + topicIDs=list(topic_ids), + ) + + def generate_RPC_packet(origin_id, topics, msg_content, msg_id): """ Generate RPC packet to send over wire @@ -42,7 +62,7 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): from_id=origin_id.encode('utf-8'), seqno=msg_id, data=msg_content.encode('utf-8'), - ) + ) for topic in topics: message.topicIDs.extend([topic.encode('utf-8')]) @@ -50,13 +70,6 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): packet.publish.extend([message]) return packet -async def connect(node1, node2): - """ - Connect node1 to node2 - """ - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) async def create_libp2p_hosts(num_hosts): """ @@ -78,8 +91,17 @@ async def create_libp2p_hosts(num_hosts): return hosts -def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, degree, degree_low, \ - degree_high, time_to_live, gossip_window, gossip_history, heartbeat_interval): + +def create_pubsub_and_gossipsub_instances( + libp2p_hosts, + supported_protocols, + degree, + degree_low, + degree_high, + time_to_live, + gossip_window, + gossip_history, + heartbeat_interval): pubsubs = [] gossipsubs = [] for node in libp2p_hosts: @@ -93,6 +115,10 @@ def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, deg return pubsubs, gossipsubs + +# FIXME: There is no difference between `sparse_connect` and `dense_connect`, +# before `connect_some` is fixed. + async def sparse_connect(hosts): await connect_some(hosts, 3) @@ -101,6 +127,7 @@ async def dense_connect(hosts): await connect_some(hosts, 10) +# FIXME: `degree` is not used at all async def connect_some(hosts, degree): for i, host in enumerate(hosts): for j, host2 in enumerate(hosts): @@ -123,6 +150,7 @@ async def connect_some(hosts, degree): # j += 1 + async def one_to_all_connect(hosts, central_host_index): for i, host in enumerate(hosts): if i != central_host_index: diff --git a/tests/utils.py b/tests/utils.py index 4efde83..686d086 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,6 +3,16 @@ import asyncio import multiaddr from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr + + +async def connect(node1, node2): + """ + Connect node1 to node2 + """ + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) async def cleanup():