Add end messages into simplified logic
This commit is contained in:
parent
470f5e6e51
commit
9277be98bf
@ -99,7 +99,7 @@ async def main():
|
||||
num_receivers_in_each_topic[topic] = len(topic_map[topic])
|
||||
print("Performing test")
|
||||
await sender.perform_test(num_receivers_in_each_topic, topics, 1)
|
||||
|
||||
print("All testing completed")
|
||||
await cleanup()
|
||||
|
||||
|
||||
|
@ -45,18 +45,17 @@ class SenderNode():
|
||||
|
||||
self.test_being_performed = True
|
||||
|
||||
this = self
|
||||
|
||||
all_streams = []
|
||||
|
||||
self.all_streams = []
|
||||
async def ack_stream_handler(stream):
|
||||
all_streams.append(stream)
|
||||
self.all_streams.append(stream)
|
||||
|
||||
while self.test_being_performed:
|
||||
# This Ack is what times out when multi-topic tests finish
|
||||
ack = await stream.read()
|
||||
if ack is not None:
|
||||
await self.ack_queue.put(ack)
|
||||
else:
|
||||
print("FUCK")
|
||||
break
|
||||
# Reached once test_being_performed is False
|
||||
# Notify receivers test is over
|
||||
@ -101,20 +100,25 @@ class SenderNode():
|
||||
curr_time = timer()
|
||||
|
||||
async def end_all_async():
|
||||
# Add None to all queues indicating that we should break the loop
|
||||
# Add None to ack_queue to break out of the loop.
|
||||
# Note: This is necessary given the current code or the code will never
|
||||
# terminate
|
||||
await self.ack_queue.put(None)
|
||||
for queue in self.topic_ack_queues:
|
||||
await self.topic_ack_queues[queue].put(None)
|
||||
|
||||
# This is not necessary but is useful for turning off the receivers gracefully
|
||||
for stream in self.all_streams:
|
||||
await stream.write("end".encode())
|
||||
|
||||
async def perform_test_on_topic(topic):
|
||||
print("Performing test on topic " + topic)
|
||||
start = timer()
|
||||
curr_time = timer()
|
||||
|
||||
# Perform test while time is not up here AND
|
||||
# while time is not up in handle_ack_queues, which is checked with the
|
||||
# self.test_being_performed boolean
|
||||
while (curr_time - start) < time_length and self.test_being_performed:
|
||||
# Send message (NOTE THIS IS JUST ONE TOPIC)
|
||||
# Send message on single topic
|
||||
packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func())
|
||||
|
||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||
@ -137,8 +141,8 @@ class SenderNode():
|
||||
nonlocal completed_topics_count, num_topics
|
||||
print("Test completed " + topic)
|
||||
completed_topics_count += 1
|
||||
self.test_being_performed = False
|
||||
if completed_topics_count == num_topics:
|
||||
self.test_being_performed = False
|
||||
print("End all async")
|
||||
await end_all_async()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user