diff --git a/examples/sharding/driver.py b/examples/sharding/driver.py index 0ca7084..55a56b0 100644 --- a/examples/sharding/driver.py +++ b/examples/sharding/driver.py @@ -98,7 +98,7 @@ async def main(): for topic in topic_map: num_receivers_in_each_topic[topic] = len(topic_map[topic]) print("Performing test") - await sender.perform_test(num_receivers_in_each_topic, topics, 1) + await sender.perform_test(num_receivers_in_each_topic, topics, 10) print("All testing completed") await cleanup() diff --git a/examples/sharding/receiver.py b/examples/sharding/receiver.py index 1ec811f..040afec 100644 --- a/examples/sharding/receiver.py +++ b/examples/sharding/receiver.py @@ -47,7 +47,11 @@ class ReceiverNode(): return self async def wait_for_end(self, ack_stream): - msg = (await ack_stream.read()).decode() + # Continue waiting for end message, even if None (i.e. timeout) is received + msg = await ack_stream.read() + while msg is None: + msg = await ack_stream.read() + msg = msg.decode() if msg == "end": self.should_listen = False print("End received") diff --git a/examples/sharding/sender.py b/examples/sharding/sender.py index 0c848b5..bedd23b 100644 --- a/examples/sharding/sender.py +++ b/examples/sharding/sender.py @@ -117,7 +117,7 @@ class SenderNode(): # Perform test while time is not up here AND # while time is not up in handle_ack_queues, which is checked with the # self.test_being_performed boolean - while (curr_time - start) < time_length and self.test_being_performed: + while (curr_time - start) < time_length: # Send message on single topic packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func()) @@ -132,7 +132,7 @@ class SenderNode(): # self.test_being_performed boolean # TODO: Check safety of this. Does this make sense in the asyncio # event-driven setting? - while num_acks < num_receivers_in_each_topic[topic] and self.test_being_performed: + while num_acks < num_receivers_in_each_topic[topic]: ack = await self.topic_ack_queues[topic].get() num_acks += 1 num_acks_in_each_topic[topic] += 1