diff --git a/examples/sharding/receiver_driver.py b/examples/sharding/receiver_driver.py new file mode 100644 index 0000000..b9ccd50 --- /dev/null +++ b/examples/sharding/receiver_driver.py @@ -0,0 +1,106 @@ +import asyncio +import json +import multiaddr +import sys +from sender import SenderNode +from receiver import ReceiverNode +from libp2p.peer.peerinfo import info_from_p2p_addr +from tests.utils import cleanup + +ACK_PROTOCOL = "/ack/1.0.0" + +""" +Driver is called in the following way +python receiver_driver.py topology_config.json "my_node_id" +""" + +async def connect(node1, node2_addr): + # node1 connects to node2 + print(node2_addr) + info = info_from_p2p_addr(node2_addr) + await node1.connect(info) + +async def main(): + """ + Read in topology config file, which contains + a map of node IDs to peer IDs, an adjacency list (named topology) using node IDs, + a map of node IDs to topics, and ACK_PROTOCOL + + { + "node_id_map": { + "sender": "sender multiaddr", + "some id 0": "some multiaddr", + "some id 1": "some multiaddr", + ... + }, + "topology": { + "sender": ["some id 0", "some id 1", ...], + "0": ["some id 0", "some id 1", ...], + "1": ["some id 0", "some id 1", ...], + ... + }, + "topic_map": { + "some id 0": "some topic name 1", + "some id 1": "some topic name 2", + "some id 2": "some topic name 3" + }, + "ACK_PROTOCOL": "some ack protocol" + } + + Ex. + + { + "node_id_map": { + "sender": "/ip4/127.0.0.1/tcp/8000", + "0": "/ip4/127.0.0.1/tcp/8001", + "1": "/ip4/127.0.0.1/tcp/8002", + "2": "/ip4/127.0.0.1/tcp/8003" + }, + "topology": { + "sender": ["0"], + "0": ["1", "2"], + "1": ["0"], + "2": ["0"] + }, + "topic_map": { + "0": "topic1", + "1": "topic1", + "2": "topic1" + }, + "ACK_PROTOCOL": "/ack/1.0.0" + } + + """ + topology_config_dict = json.loads(open(sys.argv[1]).read()) + my_node_id = sys.argv[2] + + # Get my topic + my_topic = topology_config_dict["topic_map"][my_node_id] + + # Create Receiver Node + print("Creating receiver") + receiver_node = await ReceiverNode.create(ACK_PROTOCOL, my_topic) + print("Receiver created") + + # Connect receiver node to all other relevant receiver nodes + for neighbor in topology_config_dict["topology"][my_node_id]: + neighbor_addr_str = topology_config_dict["node_id_map"][neighbor] + + # Convert neighbor_addr_str to multiaddr + neighbor_addr = multiaddr.Multiaddr(neighbor_addr_str) + await connect(receiver_node, neighbor_addr) + + # Get sender info as multiaddr + sender_addr_str = topology_config_dict["node_id_map"]["sender"] + + # Convert sender_info_str to multiaddr + sender_addr = multiaddr.Multiaddr(sender_addr_str) + + # Start listening for messages from sender + print("Start receiving called") + asyncio.ensure_future(receiver.start_receiving(sender_addr)) + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + loop.close()