From db8eae74e5fbc7617e9b89f96cb6706c8d2e9621 Mon Sep 17 00:00:00 2001 From: Stuckinaboot Date: Tue, 9 Apr 2019 21:37:25 -0400 Subject: [PATCH] Add receiver ID handling --- examples/sharding/receiver.py | 9 ++++++--- examples/sharding/receiver_driver.py | 11 ++++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/examples/sharding/receiver.py b/examples/sharding/receiver.py index f9cdd7c..efcca4a 100644 --- a/examples/sharding/receiver.py +++ b/examples/sharding/receiver.py @@ -6,6 +6,7 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub from tests.pubsub.utils import message_id_generator +from libp2p.peer.id import ID TOPIC = "eth" SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] @@ -21,7 +22,7 @@ class ReceiverNode(): self.next_msg_id_func = message_id_generator(0) @classmethod - async def create(cls, ack_protocol, topic): + async def create(cls, node_id, transport_opt_str, ack_protocol, topic): """ Create a new ReceiverNode and attach a libp2p node, a floodsub, and a pubsub instance to this new node @@ -31,8 +32,10 @@ class ReceiverNode(): """ self = ReceiverNode() - libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + id_opt = ID("peer-" + node_id) + + libp2p_node = await new_node(id_opt=id_opt, transport_opt=[transport_opt_str]) + await libp2p_node.get_network().listen(multiaddr.Multiaddr(transport_opt_str)) self.libp2p_node = libp2p_node diff --git a/examples/sharding/receiver_driver.py b/examples/sharding/receiver_driver.py index b9ccd50..f6dfd6b 100644 --- a/examples/sharding/receiver_driver.py +++ b/examples/sharding/receiver_driver.py @@ -2,6 +2,7 @@ import asyncio import json import multiaddr import sys +from libp2p.peer.id import ID from sender import SenderNode from receiver import ReceiverNode from libp2p.peer.peerinfo import info_from_p2p_addr @@ -16,7 +17,6 @@ python receiver_driver.py topology_config.json "my_node_id" async def connect(node1, node2_addr): # node1 connects to node2 - print(node2_addr) info = info_from_p2p_addr(node2_addr) await node1.connect(info) @@ -79,16 +79,21 @@ async def main(): # Create Receiver Node print("Creating receiver") - receiver_node = await ReceiverNode.create(ACK_PROTOCOL, my_topic) + my_transport_opt_str = topology_config_dict["node_id_map"][my_node_id] + receiver_node = await ReceiverNode.create(my_node_id, my_transport_opt_str, ACK_PROTOCOL, my_topic) print("Receiver created") + # TODO: sleep for like 15 seconds to let other nodes start up # Connect receiver node to all other relevant receiver nodes for neighbor in topology_config_dict["topology"][my_node_id]: neighbor_addr_str = topology_config_dict["node_id_map"][neighbor] + # Add p2p part + neighbor_addr_str += "/p2p/" + ID("peer-" + neighbor).pretty() + # Convert neighbor_addr_str to multiaddr neighbor_addr = multiaddr.Multiaddr(neighbor_addr_str) - await connect(receiver_node, neighbor_addr) + await connect(receiver_node.libp2p_node, neighbor_addr) # Get sender info as multiaddr sender_addr_str = topology_config_dict["node_id_map"]["sender"]