Lay groundwork for transition to multiple topics

This commit is contained in:
Stuckinaboot 2019-04-06 17:48:06 -04:00
parent 25def0c7d6
commit 08f845cc21
3 changed files with 50 additions and 18 deletions

View File

@ -4,11 +4,13 @@ from receiver import ReceiverNode
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup
ACK_PROTOCOL = "/ack/1.0.0"
async def create_receivers(num_receivers):
async def create_receivers(num_receivers, topic_map):
receivers = []
# Create receivers
for i in range(num_receivers):
receivers.append(await ReceiverNode.create())
receivers.append(await ReceiverNode.create(ACK_PROTOCOL, topic_map[i]))
return receivers
async def connect(node1, node2):
@ -32,22 +34,44 @@ async def create_topology(adjacency_map, sender, receivers):
await connect(receivers[source_num].libp2p_node, \
receivers[target_num].libp2p_node)
def get_num_receivers_in_topology(topology):
receiver_ids = []
for key in topology:
if key != "sender":
if key not in receiver_ids:
receiver_ids.append(key)
for neighbor in topology[key]:
if neighbor not in receiver_ids:
receiver_ids.append(neighbor)
return len(receiver_ids)
async def main():
# Create sender
print("Sender created")
sender = await SenderNode.create()
sender = await SenderNode.create(ACK_PROTOCOL)
# Create receivers
print("Receivers created")
# sender_id = sender.libp2p_node.get_id()
receivers = await create_receivers(2)
# Define topology
# Define connection topology
topology = {
"sender": [0],
0: [1]
0: [1, 2],
1: [3, 4],
2: [5, 6]
}
num_receivers = get_num_receivers_in_topology(topology)
# Define topic map
topic_map = {}
for num in range(num_receivers):
topic_map[num] = "1"
topics = ["1"]
receivers = await create_receivers(num_receivers, topic_map)
# Create network topology
await create_topology(topology, sender, receivers)
print("Topology created")
@ -58,11 +82,13 @@ async def main():
for receiver in receivers:
print("Starting receiving")
asyncio.ensure_future(receiver.start_receiving(sender_info))
# Allow time for start receiving to be completed
await asyncio.sleep(0.5)
# 2) Start sending messages and perform throughput test
print("Performing test")
await sender.perform_test(2, 5)
await sender.perform_test(num_receivers, topics, 1)
await cleanup()

View File

@ -21,7 +21,7 @@ class ReceiverNode():
self.next_msg_id_func = message_id_generator(0)
@classmethod
async def create(cls):
async def create(cls, ack_protocol, topic):
"""
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
instance to this new node
@ -39,7 +39,10 @@ class ReceiverNode():
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
self.pubsub_messages = await self.pubsub.subscribe(TOPIC)
self.pubsub_messages = await self.pubsub.subscribe(topic)
self.topic = topic
self.ack_protocol = ack_protocol
return self
@ -50,11 +53,12 @@ class ReceiverNode():
async def start_receiving(self, sender_node_info):
await self.libp2p_node.connect(sender_node_info)
ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, ["/ack/1.0.0"])
ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, [self.ack_protocol])
asyncio.ensure_future(self.wait_for_end(ack_stream))
# TODO: get peer id
self.should_listen = True
ack_msg = self.topic
encoded_ack_msg = ack_msg.encode()
while self.should_listen:
msg = await self.pubsub_messages.get()
await ack_stream.write("ack".encode())
await ack_stream.write(encoded_ack_msg)

View File

@ -23,7 +23,7 @@ class SenderNode():
self.ack_queue = asyncio.Queue()
@classmethod
async def create(cls):
async def create(cls, ack_protocol):
"""
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
instance to this new node
@ -56,11 +56,12 @@ class SenderNode():
# Notify receivers test is over
await stream.write("end".encode())
# Set handler for acks
self.libp2p_node.set_stream_handler("/ack/1.0.0", ack_stream_handler)
self.ack_protocol = ack_protocol
self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)
return self
async def perform_test(self, num_receivers, time_length):
async def perform_test(self, num_receivers, topics, time_length):
# Time and loop
start = timer()
curr_time = timer()
@ -72,7 +73,7 @@ class SenderNode():
num_fully_ack = 0
while (curr_time - start) < time_length:
# Send message (NOTE THIS IS JUST ONE TOPIC)
packet = generate_RPC_packet(my_id, [TOPIC], msg_contents, self.next_msg_id_func())
packet = generate_RPC_packet(my_id, topics, msg_contents, self.next_msg_id_func())
await self.floodsub.publish(my_id, packet.SerializeToString())
num_sent += 1
# Wait for acks
@ -83,8 +84,9 @@ class SenderNode():
num_fully_ack += 1
curr_time = timer()
# Do something interesting with test results
print("Num sent: " + str(num_sent))
print("Num fully ack: " + str(num_fully_ack))
# Do something interesting with test results
# End test
self.test_being_performed = False