diff --git a/host/basic_host.py b/host/basic_host.py index 09789f6..5e2bec9 100644 --- a/host/basic_host.py +++ b/host/basic_host.py @@ -55,6 +55,7 @@ class BasicHost(IHost): :param proto_id: protocol id that stream runs on :return: true if successful """ + # TODO: host should return a mux stream not a raw stream stream = self.network.new_stream(peer_id) stream.set_protocol(protocol_id) return stream diff --git a/libp2p/libp2p.py b/libp2p/libp2p.py index 8e4e58d..f14e95e 100644 --- a/libp2p/libp2p.py +++ b/libp2p/libp2p.py @@ -1,13 +1,13 @@ -from ..peer.peerstore import PeerStore -from ..network.swarm import Swarm -from ..host.basic_host import BasicHost -from ..transport.upgrader import TransportUpgrader +from peer.peerstore import PeerStore +from network.swarm import Swarm +from host.basic_host import BasicHost +from transport.upgrader import TransportUpgrader from Crypto.PublicKey import RSA class Libp2p(object): def __init__(self, idOpt, \ - transportOpt = ["/ip4/127.0.0.1/tcp/10000"], \ + transportOpt = ["/ip4/127.0.0.1/tcp/10000/app/1.0.0"], \ muxerOpt = ["mplex/6.7.0"], \ secOpt = ["secio"], \ peerstore = PeerStore()): @@ -15,7 +15,6 @@ class Libp2p(object): if idOpt: self.idOpt = idOpt else: - # TODO generate RSA public key pair new_key = RSA.generate(2048, e=65537) self.idOpt = new_key.publickey().exportKey("PEM") self.private_key = new_key.exportKey("PEM") @@ -29,7 +28,7 @@ class Libp2p(object): swarm = Swarm(self.idOpt, self.peerstore) host = BasicHost(swarm) - upgrader = TransportUpgrader(self.secOpt, self.transportOpt) + upgrader = TransportUpgrader(self.secOpt, self.muxerOpt) # TODO transport upgrade diff --git a/network/network_interface.py b/network/network_interface.py index 17a859b..7ba7641 100644 --- a/network/network_interface.py +++ b/network/network_interface.py @@ -3,8 +3,9 @@ from abc import ABC, abstractmethod class INetwork(ABC): @abstractmethod - def set_stream_handler(self, stream_handler): + def set_stream_handler(self, protocol_id, stream_handler): """ + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance :return: true if successful """ diff --git a/network/swarm.py b/network/swarm.py index 6dcdcb4..9e060f9 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -1,6 +1,8 @@ import uuid from .network_interface import INetwork from .stream.net_stream import NetStream +from .multiaddr import MultiAddr +from .connection.raw_connection import RawConnection class Swarm(INetwork): @@ -10,13 +12,15 @@ class Swarm(INetwork): self.upgrader = upgrader self.connections = dict() self.listeners = dict() + self.stream_handlers = dict() - def set_stream_handler(self, stream_handler): + def set_stream_handler(self, protocol_id, stream_handler): """ + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance :return: true if successful """ - pass + self.stream_handlers[protocol_id] = stream_handler def new_stream(self, peer_id, protocol_id): """ @@ -64,23 +68,47 @@ class Swarm(INetwork): :return: true if at least one success """ - # Create a closure C that takes in a multiaddr and - # returns a function object O that takes in a reader and writer. - # This function O looks up the stream handler - # for the given protocol, creates the net_stream - # for the listener and calls the stream handler function - # passing in the net_stream - # For each multiaddr in args # Check if a listener for multiaddr exists already # If listener already exists, continue # Otherwise, do the following: - # Pass multiaddr into C and get back function H - # listener = transport.create_listener(H) + # Pass multiaddr into conn handler + # Have conn handler delegate to stream handler # Call listener listen with the multiaddr # Map multiaddr to listener - return True + for multiaddr_str in args: + if multiaddr_str in self.listeners: + return True + + multiaddr = MultiAddr(multiaddr_str) + multiaddr_dict = multiaddr.to_options() + + def conn_handler(reader, writer): + # Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr) + raw_conn = RawConnection(multiaddr_dict.host, multiaddr_dict.port, reader, writer) + muxed_conn = self.upgrader.upgrade_connection(raw_conn) + + # TODO: make generic protocols work + muxed_stream, stream_id, protocol_id = muxed_conn.accept_stream() + net_stream = NetStream(muxed_stream) + net_stream.set_protocol(protocol_id) + + # Give to stream handler + # TODO: handle case when stream handler is set + self.stream_handlers[protocol_id](net_stream) + + try: + # Success + listener = self.transport.create_listener(conn_handler) + listener.listen(multiaddr) + return True + except IOError: + # Failed. Continue looping. + print("Failed to connect to: " + multiaddr) + + # No multiaddr succeeded + return False def add_transport(self, transport): # TODO: Support more than one transport diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index 6214363..4357d9d 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -22,7 +22,7 @@ def test_simple_messages(): # associate the peer with local ip address (see default parameters of Libp2p()) hostA.get_peerstore().add_addr("hostB", "/ip4/127.0.0.1/tcp/10000") - stream = hostA.new_stream("hostB", "/echo/1.0.0") + stream = hostA.new_stream("hostB", "/app/1.0.0") message = "hello" stream.write(message.encode()) diff --git a/transport/tcp/tcp.py b/transport/tcp/tcp.py index 7981ede..2984d22 100644 --- a/transport/tcp/tcp.py +++ b/transport/tcp/tcp.py @@ -1,7 +1,7 @@ import asyncio from transport.transport_interface import ITransport from transport.listener_interface import IListener -from transport.connection.raw_connection import RawConnection +from network.connection.raw_connection import RawConnection class TCP(ITransport): @@ -22,17 +22,16 @@ class TCP(ITransport): :return: return True if successful """ # TODO check for exceptions - _multiaddr = multiaddr if "ipfs" in multiaddr.get_protocols(): # ipfs_id = multiaddr.get_ipfs_id() _multiaddr = multiaddr.remove_protocol("ipfs") - self.multiaddrs.append(_multiaddr) - _multiaddr_dict = _multiaddr.to_dict() - _loop = asyncio.get_event_loop() - _coroutine = asyncio.start_server(self.handler, _multiaddr_dict.host,\ - _multiaddr_dict.port, loop=_loop) - self.server = _loop.run_until_complete(_coroutine) + self.multiaddrs.append(multiaddr) + multiaddr_dict = _multiaddr.to_options() + loop = asyncio.get_event_loop() + coroutine = asyncio.start_server(self.handler, multiaddr_dict.host,\ + multiaddr_dict.port, loop=loop) + self.server = loop.run_until_complete(coroutine) return True def get_addrs(self): @@ -72,7 +71,6 @@ class TCP(ITransport): port = _multiaddr_dict.port reader, writer = open_conn(host, port) return RawConnection(host, port, reader, writer) - # TODO dial behavior not fully understood def create_listener(self, handler_function, options=None): """