From 9277be98bf9a55f3a73622b8802d592190afc4cc Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Sun, 7 Apr 2019 18:40:01 -0400 Subject: [PATCH] Add end messages into simplified logic --- examples/sharding/driver.py | 2 +- examples/sharding/sender.py | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/examples/sharding/driver.py b/examples/sharding/driver.py index 40242c0..0ca7084 100644 --- a/examples/sharding/driver.py +++ b/examples/sharding/driver.py @@ -99,7 +99,7 @@ async def main(): num_receivers_in_each_topic[topic] = len(topic_map[topic]) print("Performing test") await sender.perform_test(num_receivers_in_each_topic, topics, 1) - + print("All testing completed") await cleanup() diff --git a/examples/sharding/sender.py b/examples/sharding/sender.py index 08e3868..0c848b5 100644 --- a/examples/sharding/sender.py +++ b/examples/sharding/sender.py @@ -45,18 +45,17 @@ class SenderNode(): self.test_being_performed = True - this = self - - all_streams = [] - + self.all_streams = [] async def ack_stream_handler(stream): - all_streams.append(stream) + self.all_streams.append(stream) + while self.test_being_performed: # This Ack is what times out when multi-topic tests finish ack = await stream.read() if ack is not None: await self.ack_queue.put(ack) else: + print("FUCK") break # Reached once test_being_performed is False # Notify receivers test is over @@ -101,20 +100,25 @@ class SenderNode(): curr_time = timer() async def end_all_async(): - # Add None to all queues indicating that we should break the loop + # Add None to ack_queue to break out of the loop. + # Note: This is necessary given the current code or the code will never + # terminate await self.ack_queue.put(None) - for queue in self.topic_ack_queues: - await self.topic_ack_queues[queue].put(None) + + # This is not necessary but is useful for turning off the receivers gracefully + for stream in self.all_streams: + await stream.write("end".encode()) async def perform_test_on_topic(topic): print("Performing test on topic " + topic) start = timer() curr_time = timer() + # Perform test while time is not up here AND # while time is not up in handle_ack_queues, which is checked with the # self.test_being_performed boolean while (curr_time - start) < time_length and self.test_being_performed: - # Send message (NOTE THIS IS JUST ONE TOPIC) + # Send message on single topic packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) @@ -137,8 +141,8 @@ class SenderNode(): nonlocal completed_topics_count, num_topics print("Test completed " + topic) completed_topics_count += 1 - self.test_being_performed = False if completed_topics_count == num_topics: + self.test_being_performed = False print("End all async") await end_all_async()