py-libp2p/examples/sharding/sender.py

160 lines
5.7 KiB
Python
Raw Normal View History

2019-04-07 02:23:52 +08:00
import asyncio
import multiaddr
from timeit import default_timer as timer
2019-04-07 11:09:26 +08:00
from tests.utils import cleanup
2019-04-07 02:23:52 +08:00
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
# Pubsub protocol IDs passed to FloodSub when a SenderNode is created
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
2019-04-07 02:23:52 +08:00
# Topic the sender's Pubsub instance subscribes to during SenderNode.create
TOPIC = "eth"
class SenderNode:
    """
    SenderNode which pings ReceiverNodes continuously to perform benchmarking.

    Messages are published through floodsub. Acks arrive on streams opened
    with the ack protocol; each ack payload is the (encoded) name of the
    topic being acknowledged.
    """

    def __init__(self):
        # Callable invoked once per published message to obtain its message id
        self.next_msg_id_func = message_id_generator(0)
        # Raw acks from every ack stream, consumed by perform_test
        self.ack_queue = asyncio.Queue()

    @classmethod
    async def create(cls, ack_protocol):
        """
        Create a new SenderNode and attach a libp2p node, a floodsub, and a
        pubsub instance to this new node.

        We use create as this serves as a factory function and allows us
        to use async await, unlike the init function.

        :param ack_protocol: protocol id on which receivers open ack streams
        :return: the fully initialised sender node
        """
        # Use cls so that subclasses get an instance of their own type
        self = cls()

        libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
        await libp2p_node.get_network().listen(
            multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
        )

        self.libp2p_node = libp2p_node
        self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
        self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
        await self.pubsub.subscribe(TOPIC)

        self.test_being_performed = True
        # Every ack stream seen so far, so "end" can be sent when the test ends
        self.all_streams = []

        async def ack_stream_handler(stream):
            self.all_streams.append(stream)

            while self.test_being_performed:
                # This Ack is what times out when multi-topic tests finish
                ack = await stream.read()
                if ack is None:
                    # NOTE(review): a None read presumably means the read
                    # timed out or the stream closed -- confirm against the
                    # stream implementation
                    print("Ack stream returned no data, stopping ack handler")
                    break
                await self.ack_queue.put(ack)

            # Reached once test_being_performed is False or the read failed
            # Notify receivers test is over
            print("Writing end")
            await stream.write("end".encode())

        # Set handler for acks
        self.ack_protocol = ack_protocol
        self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)

        return self

    async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
        """
        Publish messages on every topic concurrently and count sends and acks.

        For each topic, a message is published and then the sender waits for
        one ack per receiver on that topic before publishing the next one.
        A topic stops once time_length has elapsed (checked between
        messages). The totals are printed when all topics have finished.

        :param num_receivers_in_each_topic: mapping of topic -> number of acks
            expected for each published message
        :param topics: iterable of topic names to test
        :param time_length: duration of the test for each topic, in the units
            of timeit.default_timer (seconds)
        """
        my_id = str(self.libp2p_node.get_id())
        msg_contents = "transaction"

        num_sent_in_each_topic = {topic: 0 for topic in topics}
        num_acks_in_each_topic = {topic: 0 for topic in topics}
        # One queue per topic; the dispatcher below routes acks into them
        self.topic_ack_queues = {topic: asyncio.Queue() for topic in topics}

        completed_topics_count = 0
        num_topics = len(topics)

        async def handle_ack_queues():
            # Route each raw ack from the shared queue to its topic's queue
            print("Handling ack queues")
            while completed_topics_count < num_topics:
                ack = await self.ack_queue.get()
                if ack is None:
                    # Sentinel pushed by end_all_async: every topic is done
                    break
                decoded_ack = ack.decode()
                topic_queue = self.topic_ack_queues.get(decoded_ack)
                if topic_queue is None:
                    # An unexpected ack must not kill the dispatcher, or the
                    # per-topic tasks would wait for acks forever
                    print("Ignoring ack for unknown topic " + decoded_ack)
                    continue
                await topic_queue.put(decoded_ack)

        async def end_all_async():
            # Add None to ack_queue to break handle_ack_queues out of its
            # loop. Without it the dispatcher would block on the queue forever
            await self.ack_queue.put(None)
            # This is not necessary but is useful for turning off the
            # receivers gracefully
            for stream in self.all_streams:
                await stream.write("end".encode())

        async def perform_test_on_topic(topic):
            nonlocal completed_topics_count
            print("Performing test on topic " + topic)
            start = timer()

            # The time limit is only checked between messages; a message that
            # has been sent is always waited on until fully acked
            while (timer() - start) < time_length:
                # Send message on single topic
                packet = generate_RPC_packet(
                    my_id, [topic], msg_contents, self.next_msg_id_func()
                )
                await self.floodsub.publish(my_id, packet.SerializeToString())
                num_sent_in_each_topic[topic] += 1

                # Wait for one ack from every receiver on this topic
                # TODO: Check safety of this. Does this make sense in the
                # asyncio event-driven setting?
                for _ in range(num_receivers_in_each_topic[topic]):
                    await self.topic_ack_queues[topic].get()
                    num_acks_in_each_topic[topic] += 1

            print("Test completed " + topic)
            completed_topics_count += 1
            if completed_topics_count == num_topics:
                # Last topic to finish shuts everything down
                self.test_being_performed = False
                print("End all async")
                await end_all_async()

        tasks = [asyncio.ensure_future(handle_ack_queues())]
        for topic in topics:
            tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))

        # Failures are collected rather than raised so one broken task does
        # not cancel the others, but they are reported instead of being lost
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print("Task failed: " + repr(result))

        # Do something interesting with test results
        print("Num sent: " + str(num_sent_in_each_topic))
        print("Num fully ack: " + str(num_acks_in_each_topic))

        # End test
        self.test_being_performed = False