Not working as of now. Debugging
This commit is contained in:
parent
08f845cc21
commit
82f881a49a
|
@ -8,9 +8,16 @@ ACK_PROTOCOL = "/ack/1.0.0"
|
||||||
|
|
||||||
async def create_receivers(num_receivers, topic_map):
|
async def create_receivers(num_receivers, topic_map):
|
||||||
receivers = []
|
receivers = []
|
||||||
|
|
||||||
|
# From topic_map (topic -> list of receivers), create (receiver -> topic)
|
||||||
|
receiver_to_topic_map = {}
|
||||||
|
for topic in topic_map:
|
||||||
|
for receiver in topic_map[topic]:
|
||||||
|
receiver_to_topic_map[receiver] = topic
|
||||||
|
|
||||||
# Create receivers
|
# Create receivers
|
||||||
for i in range(num_receivers):
|
for i in range(num_receivers):
|
||||||
receivers.append(await ReceiverNode.create(ACK_PROTOCOL, topic_map[i]))
|
receivers.append(await ReceiverNode.create(ACK_PROTOCOL, receiver_to_topic_map[i]))
|
||||||
return receivers
|
return receivers
|
||||||
|
|
||||||
async def connect(node1, node2):
|
async def connect(node1, node2):
|
||||||
|
@ -55,20 +62,19 @@ async def main():
|
||||||
|
|
||||||
# Define connection topology
|
# Define connection topology
|
||||||
topology = {
|
topology = {
|
||||||
"sender": [0],
|
"sender": [0, 1]
|
||||||
0: [1, 2],
|
|
||||||
1: [3, 4],
|
|
||||||
2: [5, 6]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
num_receivers = get_num_receivers_in_topology(topology)
|
num_receivers = get_num_receivers_in_topology(topology)
|
||||||
|
|
||||||
# Define topic map
|
# Define topic map
|
||||||
topic_map = {}
|
topic_map = {
|
||||||
for num in range(num_receivers):
|
# "1": [x for x in range(num_receivers)]
|
||||||
topic_map[num] = "1"
|
"1": [0],
|
||||||
|
"2": [1]
|
||||||
|
}
|
||||||
|
|
||||||
topics = ["1"]
|
topics = topic_map.keys()
|
||||||
|
|
||||||
receivers = await create_receivers(num_receivers, topic_map)
|
receivers = await create_receivers(num_receivers, topic_map)
|
||||||
|
|
||||||
|
@ -87,13 +93,16 @@ async def main():
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
# 2) Start sending messages and perform throughput test
|
# 2) Start sending messages and perform throughput test
|
||||||
|
# Determine number of receivers in each topic
|
||||||
|
num_receivers_in_each_topic = {}
|
||||||
|
for topic in topic_map:
|
||||||
|
num_receivers_in_each_topic[topic] = len(topic_map[topic])
|
||||||
print("Performing test")
|
print("Performing test")
|
||||||
await sender.perform_test(num_receivers, topics, 1)
|
await sender.perform_test(num_receivers_in_each_topic, topics, 1)
|
||||||
|
|
||||||
await cleanup()
|
await cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
loop.run_until_complete(main())
|
loop.run_until_complete(main())
|
||||||
|
|
|
@ -49,6 +49,7 @@ class ReceiverNode():
|
||||||
async def wait_for_end(self, ack_stream):
|
async def wait_for_end(self, ack_stream):
|
||||||
msg = (await ack_stream.read()).decode()
|
msg = (await ack_stream.read()).decode()
|
||||||
if msg == "end":
|
if msg == "end":
|
||||||
|
print("END RECEIVED, KILL NOW")
|
||||||
self.should_listen = False
|
self.should_listen = False
|
||||||
|
|
||||||
async def start_receiving(self, sender_node_info):
|
async def start_receiving(self, sender_node_info):
|
||||||
|
|
|
@ -3,6 +3,7 @@ import multiaddr
|
||||||
|
|
||||||
from timeit import default_timer as timer
|
from timeit import default_timer as timer
|
||||||
|
|
||||||
|
from tests.utils import cleanup
|
||||||
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
|
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
|
||||||
from libp2p import new_node
|
from libp2p import new_node
|
||||||
from libp2p.pubsub.pubsub import Pubsub
|
from libp2p.pubsub.pubsub import Pubsub
|
||||||
|
@ -48,12 +49,15 @@ class SenderNode():
|
||||||
|
|
||||||
async def ack_stream_handler(stream):
|
async def ack_stream_handler(stream):
|
||||||
while self.test_being_performed:
|
while self.test_being_performed:
|
||||||
|
print("WAITING ON READ")
|
||||||
ack = await stream.read()
|
ack = await stream.read()
|
||||||
|
print("READ OCC")
|
||||||
if ack is not None:
|
if ack is not None:
|
||||||
await self.ack_queue.put(ack)
|
await self.ack_queue.put(ack)
|
||||||
|
|
||||||
# Reached once test_being_performed is False
|
# Reached once test_being_performed is False
|
||||||
# Notify receivers test is over
|
# Notify receivers test is over
|
||||||
|
print("TEST STOPPED BEING PERFORMED")
|
||||||
await stream.write("end".encode())
|
await stream.write("end".encode())
|
||||||
# Set handler for acks
|
# Set handler for acks
|
||||||
self.ack_protocol = ack_protocol
|
self.ack_protocol = ack_protocol
|
||||||
|
@ -61,32 +65,99 @@ class SenderNode():
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def perform_test(self, num_receivers, topics, time_length):
|
async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
|
||||||
# Time and loop
|
# Time and loop
|
||||||
start = timer()
|
# start = timer()
|
||||||
curr_time = timer()
|
# curr_time = timer()
|
||||||
|
|
||||||
my_id = str(self.libp2p_node.get_id())
|
my_id = str(self.libp2p_node.get_id())
|
||||||
msg_contents = "transaction"
|
msg_contents = "transaction"
|
||||||
|
|
||||||
num_sent = 0
|
num_sent_in_each_topic = {}
|
||||||
num_fully_ack = 0
|
num_acks_in_each_topic = {}
|
||||||
while (curr_time - start) < time_length:
|
for topic in topics:
|
||||||
# Send message (NOTE THIS IS JUST ONE TOPIC)
|
num_sent_in_each_topic[topic] = 0
|
||||||
packet = generate_RPC_packet(my_id, topics, msg_contents, self.next_msg_id_func())
|
num_acks_in_each_topic[topic] = 0
|
||||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
|
||||||
num_sent += 1
|
self.topic_ack_queues = {}
|
||||||
# Wait for acks
|
for topic in topics:
|
||||||
num_acks = 0
|
self.topic_ack_queues[topic] = asyncio.Queue()
|
||||||
while num_acks < num_receivers:
|
|
||||||
await self.ack_queue.get()
|
async def handle_ack_queues():
|
||||||
num_acks += 1
|
start = timer()
|
||||||
num_fully_ack += 1
|
|
||||||
curr_time = timer()
|
curr_time = timer()
|
||||||
|
while (curr_time - start) < time_length:
|
||||||
|
# print("GETTING ACK")
|
||||||
|
ack = await self.ack_queue.get()
|
||||||
|
# print("DECODING ACK")
|
||||||
|
decoded_ack = ack.decode()
|
||||||
|
|
||||||
|
# print("ACK REC IN HANDLE")
|
||||||
|
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
|
||||||
|
# print("ADDING TO TOPIC ACK QUEUE " + str(topic))
|
||||||
|
curr_time = timer()
|
||||||
|
print("EXI HANDLE ACK QUES")
|
||||||
|
self.test_being_performed = False
|
||||||
|
|
||||||
|
gathered = None
|
||||||
|
|
||||||
|
async def perform_test_on_topic(topic):
|
||||||
|
print("Performing test on topic " + topic)
|
||||||
|
start = timer()
|
||||||
|
curr_time = timer()
|
||||||
|
# Perform test while time is not up here AND
|
||||||
|
# while time is not up in handle_ack_queues, which is checked with the
|
||||||
|
# self.test_being_performed boolean
|
||||||
|
while (curr_time - start) < time_length and self.test_being_performed:
|
||||||
|
# Send message (NOTE THIS IS JUST ONE TOPIC)
|
||||||
|
packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func())
|
||||||
|
# print("PUBLISHED")
|
||||||
|
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||||
|
num_sent_in_each_topic[topic] += 1
|
||||||
|
|
||||||
|
# Wait for acks
|
||||||
|
num_acks = 0
|
||||||
|
# print("PRE WHILE")
|
||||||
|
|
||||||
|
# While number of acks is below threshold AND
|
||||||
|
# while time is not up in handle_ack_queues, which is checked with the
|
||||||
|
# self.test_being_performed boolean
|
||||||
|
# TODO: Check safety of this. Does this make sense in the asyncio
|
||||||
|
# event-driven setting?
|
||||||
|
while num_acks < num_receivers_in_each_topic[topic] and self.test_being_performed:
|
||||||
|
# print("IN WHILE")
|
||||||
|
await self.topic_ack_queues[topic].get()
|
||||||
|
# print("GOT")
|
||||||
|
num_acks += 1
|
||||||
|
num_acks_in_each_topic[topic] += 1
|
||||||
|
curr_time = timer()
|
||||||
|
self.test_being_performed = False
|
||||||
|
print("Time passed")
|
||||||
|
print("CANCELING")
|
||||||
|
# await cleanup()
|
||||||
|
|
||||||
|
tasks = [asyncio.ensure_future(handle_ack_queues())]
|
||||||
|
|
||||||
|
for topic in topics:
|
||||||
|
tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))
|
||||||
|
|
||||||
|
gathered = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
# while (curr_time - start) < time_length:
|
||||||
|
# # Send message (NOTE THIS IS JUST ONE TOPIC)
|
||||||
|
# packet = generate_RPC_packet(my_id, topics, msg_contents, self.next_msg_id_func())
|
||||||
|
# await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||||
|
# num_sent += 1
|
||||||
|
# # Wait for acks
|
||||||
|
# num_acks = 0
|
||||||
|
# while num_acks < num_receivers:
|
||||||
|
# await self.ack_queue.get()
|
||||||
|
# num_acks += 1
|
||||||
|
# num_fully_ack += 1
|
||||||
|
# curr_time = timer()
|
||||||
|
|
||||||
# Do something interesting with test results
|
# Do something interesting with test results
|
||||||
print("Num sent: " + str(num_sent))
|
print("Num sent: " + str(num_sent_in_each_topic))
|
||||||
print("Num fully ack: " + str(num_fully_ack))
|
print("Num fully ack: " + str(num_acks_in_each_topic))
|
||||||
|
|
||||||
# End test
|
# End test
|
||||||
self.test_being_performed = False
|
self.test_being_performed = False
|
||||||
|
|
Loading…
Reference in New Issue
Block a user