diff --git a/libp2p/transport/listener_interface.py b/libp2p/transport/listener_interface.py index 5f84ef6..8e2d86b 100644 --- a/libp2p/transport/listener_interface.py +++ b/libp2p/transport/listener_interface.py @@ -1,9 +1,15 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import List + from multiaddr import Multiaddr + class IListener(ABC): @abstractmethod - def listen(self, maddr): + async def listen(self, maddr: "Multiaddr") -> bool: """ put listener in listening mode and wait for incoming connections :param maddr: multiaddr of peer @@ -11,18 +17,16 @@ class IListener(ABC): """ @abstractmethod - def get_addrs(self): + def get_addrs(self) -> "List[Multiaddr]": """ retrieve list of addresses the listener is listening on :return: return list of addrs """ @abstractmethod - def close(self, options=None): + def close(self) -> bool: """ close the listener such that no more connections can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections :return: return True if successful """ diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index c163bea..918f712 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -1,68 +1,79 @@ import asyncio -import multiaddr +from multiaddr import Multiaddr from libp2p.network.connection.raw_connection import RawConnection -from ..listener_interface import IListener -from ..transport_interface import ITransport +from libp2p.transport.listener_interface import IListener +from libp2p.transport.transport_interface import ITransport + +from typing import TYPE_CHECKING, List, Callable + +if TYPE_CHECKING: + from socket import socket + from libp2p.transport.typing import THandler + from libp2p.network.connection.raw_connection_interface import IRawConnection + + +class TCPListener(IListener): + multiaddrs: List[Multiaddr] + server = None + handler = None + + def __init__(self, handler_function: "THandler" = None) -> None: + self.multiaddrs = [] + self.server = None + self.handler = handler_function + + async def listen(self, maddr: Multiaddr) -> bool: + """ + put listener in listening mode and wait for incoming connections + :param maddr: maddr of peer + :return: return True if successful + """ + self.server = await asyncio.start_server( + self.handler, + maddr.value_for_protocol("ip4"), + maddr.value_for_protocol("tcp"), + ) + socket = self.server.sockets[0] + self.multiaddrs.append(_multiaddr_from_socket(socket)) + + return True + + def get_addrs(self) -> List[Multiaddr]: + """ + retrieve list of addresses the listener is listening on + :return: return list of addrs + """ + # TODO check if server is listening + return self.multiaddrs + + def close(self) -> bool: + """ + close the listener such that no more connections + can be open on this transport instance + :return: return True if successful + """ + if self.server is None: + return False + self.server.close() + _loop = asyncio.get_event_loop() + _loop.run_until_complete(self.server.wait_closed()) + _loop.close() + self.server = None + return True class TCP(ITransport): - def __init__(self): - self.listener = self.Listener() + def __init__(self) -> None: + self.listener = TCPListener() - class Listener(IListener): - def __init__(self, handler_function=None): - self.multiaddrs = [] - self.server = None - self.handler = handler_function - - async def listen(self, maddr): - """ - put listener in listening mode and wait for incoming connections - :param maddr: maddr of peer - :return: return True if successful - """ - self.server = await asyncio.start_server( - self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp") - ) - socket = self.server.sockets[0] - self.multiaddrs.append(_multiaddr_from_socket(socket)) - - return True - - def get_addrs(self): - """ - retrieve list of addresses the listener is listening on - :return: return list of addrs - """ - # TODO check if server is listening - return self.multiaddrs - - def close(self, options=None): - """ - close the listener such that no more connections - can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections - :return: return True if successful - """ - if self.server is None: - return False - self.server.close() - _loop = asyncio.get_event_loop() - _loop.run_until_complete(self.server.wait_closed()) - _loop.close() - self.server = None - return True - - async def dial(self, maddr, self_id, options=None): + async def dial(self, maddr: Multiaddr, self_id: ID) -> "IRawConnection": """ dial a transport to peer listening on multiaddr :param maddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :param options: optional object :return: True if successful """ host = maddr.value_for_protocol("ip4") @@ -81,18 +92,17 @@ class TCP(ITransport): if ack != expected_ack_str: raise Exception("Receiver did not receive peer id") - return RawConnection(host, port, reader, writer, True) + return RawConnection(host, str(port), reader, writer, True) - def create_listener(self, handler_function, options=None): + def create_listener(self, handler_function: "THandler") -> TCPListener: """ create listener on transport - :param options: optional object with properties the listener must have :param handler_function: a function called when a new connection is received that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ - return self.Listener(handler_function) + return TCPListener(handler_function) -def _multiaddr_from_socket(socket): - return multiaddr.Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) +def _multiaddr_from_socket(socket: "socket") -> Multiaddr: + return Multiaddr("/ip4/%s/tcp/%s" % socket.getsockname()) diff --git a/libp2p/transport/transport_interface.py b/libp2p/transport/transport_interface.py index b067e5b..b79d246 100644 --- a/libp2p/transport/transport_interface.py +++ b/libp2p/transport/transport_interface.py @@ -1,22 +1,29 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from multiaddr import Multiaddr + from libp2p.peer.id import ID + from libp2p.network.connection.raw_connection_interface import IRawConnection + from .listener_interface import IListener + from .typing import THandler + class ITransport(ABC): @abstractmethod - def dial(self, maddr, self_id, options=None): + async def dial(self, maddr: "Multiaddr", self_id: "ID") -> "IRawConnection": """ dial a transport to peer listening on multiaddr :param multiaddr: multiaddr of peer :param self_id: peer_id of the dialer (to send to receiver) - :param options: optional object :return: list of multiaddrs """ @abstractmethod - def create_listener(self, handler_function, options=None): + def create_listener(self, handler_function: "THandler") -> "IListener": """ create listener on transport - :param options: optional object with properties the listener must have :param handler_function: a function called when a new conntion is received that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py diff --git a/libp2p/transport/typing.py b/libp2p/transport/typing.py new file mode 100644 index 0000000..9fd36b9 --- /dev/null +++ b/libp2p/transport/typing.py @@ -0,0 +1,6 @@ +from asyncio import StreamReader, StreamWriter +from typing import NewType, Callable + + +THandler = Callable[[StreamReader, StreamWriter], None] + diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index f14d7fc..3cbdad4 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -1,9 +1,28 @@ from libp2p.security.security_multistream import SecurityMultistream from libp2p.stream_muxer.mplex.mplex import Mplex +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + + from typing import Dict, Sequence + from libp2p.network.connection.raw_connection_interface import IRawConnection + from libp2p.network.swarm import GenericProtocolHandlerFn + from libp2p.peer.id import ID + from libp2p.security.secure_conn_interface import ISecureConn + from libp2p.security.secure_transport_interface import ISecureTransport + from libp2p.security.security_multistream import TProtocol + from .transport_interface import ITransport + from .listener_interface import IListener + class TransportUpgrader: - def __init__(self, secOpt, muxerOpt): + security_multistream: SecurityMultistream + muxer: "Sequence[str]" + + def __init__( + self, secOpt: "Dict[TProtocol, ISecureTransport]", muxerOpt: "Sequence[str]" + ) -> None: # Store security option self.security_multistream = SecurityMultistream() for key in secOpt: @@ -12,12 +31,15 @@ class TransportUpgrader: # Store muxer option self.muxer = muxerOpt - def upgrade_listener(self, transport, listeners): + def upgrade_listener(self, transport: "ITransport", listeners: "IListener") -> None: """ Upgrade multiaddr listeners to libp2p-transport listeners """ + pass - async def upgrade_security(self, raw_conn, peer_id, initiator): + async def upgrade_security( + self, raw_conn: "IRawConnection", peer_id: "ID", initiator: bool + ) -> "ISecureConn": """ Upgrade conn to be a secured connection """ @@ -26,7 +48,12 @@ class TransportUpgrader: return await self.security_multistream.secure_inbound(raw_conn) - def upgrade_connection(self, conn, generic_protocol_handler, peer_id): + @staticmethod + def upgrade_connection( + conn: "IRawConnection", + generic_protocol_handler: "GenericProtocolHandlerFn", + peer_id: "ID", + ) -> "Mplex": """ Upgrade raw connection to muxed connection """