From 08f845cc219a5bac84a7d936832cd83f16f7675f Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sat, 6 Apr 2019 17:48:06 -0400 Subject: [PATCH] Lay groundwork for transition to multiple topics --- examples/sharding/driver.py | 42 ++++++++++++++++++++++++++++------- examples/sharding/receiver.py | 14 +++++++----- examples/sharding/sender.py | 12 +++++----- 3 files changed, 50 insertions(+), 18 deletions(-) diff --git a/examples/sharding/driver.py b/examples/sharding/driver.py index a4c1949..bad8539 100644 --- a/examples/sharding/driver.py +++ b/examples/sharding/driver.py @@ -4,11 +4,13 @@ from receiver import ReceiverNode from libp2p.peer.peerinfo import info_from_p2p_addr from tests.utils import cleanup +ACK_PROTOCOL = "/ack/1.0.0" -async def create_receivers(num_receivers): +async def create_receivers(num_receivers, topic_map): receivers = [] + # Create receivers for i in range(num_receivers): - receivers.append(await ReceiverNode.create()) + receivers.append(await ReceiverNode.create(ACK_PROTOCOL, topic_map[i])) return receivers async def connect(node1, node2): @@ -32,22 +34,44 @@ async def create_topology(adjacency_map, sender, receivers): await connect(receivers[source_num].libp2p_node, \ receivers[target_num].libp2p_node) +def get_num_receivers_in_topology(topology): + receiver_ids = [] + for key in topology: + if key != "sender": + if key not in receiver_ids: + receiver_ids.append(key) + for neighbor in topology[key]: + if neighbor not in receiver_ids: + receiver_ids.append(neighbor) + return len(receiver_ids) + async def main(): # Create sender print("Sender created") - sender = await SenderNode.create() + sender = await SenderNode.create(ACK_PROTOCOL) # Create receivers print("Receivers created") - # sender_id = sender.libp2p_node.get_id() - receivers = await create_receivers(2) - # Define topology + # Define connection topology topology = { "sender": [0], - 0: [1] + 0: [1, 2], + 1: [3, 4], + 2: [5, 6] } + num_receivers = get_num_receivers_in_topology(topology) + + # Define topic map + topic_map = {} + for num in range(num_receivers): + topic_map[num] = "1" + + topics = ["1"] + + receivers = await create_receivers(num_receivers, topic_map) + # Create network topology await create_topology(topology, sender, receivers) print("Topology created") @@ -58,11 +82,13 @@ async def main(): for receiver in receivers: print("Starting receiving") asyncio.ensure_future(receiver.start_receiving(sender_info)) + + # Allow time for start receiving to be completed await asyncio.sleep(0.5) # 2) Start sending messages and perform throughput test print("Performing test") - await sender.perform_test(2, 5) + await sender.perform_test(num_receivers, topics, 1) await cleanup() diff --git a/examples/sharding/receiver.py b/examples/sharding/receiver.py index f8699d9..df547ad 100644 --- a/examples/sharding/receiver.py +++ b/examples/sharding/receiver.py @@ -21,7 +21,7 @@ class ReceiverNode(): self.next_msg_id_func = message_id_generator(0) @classmethod - async def create(cls): + async def create(cls, ack_protocol, topic): """ Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub instance to this new node @@ -39,7 +39,10 @@ class ReceiverNode(): self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS) self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a") - self.pubsub_messages = await self.pubsub.subscribe(TOPIC) + self.pubsub_messages = await self.pubsub.subscribe(topic) + self.topic = topic + + self.ack_protocol = ack_protocol return self @@ -50,11 +53,12 @@ class ReceiverNode(): async def start_receiving(self, sender_node_info): await self.libp2p_node.connect(sender_node_info) - ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, ["/ack/1.0.0"]) + ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, [self.ack_protocol]) asyncio.ensure_future(self.wait_for_end(ack_stream)) - # TODO: get peer id self.should_listen = True + ack_msg = self.topic + encoded_ack_msg = ack_msg.encode() while self.should_listen: msg = await self.pubsub_messages.get() - await ack_stream.write("ack".encode()) + await ack_stream.write(encoded_ack_msg) diff --git a/examples/sharding/sender.py b/examples/sharding/sender.py index 43ad7d5..53791db 100644 --- a/examples/sharding/sender.py +++ b/examples/sharding/sender.py @@ -23,7 +23,7 @@ class SenderNode(): self.ack_queue = asyncio.Queue() @classmethod - async def create(cls): + async def create(cls, ack_protocol): """ Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub instance to this new node @@ -56,11 +56,12 @@ class SenderNode(): # Notify receivers test is over await stream.write("end".encode()) # Set handler for acks - self.libp2p_node.set_stream_handler("/ack/1.0.0", ack_stream_handler) + self.ack_protocol = ack_protocol + self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler) return self - async def perform_test(self, num_receivers, time_length): + async def perform_test(self, num_receivers, topics, time_length): # Time and loop start = timer() curr_time = timer() @@ -72,7 +73,7 @@ class SenderNode(): num_fully_ack = 0 while (curr_time - start) < time_length: # Send message (NOTE THIS IS JUST ONE TOPIC) - packet = generate_RPC_packet(my_id, [TOPIC], msg_contents, self.next_msg_id_func()) + packet = generate_RPC_packet(my_id, topics, msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) num_sent += 1 # Wait for acks @@ -83,8 +84,9 @@ class SenderNode(): num_fully_ack += 1 curr_time = timer() + # Do something interesting with test results print("Num sent: " + str(num_sent)) print("Num fully ack: " + str(num_fully_ack)) - # Do something interesting with test results + # End test self.test_being_performed = False