From 25def0c7d60c2cf351189e18a5238b71d661326d Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sat, 6 Apr 2019 17:16:37 -0400 Subject: [PATCH] Implement end-to-end perform testing --- examples/sharding/driver.py | 74 +++++++++++++++++++++++++++++++++++ examples/sharding/receiver.py | 31 ++++++++++----- examples/sharding/sender.py | 39 +++++++++++------- 3 files changed, 120 insertions(+), 24 deletions(-) create mode 100644 examples/sharding/driver.py diff --git a/examples/sharding/driver.py b/examples/sharding/driver.py new file mode 100644 index 0000000..a4c1949 --- /dev/null +++ b/examples/sharding/driver.py @@ -0,0 +1,74 @@ +import asyncio +from sender import SenderNode +from receiver import ReceiverNode +from libp2p.peer.peerinfo import info_from_p2p_addr +from tests.utils import cleanup + + +async def create_receivers(num_receivers): + receivers = [] + for i in range(num_receivers): + receivers.append(await ReceiverNode.create()) + return receivers + +async def connect(node1, node2): + # node1 connects to node2 + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) + +async def create_topology(adjacency_map, sender, receivers): + # Create network + + # Connect senders to receivers + for target_num in adjacency_map["sender"]: + await connect(sender.libp2p_node, receivers[target_num].libp2p_node) + + # Connect receivers to other receivers + for source_num in adjacency_map: + if source_num != "sender": + target_nums = adjacency_map[source_num] + for target_num in target_nums: + await connect(receivers[source_num].libp2p_node, \ + receivers[target_num].libp2p_node) + +async def main(): + # Create sender + print("Sender created") + sender = await SenderNode.create() + + # Create receivers + print("Receivers created") + # sender_id = sender.libp2p_node.get_id() + receivers = await create_receivers(2) + + # Define topology + topology = { + "sender": [0], + 0: [1] + } + + # Create network topology + await create_topology(topology, sender, receivers) + print("Topology created") + + # Perform throughput test + # 1) Start receivers + sender_info = info_from_p2p_addr(sender.libp2p_node.get_addrs()[0]) + for receiver in receivers: + print("Starting receiving") + asyncio.ensure_future(receiver.start_receiving(sender_info)) + await asyncio.sleep(0.5) + + # 2) Start sending messages and perform throughput test + print("Performing test") + await sender.perform_test(2, 5) + + await cleanup() + + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/sharding/receiver.py b/examples/sharding/receiver.py index 676293b..f8699d9 100644 --- a/examples/sharding/receiver.py +++ b/examples/sharding/receiver.py @@ -1,11 +1,14 @@ import multiaddr +import asyncio from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub - +from tests.pubsub.utils import message_id_generator TOPIC = "eth" +SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] class ReceiverNode(): """ @@ -16,10 +19,9 @@ class ReceiverNode(): def __init__(self): self.next_msg_id_func = message_id_generator(0) - self.node_id = str(uuid.uuid1()) @classmethod - async def create(cls, sender_node_id): + async def create(cls): """ Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub instance to this new node @@ -27,7 +29,7 @@ class ReceiverNode(): We use create as this serves as a factory function and allows us to use async await, unlike the init function """ - self = DummyAccountNode() + self = ReceiverNode() libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) @@ -37,13 +39,22 @@ class ReceiverNode(): self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") - self.pubsub_messages = await pubsub.subscribe(TOPIC) - ack_stream = await node.new_stream(sender_node_id, ["/ack/1.0.0"]) + self.pubsub_messages = await self.pubsub.subscribe(TOPIC) return self - async def start_receiving(): + async def wait_for_end(self, ack_stream): + msg = (await ack_stream.read()).decode() + if msg == "end": + self.should_listen = False + + async def start_receiving(self, sender_node_info): + await self.libp2p_node.connect(sender_node_info) + ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, ["/ack/1.0.0"]) + asyncio.ensure_future(self.wait_for_end(ack_stream)) + # TODO: get peer id - while True: - msg = await pubsub_messages.get() - await ack_stream.write("ack") + self.should_listen = True + while self.should_listen: + msg = await self.pubsub_messages.get() + await ack_stream.write("ack".encode()) diff --git a/examples/sharding/sender.py b/examples/sharding/sender.py index 7efa8fd..43ad7d5 100644 --- a/examples/sharding/sender.py +++ b/examples/sharding/sender.py @@ -10,8 +10,6 @@ from libp2p.pubsub.floodsub import FloodSub SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] TOPIC = "eth" -NUM_RECEIVERS = 10 -TIME_LENGTH = 1000 # seconds, double check class SenderNode(): """ @@ -22,7 +20,6 @@ class SenderNode(): def __init__(self): self.next_msg_id_func = message_id_generator(0) - self.node_id = str(uuid.uuid1()) self.ack_queue = asyncio.Queue() @classmethod @@ -43,21 +40,27 @@ class SenderNode(): self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") - await pubsub.subscribe(TOPIC) + await self.pubsub.subscribe(TOPIC) + + self.test_being_performed = True + + this = self async def ack_stream_handler(stream): - while True: + while self.test_being_performed: ack = await stream.read() - if ack is not None: - await ack_queue.put(ack) + await self.ack_queue.put(ack) + # Reached once test_being_performed is False + # Notify receivers test is over + await stream.write("end".encode()) # Set handler for acks - self.libp2p_node.set_stream_handler("/ack/1", ack_stream_handler) + self.libp2p_node.set_stream_handler("/ack/1.0.0", ack_stream_handler) return self - def perform_test(): + async def perform_test(self, num_receivers, time_length): # Time and loop start = timer() curr_time = timer() @@ -65,15 +68,23 @@ class SenderNode(): my_id = str(self.libp2p_node.get_id()) msg_contents = "transaction" - while (curr_time - start) < TIME_LENGTH: + num_sent = 0 + num_fully_ack = 0 + while (curr_time - start) < time_length: # Send message (NOTE THIS IS JUST ONE TOPIC) packet = generate_RPC_packet(my_id, [TOPIC], msg_contents, self.next_msg_id_func()) - await floodsub.publish(my_id, packet.SerializeToString()) - + await self.floodsub.publish(my_id, packet.SerializeToString()) + num_sent += 1 # Wait for acks num_acks = 0 - while num_acks < NUM_RECEIVERS: + while num_acks < num_receivers: await self.ack_queue.get() num_acks += 1 + num_fully_ack += 1 + curr_time = timer() - curr_time = timer() + print("Num sent: " + str(num_sent)) + print("Num fully ack: " + str(num_fully_ack)) + # Do something interesting with test results + # End test + self.test_being_performed = False