diff --git a/host/basic_host.py b/host/basic_host.py index 09789f6..5e2bec9 100644 --- a/host/basic_host.py +++ b/host/basic_host.py @@ -55,6 +55,7 @@ class BasicHost(IHost): :param proto_id: protocol id that stream runs on :return: true if successful """ + # TODO: host should return a mux stream not a raw stream stream = self.network.new_stream(peer_id) stream.set_protocol(protocol_id) return stream diff --git a/libp2p/libp2p.py b/libp2p/libp2p.py index ba4cacb..a8f5d4b 100644 --- a/libp2p/libp2p.py +++ b/libp2p/libp2p.py @@ -2,7 +2,7 @@ from peer.peerstore import PeerStore from network.swarm import Swarm from host.basic_host import BasicHost from transport.upgrader import TransportUpgrader -from transport.tcp import TCP +from transport.tcp.tcp import TCP from Crypto.PublicKey import RSA class Libp2p(object): diff --git a/network/network_interface.py b/network/network_interface.py index 17a859b..7ba7641 100644 --- a/network/network_interface.py +++ b/network/network_interface.py @@ -3,8 +3,9 @@ from abc import ABC, abstractmethod class INetwork(ABC): @abstractmethod - def set_stream_handler(self, stream_handler): + def set_stream_handler(self, protocol_id, stream_handler): """ + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance :return: true if successful """ diff --git a/network/swarm.py b/network/swarm.py index ba17b9b..2e24a53 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -1,6 +1,8 @@ import uuid from .network_interface import INetwork from .stream.net_stream import NetStream +from .multiaddr import MultiAddr +from .connection.raw_connection import RawConnection class Swarm(INetwork): @@ -10,13 +12,15 @@ class Swarm(INetwork): self.upgrader = upgrader self.connections = dict() self.listeners = dict() + self.stream_handlers = dict() - def set_stream_handler(self, stream_handler): + def set_stream_handler(self, protocol_id, stream_handler): """ + :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance :return: true if successful """ - pass + self.stream_handlers[protocol_id] = stream_handler def new_stream(self, peer_id, protocol_id): """ @@ -64,23 +68,45 @@ class Swarm(INetwork): :return: true if at least one success """ - # Create a closure C that takes in a multiaddr and - # returns a function object O that takes in a reader and writer. - # This function O looks up the stream handler - # for the given protocol, creates the net_stream - # for the listener and calls the stream handler function - # passing in the net_stream - # For each multiaddr in args # Check if a listener for multiaddr exists already # If listener already exists, continue # Otherwise, do the following: - # Pass multiaddr into C and get back function H - # listener = transport.create_listener(H) + # Pass multiaddr into conn handler + # Have conn handler delegate to stream handler # Call listener listen with the multiaddr # Map multiaddr to listener + for multiaddr_str in args: + if multiaddr_str in self.listeners: + return True - return True + multiaddr = MultiAddr(multiaddr_str) + multiaddr_dict = multiaddr.to_options() + + def conn_handler(reader, writer): + # Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr) + raw_conn = RawConnection(multiaddr_dict.host, multiaddr_dict.port, reader, writer) + muxed_conn = self.upgrader.upgrade_connection(raw_conn, False) + + muxed_stream, stream_id, protocol_id = muxed_conn.accept_stream() + net_stream = NetStream(muxed_stream) + net_stream.set_protocol(protocol_id) + + # Give to stream handler + # TODO: handle case when stream handler is set + self.stream_handlers[protocol_id](net_stream) + + try: + # Success + listener = self.transport.create_listener(conn_handler) + listener.listen(multiaddr) + return True + except IOError: + # Failed. Continue looping. + print("Failed to connect to: " + multiaddr) + + # No multiaddr succeeded + return False def add_transport(self, transport): # TODO: Support more than one transport diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index 6214363..4357d9d 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -22,7 +22,7 @@ def test_simple_messages(): # associate the peer with local ip address (see default parameters of Libp2p()) hostA.get_peerstore().add_addr("hostB", "/ip4/127.0.0.1/tcp/10000") - stream = hostA.new_stream("hostB", "/echo/1.0.0") + stream = hostA.new_stream("hostB", "/app/1.0.0") message = "hello" stream.write(message.encode()) diff --git a/transport/tcp/tcp.py b/transport/tcp/tcp.py index 7981ede..2984d22 100644 --- a/transport/tcp/tcp.py +++ b/transport/tcp/tcp.py @@ -1,7 +1,7 @@ import asyncio from transport.transport_interface import ITransport from transport.listener_interface import IListener -from transport.connection.raw_connection import RawConnection +from network.connection.raw_connection import RawConnection class TCP(ITransport): @@ -22,17 +22,16 @@ class TCP(ITransport): :return: return True if successful """ # TODO check for exceptions - _multiaddr = multiaddr if "ipfs" in multiaddr.get_protocols(): # ipfs_id = multiaddr.get_ipfs_id() _multiaddr = multiaddr.remove_protocol("ipfs") - self.multiaddrs.append(_multiaddr) - _multiaddr_dict = _multiaddr.to_dict() - _loop = asyncio.get_event_loop() - _coroutine = asyncio.start_server(self.handler, _multiaddr_dict.host,\ - _multiaddr_dict.port, loop=_loop) - self.server = _loop.run_until_complete(_coroutine) + self.multiaddrs.append(multiaddr) + multiaddr_dict = _multiaddr.to_options() + loop = asyncio.get_event_loop() + coroutine = asyncio.start_server(self.handler, multiaddr_dict.host,\ + multiaddr_dict.port, loop=loop) + self.server = loop.run_until_complete(coroutine) return True def get_addrs(self): @@ -72,7 +71,6 @@ class TCP(ITransport): port = _multiaddr_dict.port reader, writer = open_conn(host, port) return RawConnection(host, port, reader, writer) - # TODO dial behavior not fully understood def create_listener(self, handler_function, options=None): """ diff --git a/transport/upgrader.py b/transport/upgrader.py index 05e2218..6cd5b2e 100644 --- a/transport/upgrader.py +++ b/transport/upgrader.py @@ -24,5 +24,4 @@ class TransportUpgrader(object): # Default to mplex # TODO do exchange to determine multiplexer - return MuxedConn(conn, initiator)