Implement end-to-end perform testing
This commit is contained in:
parent
66427cd6f5
commit
25def0c7d6
74
examples/sharding/driver.py
Normal file
74
examples/sharding/driver.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
import asyncio
|
||||
from sender import SenderNode
|
||||
from receiver import ReceiverNode
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from tests.utils import cleanup
|
||||
|
||||
|
||||
async def create_receivers(num_receivers):
|
||||
receivers = []
|
||||
for i in range(num_receivers):
|
||||
receivers.append(await ReceiverNode.create())
|
||||
return receivers
|
||||
|
||||
async def connect(node1, node2):
|
||||
# node1 connects to node2
|
||||
addr = node2.get_addrs()[0]
|
||||
info = info_from_p2p_addr(addr)
|
||||
await node1.connect(info)
|
||||
|
||||
async def create_topology(adjacency_map, sender, receivers):
|
||||
# Create network
|
||||
|
||||
# Connect senders to receivers
|
||||
for target_num in adjacency_map["sender"]:
|
||||
await connect(sender.libp2p_node, receivers[target_num].libp2p_node)
|
||||
|
||||
# Connect receivers to other receivers
|
||||
for source_num in adjacency_map:
|
||||
if source_num != "sender":
|
||||
target_nums = adjacency_map[source_num]
|
||||
for target_num in target_nums:
|
||||
await connect(receivers[source_num].libp2p_node, \
|
||||
receivers[target_num].libp2p_node)
|
||||
|
||||
async def main():
|
||||
# Create sender
|
||||
print("Sender created")
|
||||
sender = await SenderNode.create()
|
||||
|
||||
# Create receivers
|
||||
print("Receivers created")
|
||||
# sender_id = sender.libp2p_node.get_id()
|
||||
receivers = await create_receivers(2)
|
||||
|
||||
# Define topology
|
||||
topology = {
|
||||
"sender": [0],
|
||||
0: [1]
|
||||
}
|
||||
|
||||
# Create network topology
|
||||
await create_topology(topology, sender, receivers)
|
||||
print("Topology created")
|
||||
|
||||
# Perform throughput test
|
||||
# 1) Start receivers
|
||||
sender_info = info_from_p2p_addr(sender.libp2p_node.get_addrs()[0])
|
||||
for receiver in receivers:
|
||||
print("Starting receiving")
|
||||
asyncio.ensure_future(receiver.start_receiving(sender_info))
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# 2) Start sending messages and perform throughput test
|
||||
print("Performing test")
|
||||
await sender.perform_test(2, 5)
|
||||
|
||||
await cleanup()
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
|
@ -1,11 +1,14 @@
|
|||
import multiaddr
|
||||
import asyncio
|
||||
|
||||
from libp2p import new_node
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from libp2p.pubsub.pubsub import Pubsub
|
||||
from libp2p.pubsub.floodsub import FloodSub
|
||||
|
||||
from tests.pubsub.utils import message_id_generator
|
||||
|
||||
TOPIC = "eth"
|
||||
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||
|
||||
class ReceiverNode():
|
||||
"""
|
||||
|
@ -16,10 +19,9 @@ class ReceiverNode():
|
|||
|
||||
def __init__(self):
|
||||
self.next_msg_id_func = message_id_generator(0)
|
||||
self.node_id = str(uuid.uuid1())
|
||||
|
||||
@classmethod
|
||||
async def create(cls, sender_node_id):
|
||||
async def create(cls):
|
||||
"""
|
||||
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
|
||||
instance to this new node
|
||||
|
@ -27,7 +29,7 @@ class ReceiverNode():
|
|||
We use create as this serves as a factory function and allows us
|
||||
to use async await, unlike the init function
|
||||
"""
|
||||
self = DummyAccountNode()
|
||||
self = ReceiverNode()
|
||||
|
||||
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
|
@ -37,13 +39,22 @@ class ReceiverNode():
|
|||
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
|
||||
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
|
||||
|
||||
self.pubsub_messages = await pubsub.subscribe(TOPIC)
|
||||
ack_stream = await node.new_stream(sender_node_id, ["/ack/1.0.0"])
|
||||
self.pubsub_messages = await self.pubsub.subscribe(TOPIC)
|
||||
|
||||
return self
|
||||
|
||||
async def start_receiving():
|
||||
async def wait_for_end(self, ack_stream):
|
||||
msg = (await ack_stream.read()).decode()
|
||||
if msg == "end":
|
||||
self.should_listen = False
|
||||
|
||||
async def start_receiving(self, sender_node_info):
|
||||
await self.libp2p_node.connect(sender_node_info)
|
||||
ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, ["/ack/1.0.0"])
|
||||
asyncio.ensure_future(self.wait_for_end(ack_stream))
|
||||
|
||||
# TODO: get peer id
|
||||
while True:
|
||||
msg = await pubsub_messages.get()
|
||||
await ack_stream.write("ack")
|
||||
self.should_listen = True
|
||||
while self.should_listen:
|
||||
msg = await self.pubsub_messages.get()
|
||||
await ack_stream.write("ack".encode())
|
||||
|
|
|
@ -10,8 +10,6 @@ from libp2p.pubsub.floodsub import FloodSub
|
|||
|
||||
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||
TOPIC = "eth"
|
||||
NUM_RECEIVERS = 10
|
||||
TIME_LENGTH = 1000 # seconds, double check
|
||||
|
||||
class SenderNode():
|
||||
"""
|
||||
|
@ -22,7 +20,6 @@ class SenderNode():
|
|||
|
||||
def __init__(self):
|
||||
self.next_msg_id_func = message_id_generator(0)
|
||||
self.node_id = str(uuid.uuid1())
|
||||
self.ack_queue = asyncio.Queue()
|
||||
|
||||
@classmethod
|
||||
|
@ -43,21 +40,27 @@ class SenderNode():
|
|||
|
||||
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
|
||||
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
|
||||
await pubsub.subscribe(TOPIC)
|
||||
await self.pubsub.subscribe(TOPIC)
|
||||
|
||||
self.test_being_performed = True
|
||||
|
||||
this = self
|
||||
|
||||
async def ack_stream_handler(stream):
|
||||
while True:
|
||||
while self.test_being_performed:
|
||||
ack = await stream.read()
|
||||
|
||||
if ack is not None:
|
||||
await ack_queue.put(ack)
|
||||
await self.ack_queue.put(ack)
|
||||
|
||||
# Reached once test_being_performed is False
|
||||
# Notify receivers test is over
|
||||
await stream.write("end".encode())
|
||||
# Set handler for acks
|
||||
self.libp2p_node.set_stream_handler("/ack/1", ack_stream_handler)
|
||||
self.libp2p_node.set_stream_handler("/ack/1.0.0", ack_stream_handler)
|
||||
|
||||
return self
|
||||
|
||||
def perform_test():
|
||||
async def perform_test(self, num_receivers, time_length):
|
||||
# Time and loop
|
||||
start = timer()
|
||||
curr_time = timer()
|
||||
|
@ -65,15 +68,23 @@ class SenderNode():
|
|||
my_id = str(self.libp2p_node.get_id())
|
||||
msg_contents = "transaction"
|
||||
|
||||
while (curr_time - start) < TIME_LENGTH:
|
||||
num_sent = 0
|
||||
num_fully_ack = 0
|
||||
while (curr_time - start) < time_length:
|
||||
# Send message (NOTE THIS IS JUST ONE TOPIC)
|
||||
packet = generate_RPC_packet(my_id, [TOPIC], msg_contents, self.next_msg_id_func())
|
||||
await floodsub.publish(my_id, packet.SerializeToString())
|
||||
|
||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||
num_sent += 1
|
||||
# Wait for acks
|
||||
num_acks = 0
|
||||
while num_acks < NUM_RECEIVERS:
|
||||
while num_acks < num_receivers:
|
||||
await self.ack_queue.get()
|
||||
num_acks += 1
|
||||
num_fully_ack += 1
|
||||
curr_time = timer()
|
||||
|
||||
curr_time = timer()
|
||||
print("Num sent: " + str(num_sent))
|
||||
print("Num fully ack: " + str(num_fully_ack))
|
||||
# Do something interesting with test results
|
||||
# End test
|
||||
self.test_being_performed = False
|
||||
|
|
Loading…
Reference in New Issue
Block a user