Add multi-topic benchmark capability
This commit is contained in:
parent
82f881a49a
commit
f570b19db8
@ -62,16 +62,17 @@ async def main():
|
|||||||
|
|
||||||
# Define connection topology
|
# Define connection topology
|
||||||
topology = {
|
topology = {
|
||||||
"sender": [0, 1]
|
"sender": [0, 2, 4],
|
||||||
|
0: [1],
|
||||||
|
2: [3],
|
||||||
|
4: [5]
|
||||||
}
|
}
|
||||||
|
|
||||||
num_receivers = get_num_receivers_in_topology(topology)
|
num_receivers = get_num_receivers_in_topology(topology)
|
||||||
|
|
||||||
# Define topic map
|
# Define topic map
|
||||||
topic_map = {
|
topic_map = {
|
||||||
# "1": [x for x in range(num_receivers)]
|
"1": [0, 1, 2, 3, 4, 5]
|
||||||
"1": [0],
|
|
||||||
"2": [1]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
topics = topic_map.keys()
|
topics = topic_map.keys()
|
||||||
|
@ -63,3 +63,4 @@ class ReceiverNode():
|
|||||||
while self.should_listen:
|
while self.should_listen:
|
||||||
msg = await self.pubsub_messages.get()
|
msg = await self.pubsub_messages.get()
|
||||||
await ack_stream.write(encoded_ack_msg)
|
await ack_stream.write(encoded_ack_msg)
|
||||||
|
print("Receiver closed")
|
||||||
|
@ -47,18 +47,34 @@ class SenderNode():
|
|||||||
|
|
||||||
this = self
|
this = self
|
||||||
|
|
||||||
|
all_streams = []
|
||||||
|
cleanup_called = False
|
||||||
|
async def stream_cleanup():
|
||||||
|
print("CLEANUP CALLED")
|
||||||
|
cleanup_called = True
|
||||||
|
for stream in all_streams:
|
||||||
|
await stream.write("end".encode())
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
await cleanup()
|
||||||
|
|
||||||
async def ack_stream_handler(stream):
|
async def ack_stream_handler(stream):
|
||||||
|
all_streams.append(stream)
|
||||||
while self.test_being_performed:
|
while self.test_being_performed:
|
||||||
print("WAITING ON READ")
|
# This Ack is what times out when multi-topic tests finish
|
||||||
ack = await stream.read()
|
ack = await stream.read()
|
||||||
print("READ OCC")
|
# print("READ OCC")
|
||||||
if ack is not None:
|
if ack is not None:
|
||||||
await self.ack_queue.put(ack)
|
await self.ack_queue.put(ack)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
# Reached once test_being_performed is False
|
# Reached once test_being_performed is False
|
||||||
# Notify receivers test is over
|
# Notify receivers test is over
|
||||||
print("TEST STOPPED BEING PERFORMED")
|
print("TEST STOPPED BEING PERFORMED --> Sending END")
|
||||||
await stream.write("end".encode())
|
# await stream.write("end".encode())
|
||||||
|
if not cleanup_called:
|
||||||
|
await stream_cleanup()
|
||||||
|
|
||||||
# Set handler for acks
|
# Set handler for acks
|
||||||
self.ack_protocol = ack_protocol
|
self.ack_protocol = ack_protocol
|
||||||
self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)
|
self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)
|
||||||
@ -89,6 +105,8 @@ class SenderNode():
|
|||||||
while (curr_time - start) < time_length:
|
while (curr_time - start) < time_length:
|
||||||
# print("GETTING ACK")
|
# print("GETTING ACK")
|
||||||
ack = await self.ack_queue.get()
|
ack = await self.ack_queue.get()
|
||||||
|
if ack is None:
|
||||||
|
break
|
||||||
# print("DECODING ACK")
|
# print("DECODING ACK")
|
||||||
decoded_ack = ack.decode()
|
decoded_ack = ack.decode()
|
||||||
|
|
||||||
@ -96,11 +114,17 @@ class SenderNode():
|
|||||||
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
|
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
|
||||||
# print("ADDING TO TOPIC ACK QUEUE " + str(topic))
|
# print("ADDING TO TOPIC ACK QUEUE " + str(topic))
|
||||||
curr_time = timer()
|
curr_time = timer()
|
||||||
print("EXI HANDLE ACK QUES")
|
# print("EXI HANDLE ACK QUES")
|
||||||
self.test_being_performed = False
|
self.test_being_performed = False
|
||||||
|
|
||||||
gathered = None
|
gathered = None
|
||||||
|
|
||||||
|
async def end_all_async():
|
||||||
|
# Add None to all queues indicating that we should break the loop
|
||||||
|
await self.ack_queue.put(None)
|
||||||
|
for queue in self.topic_ack_queues:
|
||||||
|
await self.topic_ack_queues[queue].put(None)
|
||||||
|
|
||||||
async def perform_test_on_topic(topic):
|
async def perform_test_on_topic(topic):
|
||||||
print("Performing test on topic " + topic)
|
print("Performing test on topic " + topic)
|
||||||
start = timer()
|
start = timer()
|
||||||
@ -126,14 +150,19 @@ class SenderNode():
|
|||||||
# event-driven setting?
|
# event-driven setting?
|
||||||
while num_acks < num_receivers_in_each_topic[topic] and self.test_being_performed:
|
while num_acks < num_receivers_in_each_topic[topic] and self.test_being_performed:
|
||||||
# print("IN WHILE")
|
# print("IN WHILE")
|
||||||
await self.topic_ack_queues[topic].get()
|
ack = await self.topic_ack_queues[topic].get()
|
||||||
|
if ack is None:
|
||||||
|
return
|
||||||
# print("GOT")
|
# print("GOT")
|
||||||
num_acks += 1
|
num_acks += 1
|
||||||
num_acks_in_each_topic[topic] += 1
|
num_acks_in_each_topic[topic] += 1
|
||||||
curr_time = timer()
|
curr_time = timer()
|
||||||
self.test_being_performed = False
|
self.test_being_performed = False
|
||||||
print("Time passed")
|
|
||||||
|
# THIS IS WHERE CANCELLING IS INITIATED
|
||||||
|
# print("Time passed")
|
||||||
print("CANCELING")
|
print("CANCELING")
|
||||||
|
await end_all_async()
|
||||||
# await cleanup()
|
# await cleanup()
|
||||||
|
|
||||||
tasks = [asyncio.ensure_future(handle_ack_queues())]
|
tasks = [asyncio.ensure_future(handle_ack_queues())]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user