From ba358335df89da3280a0c1c925aa26d7a514694a Mon Sep 17 00:00:00 2001 From: Robert Zajac Date: Sat, 6 Apr 2019 14:23:52 -0400 Subject: [PATCH] scaffolding sender/receiver --- examples/sharding/receiver.py | 29 +++++++++++++++ examples/sharding/sender.py | 70 +++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 examples/sharding/receiver.py create mode 100644 examples/sharding/sender.py diff --git a/examples/sharding/receiver.py b/examples/sharding/receiver.py new file mode 100644 index 0000000..797eef0 --- /dev/null +++ b/examples/sharding/receiver.py @@ -0,0 +1,29 @@ +import multiaddr + +from libp2p import new_node +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub + + +TOPIC = "eth" + + +def main(): + node = await new_node(transport_opt=["TODO"]) + await node.get_network().listen(multiaddr.Multiaddr("TODO")) + + floodsub = FloodSub(["/floodsub/1.0.0"]) + pubsub = Pubsub(node, floodsub, "unnecessary-id") + + pubsub_messages = await pubsub.subscribe(TOPIC) + + # TODO: get peer id + ack_stream = await node.new_stream("TODO", ["/ack/1"]) + + while True: + msg = await pubsub_messages.get() + await ack_stream.write("ack") + + +if __name__ == '__main__': + main() diff --git a/examples/sharding/sender.py b/examples/sharding/sender.py new file mode 100644 index 0000000..1ce7782 --- /dev/null +++ b/examples/sharding/sender.py @@ -0,0 +1,70 @@ +import asyncio +import multiaddr + +from timeit import default_timer as timer + +from tests.pubsub.utils import generate_RPC_packet, message_id_generator +from libp2p import new_node +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.floodsub import FloodSub + + +TOPIC = "eth" +NUM_RECEIVERS = 10 +TIME_LENGTH = 1000 # seconds, double check +ack_queue = asyncio.Queue() + + +def main(): + node = await new_node(transport_opt=["TODO"]) + await node.get_network().listen(multiaddr.Multiaddr("TODO")) + + floodsub = FloodSub(["/floodsub/1.0.0"]) + pubsub = Pubsub(node, floodsub, "unnecessary-id") + + next_msg_id_func = message_id_generator(0) + + """ + if is_sharded: + # Subscribe to certain topics + # TODO + pass + else: + # Subscribe to just one topic + await pubsub.subscribe(TOPIC) + """ + + await pubsub.subscribe(TOPIC) + + async def ack_stream_handler(stream): + while True: + ack = await stream.read() + + if ack is not None: + await ack_queue.put(ack) + + # Set handler for acks + node.set_stream_handler("/ack/1", ack_stream_handler) + + # Time and loop + start = timer() + curr_time = timer() + + while (curr_time - start) < TIME_LENGTH: + # Send message (NOTE THIS IS JUST ONE TOPIC) + my_id = str(node.get_id()) + msg_contents = "transaction" + packet = generate_RPC_packet(my_id, [TOPIC], msg_contents, next_msg_id_func()) + await floodsub.publish(my_id, packet.SerializeToString()) + + # Wait for acks + num_acks = 0 + while num_acks < NUM_RECEIVERS: + await ack_queue.get() + num_acks += 1 + + curr_time = timer() + + +if __name__ == '__main__': + main()