diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 2e1cf88..0c660e7 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -10,6 +10,7 @@ from .kademlia.routed_host import RoutedHost from .transport.upgrader import TransportUpgrader from .transport.tcp.tcp import TCP from .kademlia.network import KademliaServer +from libp2p.security.insecure_security import InsecureTransport async def cleanup_done_tasks(): @@ -71,7 +72,9 @@ def initialize_default_swarm( transport = [multiaddr.Multiaddr(t) for t in transport_opt] # TODO wire muxer up with swarm # muxer = muxer_opt or ["mplex/6.7.0"] - sec = sec_opt or ["secio"] + + # Use passed in security option or the default insecure option + sec = sec_opt or {"/insecure/1.0.0": InsecureTransport("insecure")} peerstore = peerstore_opt or PeerStore() upgrader = TransportUpgrader(sec, transport) swarm_opt = Swarm(id_opt, peerstore, upgrader) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 1700469..66ef1f0 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -1,3 +1,4 @@ +import asyncio from .raw_connection_interface import IRawConnection @@ -12,6 +13,19 @@ class RawConnection(IRawConnection): self._next_id = 0 if initiator else 1 self.initiator = initiator + async def write(self, data): + self.writer.write(data) + self.writer.write("\n".encode()) + await self.writer.drain() + + async def read(self): + line = await self.reader.readline() + adjusted_line = line.decode().rstrip('\n') + + # TODO: figure out a way to remove \n without going back and forth with + # encoding and decoding + return adjusted_line.encode() + def close(self): self.writer.close() diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index b30567b..644554c 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -69,8 +69,10 @@ class Swarm(INetwork): # Transport dials peer (gets back a raw conn) raw_conn = await self.transport.dial(multiaddr, self.self_id) - # Use upgrader to upgrade raw conn to muxed conn - muxed_conn = self.upgrader.upgrade_connection(raw_conn, \ + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure + # the conn and then mux the conn + secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) + muxed_conn = self.upgrader.upgrade_connection(secured_conn, \ self.generic_protocol_handler, peer_id) # Store muxed connection in connections @@ -145,7 +147,11 @@ class Swarm(INetwork): # to appropriate stream handler (using multiaddr) raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'), multiaddr.value_for_protocol('tcp'), reader, writer, False) - muxed_conn = self.upgrader.upgrade_connection(raw_conn, \ + + # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure + # the conn and then mux the conn + secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, False) + muxed_conn = self.upgrader.upgrade_connection(secured_conn, \ self.generic_protocol_handler, peer_id) # Store muxed_conn with peer id diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index dec5493..b93de4f 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -45,7 +45,6 @@ class MultiselectClient(IMultiselectClient): :param stream: stream to communicate with multiselect over :return: selected protocol """ - # Create a communicator to handle all communication across the stream communicator = MultiselectCommunicator(stream) diff --git a/libp2p/security/insecure_security.py b/libp2p/security/insecure_security.py new file mode 100644 index 0000000..0c5debe --- /dev/null +++ b/libp2p/security/insecure_security.py @@ -0,0 +1,44 @@ +from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.security.secure_conn_interface import ISecureConn + +class InsecureTransport(ISecureTransport): + + def __init__(self, transport_id): + self.transport_id = transport_id + + async def secure_inbound(self, conn): + """ + Secure the connection, either locally or by communicating with opposing node via conn, + for an inbound connection (i.e. we are not the initiator) + :return: secure connection object (that implements secure_conn_interface) + """ + insecure_conn = InsecureConn(conn, self.transport_id) + return insecure_conn + + async def secure_outbound(self, conn, peer_id): + """ + Secure the connection, either locally or by communicating with opposing node via conn, + for an inbound connection (i.e. we are the initiator) + :return: secure connection object (that implements secure_conn_interface) + """ + insecure_conn = InsecureConn(conn, self.transport_id) + return insecure_conn + +class InsecureConn(ISecureConn): + + def __init__(self, conn, conn_id): + self.conn = conn + self.details = {} + self.details["id"] = conn_id + + def get_conn(self): + """ + :return: connection object that has been made secure + """ + return self.conn + + def get_security_details(self): + """ + :return: map containing details about the connections security + """ + return self.details diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 6fe4539..81db4cf 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -12,7 +12,7 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa """ class SecurityMultistream(ABC): - def __init__(): + def __init__(self): # Map protocol to secure transport self.transports = {} diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 0d587b5..e660a52 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -11,7 +11,7 @@ class Mplex(IMuxedConn): reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ - def __init__(self, conn, generic_protocol_handler, peer_id): + def __init__(self, secured_conn, generic_protocol_handler, peer_id): """ create a new muxed connection :param conn: an instance of raw connection @@ -19,10 +19,11 @@ class Mplex(IMuxedConn): for new muxed streams :param peer_id: peer_id of peer the connection is to """ - super(Mplex, self).__init__(conn, generic_protocol_handler, peer_id) + super(Mplex, self).__init__(secured_conn, generic_protocol_handler, peer_id) - self.raw_conn = conn - self.initiator = conn.initiator + self.secured_conn = secured_conn + self.raw_conn = secured_conn.get_conn() + self.initiator = self.raw_conn.initiator # Store generic protocol handler self.generic_protocol_handler = generic_protocol_handler diff --git a/libp2p/stream_muxer/muxed_connection_interface.py b/libp2p/stream_muxer/muxed_connection_interface.py index 541fd64..0faf770 100644 --- a/libp2p/stream_muxer/muxed_connection_interface.py +++ b/libp2p/stream_muxer/muxed_connection_interface.py @@ -10,7 +10,7 @@ class IMuxedConn(ABC): def __init__(self, conn, generic_protocol_handler, peer_id): """ create a new muxed connection - :param conn: an instance of raw connection + :param conn: an instance of secured connection :param generic_protocol_handler: generic protocol handler for new muxed streams :param peer_id: peer_id of peer the connection is to diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index 21c0574..f6167ce 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -82,9 +82,10 @@ class TCP(ITransport): await writer.drain() # Await ack for peer id - ack = (await reader.read(1024)).decode() + expected_ack_str = "received peer id" + ack = (await reader.read(len(expected_ack_str))).decode() - if ack != "received peer id": + if ack != expected_ack_str: raise Exception("Receiver did not receive peer id") return RawConnection(host, port, reader, writer, True) diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index f84fb40..68f3762 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,11 +1,17 @@ from libp2p.stream_muxer.mplex.mplex import Mplex +from libp2p.security.security_multistream import SecurityMultistream class TransportUpgrader: # pylint: disable=no-self-use def __init__(self, secOpt, muxerOpt): - self.sec = secOpt + # Store security option + self.security_multistream = SecurityMultistream() + for key in secOpt: + self.security_multistream.add_transport(key, secOpt[key]) + + # Store muxer option self.muxer = muxerOpt def upgrade_listener(self, transport, listeners): @@ -13,12 +19,14 @@ class TransportUpgrader: Upgrade multiaddr listeners to libp2p-transport listeners """ - def upgrade_security(self, conn, peer_id): + async def upgrade_security(self, raw_conn, peer_id, initiator): """ Upgrade conn to be a secured connection """ - # TODO: Do exchange to determine security module - pass + if initiator: + return await self.security_multistream.secure_outbound(raw_conn, peer_id) + else: + return await self.security_multistream.secure_inbound(raw_conn) def upgrade_connection(self, conn, generic_protocol_handler, peer_id): """ diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index 25176f3..8b23e16 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -1,12 +1,28 @@ +import asyncio +import multiaddr import pytest +from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr from tests.utils import cleanup, set_up_nodes_by_transport_opt from libp2p.security.security_multistream import SecurityMultistream -from tests.security.insecure_multistream import InsecureConn, InsecureTransport +from libp2p.security.insecure_security import InsecureConn, InsecureTransport # TODO: Add tests for multiple streams being opened on different # protocols through the same connection +def peer_id_for_node(node): + addr = node.get_addrs()[0] + info = info_from_p2p_addr(addr) + return info.peer_id + +async def connect(node1, node2): + """ + Connect node1 to node2 + """ + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) async def perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator): @@ -15,74 +31,53 @@ async def perform_simple_test(assertion_func, transports_for_initiator, transpor # TODO: implement -- note we need to introduce the notion of communicating over a raw connection # for testing, we do NOT want to communicate over a stream so we can't just create two nodes # and use their conn because our mplex will internally relay messages to a stream - conn = [] + sec_opt1 = dict((str(i), transport) for i, transport in enumerate(transports_for_initiator)) + sec_opt2 = dict((str(i), transport) for i, transport in enumerate(transports_for_noninitiator)) + + node1 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt1) + node2 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt2) + + await node1.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node2.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + await connect(node1, node2) # Fill initiator - sec_multi_initiator = SecurityMultistream() - for i, transport in enumerate(transports_for_initiator): - sec_multi_initiator.add_transport(str(i), transport) + # sec_multi_initiator = SecurityMultistream() + # for i, transport in enumerate(transports_for_initiator): + # sec_multi_initiator.add_transport(str(i), transport) - # Fill non-initiator - sec_multi_noninitiator = SecurityMultistream() - for i, transport in enumerate(transports_for_noninitiator): - sec_multi_noninitiator.add_transport(str(i), transport) + # # Fill non-initiator + # sec_multi_noninitiator = SecurityMultistream() + # for i, transport in enumerate(transports_for_noninitiator): + # sec_multi_noninitiator.add_transport(str(i), transport) - # Perform negotiation - tasks = [] - tasks.append(asyncio.ensure_future(sec_multi_initiator.secure_inbound(conn))) - tasks.append(asyncio.ensure_future(sec_multi_noninitiator.secure_inbound(conn))) - secured_conns = await asyncio.gather(*tasks) + # # Perform negotiation + # tasks = [] + # tasks.append(asyncio.ensure_future(sec_multi_initiator.secure_inbound(conn))) + # tasks.append(asyncio.ensure_future(sec_multi_noninitiator.secure_inbound(conn))) + # mplex_conns = await asyncio.gather(*tasks) + + # Get conns + node1_conn = node1.get_network().connections[peer_id_for_node(node2)] + node2_conn = node2.get_network().connections[peer_id_for_node(node1)] # Perform assertion - for conn in secured_conns: - assertion_func(conn.get_security_details()) + assertion_func(node1_conn.secured_conn.get_security_details()) + assertion_func(node2_conn.secured_conn.get_security_details()) # Success, terminate pending tasks. await cleanup() @pytest.mark.asyncio -async def test_single_protocol_succeeds(): - expected_selected_protocol = "/echo/1.0.0" - await perform_simple_test(expected_selected_protocol, - ["/echo/1.0.0"], ["/echo/1.0.0"]) +async def test_single_security_transport_succeeds(): + transports_for_initiator = [InsecureTransport("foo")] + transports_for_noninitiator = [InsecureTransport("foo")] + def assertion_func(details): + assert details["id"] == "foo" -@pytest.mark.asyncio -async def test_single_protocol_fails(): - with pytest.raises(MultiselectClientError): - await perform_simple_test("", ["/echo/1.0.0"], - ["/potato/1.0.0"]) + await perform_simple_test(assertion_func, + transports_for_initiator, transports_for_noninitiator) - # Cleanup not reached on error - await cleanup() - - -@pytest.mark.asyncio -async def test_multiple_protocol_first_is_valid_succeeds(): - expected_selected_protocol = "/echo/1.0.0" - protocols_for_client = ["/echo/1.0.0", "/potato/1.0.0"] - protocols_for_listener = ["/foo/1.0.0", "/echo/1.0.0"] - await perform_simple_test(expected_selected_protocol, protocols_for_client, - protocols_for_listener) - - -@pytest.mark.asyncio -async def test_multiple_protocol_second_is_valid_succeeds(): - expected_selected_protocol = "/foo/1.0.0" - protocols_for_client = ["/rock/1.0.0", "/foo/1.0.0"] - protocols_for_listener = ["/foo/1.0.0", "/echo/1.0.0"] - await perform_simple_test(expected_selected_protocol, protocols_for_client, - protocols_for_listener) - - -@pytest.mark.asyncio -async def test_multiple_protocol_fails(): - protocols_for_client = ["/rock/1.0.0", "/foo/1.0.0", "/bar/1.0.0"] - protocols_for_listener = ["/aspyn/1.0.0", "/rob/1.0.0", "/zx/1.0.0", "/alex/1.0.0"] - with pytest.raises(MultiselectClientError): - await perform_simple_test("", protocols_for_client, - protocols_for_listener) - - # Cleanup not reached on error - await cleanup()