Remove unused code
This commit is contained in:
parent
fe14c24a89
commit
7bacc8ca08
|
@ -49,7 +49,6 @@ class ReceiverNode():
|
||||||
async def wait_for_end(self, ack_stream):
|
async def wait_for_end(self, ack_stream):
|
||||||
msg = (await ack_stream.read()).decode()
|
msg = (await ack_stream.read()).decode()
|
||||||
if msg == "end":
|
if msg == "end":
|
||||||
print("END RECEIVED, KILL NOW")
|
|
||||||
self.should_listen = False
|
self.should_listen = False
|
||||||
|
|
||||||
async def start_receiving(self, sender_node_info):
|
async def start_receiving(self, sender_node_info):
|
||||||
|
|
|
@ -50,7 +50,6 @@ class SenderNode():
|
||||||
all_streams = []
|
all_streams = []
|
||||||
cleanup_called = False
|
cleanup_called = False
|
||||||
async def stream_cleanup():
|
async def stream_cleanup():
|
||||||
print("CLEANUP CALLED")
|
|
||||||
cleanup_called = True
|
cleanup_called = True
|
||||||
for stream in all_streams:
|
for stream in all_streams:
|
||||||
await stream.write("end".encode())
|
await stream.write("end".encode())
|
||||||
|
@ -62,7 +61,6 @@ class SenderNode():
|
||||||
while self.test_being_performed:
|
while self.test_being_performed:
|
||||||
# This Ack is what times out when multi-topic tests finish
|
# This Ack is what times out when multi-topic tests finish
|
||||||
ack = await stream.read()
|
ack = await stream.read()
|
||||||
# print("READ OCC")
|
|
||||||
if ack is not None:
|
if ack is not None:
|
||||||
await self.ack_queue.put(ack)
|
await self.ack_queue.put(ack)
|
||||||
else:
|
else:
|
||||||
|
@ -70,8 +68,6 @@ class SenderNode():
|
||||||
|
|
||||||
# Reached once test_being_performed is False
|
# Reached once test_being_performed is False
|
||||||
# Notify receivers test is over
|
# Notify receivers test is over
|
||||||
print("TEST STOPPED BEING PERFORMED --> Sending END")
|
|
||||||
# await stream.write("end".encode())
|
|
||||||
if not cleanup_called:
|
if not cleanup_called:
|
||||||
await stream_cleanup()
|
await stream_cleanup()
|
||||||
|
|
||||||
|
@ -83,8 +79,6 @@ class SenderNode():
|
||||||
|
|
||||||
async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
|
async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
|
||||||
# Time and loop
|
# Time and loop
|
||||||
# start = timer()
|
|
||||||
# curr_time = timer()
|
|
||||||
|
|
||||||
my_id = str(self.libp2p_node.get_id())
|
my_id = str(self.libp2p_node.get_id())
|
||||||
msg_contents = "transaction"
|
msg_contents = "transaction"
|
||||||
|
@ -103,14 +97,11 @@ class SenderNode():
|
||||||
start = timer()
|
start = timer()
|
||||||
curr_time = timer()
|
curr_time = timer()
|
||||||
while (curr_time - start) < time_length:
|
while (curr_time - start) < time_length:
|
||||||
# print("GETTING ACK")
|
|
||||||
ack = await self.ack_queue.get()
|
ack = await self.ack_queue.get()
|
||||||
if ack is None:
|
if ack is None:
|
||||||
break
|
break
|
||||||
# print("DECODING ACK")
|
|
||||||
decoded_ack = ack.decode()
|
decoded_ack = ack.decode()
|
||||||
|
|
||||||
# print("ACK REC IN HANDLE")
|
|
||||||
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
|
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
|
||||||
# print("ADDING TO TOPIC ACK QUEUE " + str(topic))
|
# print("ADDING TO TOPIC ACK QUEUE " + str(topic))
|
||||||
curr_time = timer()
|
curr_time = timer()
|
||||||
|
@ -141,7 +132,6 @@ class SenderNode():
|
||||||
|
|
||||||
# Wait for acks
|
# Wait for acks
|
||||||
num_acks = 0
|
num_acks = 0
|
||||||
# print("PRE WHILE")
|
|
||||||
|
|
||||||
# While number of acks is below threshold AND
|
# While number of acks is below threshold AND
|
||||||
# while time is not up in handle_ack_queues, which is checked with the
|
# while time is not up in handle_ack_queues, which is checked with the
|
||||||
|
@ -149,19 +139,14 @@ class SenderNode():
|
||||||
# TODO: Check safety of this. Does this make sense in the asyncio
|
# TODO: Check safety of this. Does this make sense in the asyncio
|
||||||
# event-driven setting?
|
# event-driven setting?
|
||||||
while num_acks < num_receivers_in_each_topic[topic] and self.test_being_performed:
|
while num_acks < num_receivers_in_each_topic[topic] and self.test_being_performed:
|
||||||
# print("IN WHILE")
|
|
||||||
ack = await self.topic_ack_queues[topic].get()
|
ack = await self.topic_ack_queues[topic].get()
|
||||||
if ack is None:
|
if ack is None:
|
||||||
return
|
return
|
||||||
# print("GOT")
|
|
||||||
num_acks += 1
|
num_acks += 1
|
||||||
num_acks_in_each_topic[topic] += 1
|
num_acks_in_each_topic[topic] += 1
|
||||||
curr_time = timer()
|
curr_time = timer()
|
||||||
self.test_being_performed = False
|
self.test_being_performed = False
|
||||||
|
|
||||||
# THIS IS WHERE CANCELLING IS INITIATED
|
|
||||||
# print("Time passed")
|
|
||||||
print("CANCELING")
|
|
||||||
await end_all_async()
|
await end_all_async()
|
||||||
# await cleanup()
|
# await cleanup()
|
||||||
|
|
||||||
|
@ -171,18 +156,6 @@ class SenderNode():
|
||||||
tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))
|
tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))
|
||||||
|
|
||||||
gathered = await asyncio.gather(*tasks, return_exceptions=True)
|
gathered = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
# while (curr_time - start) < time_length:
|
|
||||||
# # Send message (NOTE THIS IS JUST ONE TOPIC)
|
|
||||||
# packet = generate_RPC_packet(my_id, topics, msg_contents, self.next_msg_id_func())
|
|
||||||
# await self.floodsub.publish(my_id, packet.SerializeToString())
|
|
||||||
# num_sent += 1
|
|
||||||
# # Wait for acks
|
|
||||||
# num_acks = 0
|
|
||||||
# while num_acks < num_receivers:
|
|
||||||
# await self.ack_queue.get()
|
|
||||||
# num_acks += 1
|
|
||||||
# num_fully_ack += 1
|
|
||||||
# curr_time = timer()
|
|
||||||
|
|
||||||
# Do something interesting with test results
|
# Do something interesting with test results
|
||||||
print("Num sent: " + str(num_sent_in_each_topic))
|
print("Num sent: " + str(num_sent_in_each_topic))
|
||||||
|
|
Loading…
Reference in New Issue
Block a user