From 6e01a7da31470408486037dba64d9fe32d9b9a7f Mon Sep 17 00:00:00 2001 From: mhchia Date: Sun, 26 Jan 2020 16:44:42 +0800 Subject: [PATCH] PR feedback: async with host.run() --- examples/chat/chat.py | 7 +-- examples/echo/echo.py | 7 +-- libp2p/__init__.py | 64 ++++++++------------- libp2p/host/basic_host.py | 26 +++++++-- libp2p/host/host_interface.py | 16 +++++- libp2p/host/routed_host.py | 4 +- tests/host/test_basic_host.py | 4 +- tests/security/test_security_multistream.py | 18 +++--- 8 files changed, 75 insertions(+), 71 deletions(-) diff --git a/examples/chat/chat.py b/examples/chat/chat.py index 80dcc86..41aad92 100755 --- a/examples/chat/chat.py +++ b/examples/chat/chat.py @@ -4,7 +4,7 @@ import sys import multiaddr import trio -from libp2p import new_host_trio +from libp2p import new_host from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.typing import TProtocol @@ -34,9 +34,8 @@ async def write_data(stream: INetStream) -> None: async def run(port: int, destination: str) -> None: localhost_ip = "127.0.0.1" listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}") - async with new_host_trio( - listen_addrs=[listen_addr] - ) as host, trio.open_nursery() as nursery: + host = new_host() + async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery: if not destination: # its the server async def stream_handler(stream: INetStream) -> None: diff --git a/examples/echo/echo.py b/examples/echo/echo.py index b39f813..5ea8ab4 100644 --- a/examples/echo/echo.py +++ b/examples/echo/echo.py @@ -3,7 +3,7 @@ import argparse import multiaddr import trio -from libp2p import new_host_trio +from libp2p import new_host from libp2p.crypto.secp256k1 import create_new_key_pair from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.peerinfo import info_from_p2p_addr @@ -34,9 +34,8 @@ async def run(port: int, destination: str, seed: int = None) -> None: secret = secrets.token_bytes(32) - async with new_host_trio( - listen_addrs=[listen_addr], key_pair=create_new_key_pair(secret) - ) as host: + host = new_host(key_pair=create_new_key_pair(secret)) + async with host.run(listen_addrs=[listen_addr]): print(f"I am {host.get_id().to_string()}") diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 658fceb..4d91b9d 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -1,14 +1,9 @@ -from typing import AsyncIterator, Sequence - -from async_generator import asynccontextmanager -from async_service import background_trio_service - from libp2p.crypto.keys import KeyPair from libp2p.crypto.rsa import create_new_key_pair from libp2p.host.basic_host import BasicHost from libp2p.host.host_interface import IHost from libp2p.host.routed_host import RoutedHost -from libp2p.network.network_interface import INetwork, INetworkService +from libp2p.network.network_interface import INetworkService from libp2p.network.swarm import Swarm from libp2p.peer.id import ID from libp2p.peer.peerstore import PeerStore @@ -32,14 +27,14 @@ def generate_peer_id_from(key_pair: KeyPair) -> ID: return ID.from_pubkey(public_key) -def initialize_default_swarm( +def new_swarm( key_pair: KeyPair = None, muxer_opt: TMuxerOptions = None, sec_opt: TSecurityOptions = None, peerstore_opt: IPeerStore = None, ) -> INetworkService: """ - initialize swarm when no swarm is passed in. + Create a swarm instance based on the parameters. :param key_pair: optional choice of the ``KeyPair`` :param muxer_opt: optional choice of stream muxer @@ -48,7 +43,7 @@ def initialize_default_swarm( :return: return a default swarm instance """ - if not key_pair: + if key_pair is None: key_pair = generate_new_rsa_identity() id_opt = generate_peer_id_from(key_pair) @@ -72,45 +67,32 @@ def initialize_default_swarm( return Swarm(id_opt, peerstore, upgrader, transport) -def _new_host(swarm_opt: INetwork, disc_opt: IPeerRouting = None) -> IHost: - """ - create new libp2p host. - - :param swarm_opt: optional swarm - :param disc_opt: optional discovery - :return: return a host instance - """ - # TODO enable support for other host type - # TODO routing unimplemented - host: IHost # If not explicitly typed, MyPy raises error - if disc_opt: - host = RoutedHost(swarm_opt, disc_opt) - else: - host = BasicHost(swarm_opt) - - return host - - -@asynccontextmanager -async def new_host_trio( - listen_addrs: Sequence[str], +def new_host( key_pair: KeyPair = None, - swarm_opt: INetwork = None, muxer_opt: TMuxerOptions = None, sec_opt: TSecurityOptions = None, peerstore_opt: IPeerStore = None, disc_opt: IPeerRouting = None, -) -> AsyncIterator[IHost]: - swarm = initialize_default_swarm( +) -> IHost: + """ + Create a new libp2p host based on the given parameters. + + :param key_pair: optional choice of the ``KeyPair`` + :param muxer_opt: optional choice of stream muxer + :param sec_opt: optional choice of security upgrade + :param peerstore_opt: optional peerstore + :param disc_opt: optional discovery + :return: return a host instance + """ + swarm = new_swarm( key_pair=key_pair, muxer_opt=muxer_opt, sec_opt=sec_opt, peerstore_opt=peerstore_opt, ) - async with background_trio_service(swarm): - await swarm.listen(*listen_addrs) - host = _new_host(swarm_opt=swarm, disc_opt=disc_opt) - yield host - - -# TODO: Support asyncio + host: IHost + if disc_opt: + host = RoutedHost(swarm, disc_opt) + else: + host = BasicHost(swarm) + return host diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 253394e..6386fb8 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -1,12 +1,14 @@ import logging -from typing import TYPE_CHECKING, List, Sequence +from typing import TYPE_CHECKING, AsyncIterator, List, Sequence +from async_generator import asynccontextmanager +from async_service import background_trio_service import multiaddr from libp2p.crypto.keys import PrivateKey, PublicKey from libp2p.host.defaults import get_default_protocols from libp2p.host.exceptions import StreamFailure -from libp2p.network.network_interface import INetwork +from libp2p.network.network_interface import INetworkService from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo @@ -39,7 +41,7 @@ class BasicHost(IHost): right after a stream is initialized. """ - _network: INetwork + _network: INetworkService peerstore: IPeerStore multiselect: Multiselect @@ -47,7 +49,7 @@ class BasicHost(IHost): def __init__( self, - network: INetwork, + network: INetworkService, default_protocols: "OrderedDict[TProtocol, StreamHandlerFn]" = None, ) -> None: self._network = network @@ -70,7 +72,7 @@ class BasicHost(IHost): def get_private_key(self) -> PrivateKey: return self.peerstore.privkey(self.get_id()) - def get_network(self) -> INetwork: + def get_network(self) -> INetworkService: """ :return: network instance of host """ @@ -101,6 +103,20 @@ class BasicHost(IHost): addrs.append(addr.encapsulate(p2p_part)) return addrs + @asynccontextmanager + async def run( + self, listen_addrs: Sequence[multiaddr.Multiaddr] + ) -> AsyncIterator[None]: + """ + run the host instance and listen to ``listen_addrs``. + + :param listen_addrs: a sequence of multiaddrs that we want to listen to + """ + network = self.get_network() + async with background_trio_service(network): + await network.listen(*listen_addrs) + yield + def set_stream_handler( self, protocol_id: TProtocol, stream_handler: StreamHandlerFn ) -> None: diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 43f4ac4..59146e7 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -1,10 +1,10 @@ from abc import ABC, abstractmethod -from typing import Any, List, Sequence +from typing import Any, AsyncContextManager, List, Sequence import multiaddr from libp2p.crypto.keys import PrivateKey, PublicKey -from libp2p.network.network_interface import INetwork +from libp2p.network.network_interface import INetworkService from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.peer.peerinfo import PeerInfo @@ -31,7 +31,7 @@ class IHost(ABC): """ @abstractmethod - def get_network(self) -> INetwork: + def get_network(self) -> INetworkService: """ :return: network instance of host """ @@ -49,6 +49,16 @@ class IHost(ABC): :return: all the multiaddr addresses this host is listening to """ + @abstractmethod + def run( + self, listen_addrs: Sequence[multiaddr.Multiaddr] + ) -> AsyncContextManager[None]: + """ + run the host instance and listen to ``listen_addrs``. + + :param listen_addrs: a sequence of multiaddrs that we want to listen to + """ + @abstractmethod def set_stream_handler( self, protocol_id: TProtocol, stream_handler: StreamHandlerFn diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py index 78b6fa5..91264c7 100644 --- a/libp2p/host/routed_host.py +++ b/libp2p/host/routed_host.py @@ -1,6 +1,6 @@ from libp2p.host.basic_host import BasicHost from libp2p.host.exceptions import ConnectionFailure -from libp2p.network.network_interface import INetwork +from libp2p.network.network_interface import INetworkService from libp2p.peer.peerinfo import PeerInfo from libp2p.routing.interfaces import IPeerRouting @@ -10,7 +10,7 @@ from libp2p.routing.interfaces import IPeerRouting class RoutedHost(BasicHost): _router: IPeerRouting - def __init__(self, network: INetwork, router: IPeerRouting): + def __init__(self, network: INetworkService, router: IPeerRouting): super().__init__(network) self._router = router diff --git a/tests/host/test_basic_host.py b/tests/host/test_basic_host.py index 1eec04a..55605ed 100644 --- a/tests/host/test_basic_host.py +++ b/tests/host/test_basic_host.py @@ -1,4 +1,4 @@ -from libp2p import initialize_default_swarm +from libp2p import new_swarm from libp2p.crypto.rsa import create_new_key_pair from libp2p.host.basic_host import BasicHost from libp2p.host.defaults import get_default_protocols @@ -6,7 +6,7 @@ from libp2p.host.defaults import get_default_protocols def test_default_protocols(): key_pair = create_new_key_pair() - swarm = initialize_default_swarm(key_pair) + swarm = new_swarm(key_pair) host = BasicHost(swarm) mux = host.get_mux() diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index 8d46610..cd968ac 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -1,7 +1,7 @@ import pytest import trio -from libp2p import new_host_trio +from libp2p import new_host from libp2p.crypto.rsa import create_new_key_pair from libp2p.security.insecure.transport import InsecureSession, InsecureTransport from libp2p.tools.constants import LISTEN_MADDR @@ -29,15 +29,13 @@ async def perform_simple_test( # for testing, we do NOT want to communicate over a stream so we can't just create two nodes # and use their conn because our mplex will internally relay messages to a stream - async with new_host_trio( - listen_addrs=[LISTEN_MADDR], - key_pair=initiator_key_pair, - sec_opt=transports_for_initiator, - ) as node1, new_host_trio( - listen_addrs=[LISTEN_MADDR], - key_pair=noninitiator_key_pair, - sec_opt=transports_for_noninitiator, - ) as node2: + node1 = new_host(key_pair=initiator_key_pair, sec_opt=transports_for_initiator) + node2 = new_host( + key_pair=noninitiator_key_pair, sec_opt=transports_for_noninitiator + ) + async with node1.run(listen_addrs=[LISTEN_MADDR]), node2.run( + listen_addrs=[LISTEN_MADDR] + ): await connect(node1, node2) # Wait a very short period to allow conns to be stored (since the functions