Fix examples and modify new_node
- Fix examples `chat.py` and `echo.py` - Use trio directly, instead of `trio-asyncio` - Remove redundant code - Change entry API `new_node` to `new_host_trio`
This commit is contained in:
parent
6fe5871d96
commit
3372c32432
|
@ -1,13 +1,10 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
import multiaddr
|
||||
import trio
|
||||
import trio_asyncio
|
||||
|
||||
from libp2p import new_node
|
||||
from libp2p import new_host_trio
|
||||
from libp2p.network.stream.net_stream_interface import INetStream
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from libp2p.typing import TProtocol
|
||||
|
@ -28,60 +25,48 @@ async def read_data(stream: INetStream) -> None:
|
|||
|
||||
|
||||
async def write_data(stream: INetStream) -> None:
|
||||
loop = asyncio.get_event_loop()
|
||||
async_f = trio.wrap_file(sys.stdin)
|
||||
while True:
|
||||
line = await loop.run_in_executor(None, sys.stdin.readline)
|
||||
line = await async_f.readline()
|
||||
await stream.write(line.encode())
|
||||
|
||||
|
||||
async def run(port: int, destination: str, localhost: bool) -> None:
|
||||
if localhost:
|
||||
ip = "127.0.0.1"
|
||||
else:
|
||||
ip = urllib.request.urlopen("https://v4.ident.me/").read().decode("utf8")
|
||||
transport_opt = f"/ip4/{ip}/tcp/{port}"
|
||||
host = new_node(transport_opt=[transport_opt])
|
||||
async def run(port: int, destination: str) -> None:
|
||||
localhost_ip = "127.0.0.1"
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
async with new_host_trio(
|
||||
listen_addrs=[listen_addr]
|
||||
) as host, trio.open_nursery() as nursery:
|
||||
if not destination: # its the server
|
||||
|
||||
await trio_asyncio.run_asyncio(
|
||||
host.get_network().listen, multiaddr.Multiaddr(transport_opt)
|
||||
)
|
||||
async def stream_handler(stream: INetStream) -> None:
|
||||
nursery.start_soon(read_data, stream)
|
||||
nursery.start_soon(write_data, stream)
|
||||
|
||||
if not destination: # its the server
|
||||
host.set_stream_handler(PROTOCOL_ID, stream_handler)
|
||||
|
||||
async def stream_handler(stream: INetStream) -> None:
|
||||
asyncio.ensure_future(read_data(stream))
|
||||
asyncio.ensure_future(write_data(stream))
|
||||
print(
|
||||
f"Run 'python ./examples/chat/chat.py "
|
||||
f"-p {int(port) + 1} "
|
||||
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}' "
|
||||
"on another console."
|
||||
)
|
||||
print("Waiting for incoming connection...")
|
||||
|
||||
host.set_stream_handler(PROTOCOL_ID, stream_handler)
|
||||
else: # its the client
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
# Associate the peer with local ip address
|
||||
await host.connect(info)
|
||||
# Start a stream with the destination.
|
||||
# Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
|
||||
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
|
||||
|
||||
localhost_opt = " --localhost" if localhost else ""
|
||||
nursery.start_soon(read_data, stream)
|
||||
nursery.start_soon(write_data, stream)
|
||||
print("Connected to peer %s" % info.addrs[0])
|
||||
|
||||
print(
|
||||
f"Run 'python ./examples/chat/chat.py"
|
||||
+ localhost_opt
|
||||
+ f" -p {int(port) + 1} -d /ip4/{ip}/tcp/{port}/p2p/{host.get_id().pretty()}'"
|
||||
+ " on another console."
|
||||
)
|
||||
print("Waiting for incoming connection...")
|
||||
|
||||
else: # its the client
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
# Associate the peer with local ip address
|
||||
await trio_asyncio.run_asyncio(host.connect, info)
|
||||
|
||||
# Start a stream with the destination.
|
||||
# Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
|
||||
stream = await trio_asyncio.run_asyncio(
|
||||
host.new_stream, *(info.peer_id, [PROTOCOL_ID])
|
||||
)
|
||||
|
||||
asyncio.ensure_future(read_data(stream))
|
||||
asyncio.ensure_future(write_data(stream))
|
||||
print("Connected to peer %s" % info.addrs[0])
|
||||
|
||||
stopped_event = trio.Event()
|
||||
await stopped_event.wait()
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
@ -95,11 +80,6 @@ def main() -> None:
|
|||
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
|
||||
)
|
||||
parser = argparse.ArgumentParser(description=description)
|
||||
parser.add_argument(
|
||||
"--debug",
|
||||
action="store_true",
|
||||
help="generate the same node ID on every execution",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p", "--port", default=8000, type=int, help="source port number"
|
||||
)
|
||||
|
@ -109,19 +89,15 @@ def main() -> None:
|
|||
type=str,
|
||||
help=f"destination multiaddr string, e.g. {example_maddr}",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-l",
|
||||
"--localhost",
|
||||
dest="localhost",
|
||||
action="store_true",
|
||||
help="flag indicating if localhost should be used or an external IP",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.port:
|
||||
raise RuntimeError("was not able to determine a local port")
|
||||
|
||||
trio_asyncio.run(run, *(args.port, args.destination, args.localhost))
|
||||
try:
|
||||
trio.run(run, *(args.port, args.destination))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import urllib.request
|
||||
|
||||
import multiaddr
|
||||
import trio
|
||||
|
||||
from libp2p import new_node
|
||||
from libp2p import new_host_trio
|
||||
from libp2p.crypto.secp256k1 import create_new_key_pair
|
||||
from libp2p.network.stream.net_stream_interface import INetStream
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
|
@ -20,12 +19,9 @@ async def _echo_stream_handler(stream: INetStream) -> None:
|
|||
await stream.close()
|
||||
|
||||
|
||||
async def run(port: int, destination: str, localhost: bool, seed: int = None) -> None:
|
||||
if localhost:
|
||||
ip = "127.0.0.1"
|
||||
else:
|
||||
ip = urllib.request.urlopen("https://v4.ident.me/").read().decode("utf8")
|
||||
transport_opt = f"/ip4/{ip}/tcp/{port}"
|
||||
async def run(port: int, destination: str, seed: int = None) -> None:
|
||||
localhost_ip = "127.0.0.1"
|
||||
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
|
||||
|
||||
if seed:
|
||||
import random
|
||||
|
@ -38,47 +34,44 @@ async def run(port: int, destination: str, localhost: bool, seed: int = None) ->
|
|||
|
||||
secret = secrets.token_bytes(32)
|
||||
|
||||
host = await new_node(
|
||||
key_pair=create_new_key_pair(secret), transport_opt=[transport_opt]
|
||||
)
|
||||
async with new_host_trio(
|
||||
listen_addrs=[listen_addr], key_pair=create_new_key_pair(secret)
|
||||
) as host:
|
||||
|
||||
print(f"I am {host.get_id().to_string()}")
|
||||
print(f"I am {host.get_id().to_string()}")
|
||||
|
||||
await host.get_network().listen(multiaddr.Multiaddr(transport_opt))
|
||||
if not destination: # its the server
|
||||
|
||||
if not destination: # its the server
|
||||
host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler)
|
||||
|
||||
host.set_stream_handler(PROTOCOL_ID, _echo_stream_handler)
|
||||
print(
|
||||
f"Run 'python ./examples/echo/echo.py "
|
||||
f"-p {int(port) + 1} "
|
||||
f"-d /ip4/{localhost_ip}/tcp/{port}/p2p/{host.get_id().pretty()}' "
|
||||
"on another console."
|
||||
)
|
||||
print("Waiting for incoming connections...")
|
||||
await trio.sleep_forever()
|
||||
|
||||
localhost_opt = " --localhost" if localhost else ""
|
||||
else: # its the client
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
# Associate the peer with local ip address
|
||||
await host.connect(info)
|
||||
|
||||
print(
|
||||
f"Run 'python ./examples/echo/echo.py"
|
||||
+ localhost_opt
|
||||
+ f" -p {int(port) + 1} -d /ip4/{ip}/tcp/{port}/p2p/{host.get_id().pretty()}'"
|
||||
+ " on another console."
|
||||
)
|
||||
print("Waiting for incoming connections...")
|
||||
# Start a stream with the destination.
|
||||
# Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
|
||||
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
|
||||
|
||||
else: # its the client
|
||||
maddr = multiaddr.Multiaddr(destination)
|
||||
info = info_from_p2p_addr(maddr)
|
||||
# Associate the peer with local ip address
|
||||
await host.connect(info)
|
||||
msg = b"hi, there!\n"
|
||||
|
||||
# Start a stream with the destination.
|
||||
# Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
|
||||
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
|
||||
await stream.write(msg)
|
||||
# Notify the other side about EOF
|
||||
await stream.close()
|
||||
response = await stream.read()
|
||||
|
||||
msg = b"hi, there!\n"
|
||||
|
||||
await stream.write(msg)
|
||||
# Notify the other side about EOF
|
||||
await stream.close()
|
||||
response = await stream.read()
|
||||
|
||||
print(f"Sent: {msg}")
|
||||
print(f"Got: {response}")
|
||||
print(f"Sent: {msg}")
|
||||
print(f"Got: {response}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
@ -94,11 +87,6 @@ def main() -> None:
|
|||
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
|
||||
)
|
||||
parser = argparse.ArgumentParser(description=description)
|
||||
parser.add_argument(
|
||||
"--debug",
|
||||
action="store_true",
|
||||
help="generate the same node ID on every execution",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p", "--port", default=8000, type=int, help="source port number"
|
||||
)
|
||||
|
@ -108,13 +96,6 @@ def main() -> None:
|
|||
type=str,
|
||||
help=f"destination multiaddr string, e.g. {example_maddr}",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-l",
|
||||
"--localhost",
|
||||
dest="localhost",
|
||||
action="store_true",
|
||||
help="flag indicating if localhost should be used or an external IP",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--seed",
|
||||
|
@ -126,16 +107,10 @@ def main() -> None:
|
|||
if not args.port:
|
||||
raise RuntimeError("was not able to determine a local port")
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
asyncio.ensure_future(
|
||||
run(args.port, args.destination, args.localhost, args.seed)
|
||||
)
|
||||
loop.run_forever()
|
||||
trio.run(run, args.port, args.destination, args.seed)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
from typing import Sequence
|
||||
from typing import AsyncIterator, Sequence
|
||||
|
||||
from async_generator import asynccontextmanager
|
||||
from async_service import background_trio_service
|
||||
|
||||
from libp2p.crypto.keys import KeyPair
|
||||
from libp2p.crypto.rsa import create_new_key_pair
|
||||
from libp2p.host.basic_host import BasicHost
|
||||
from libp2p.host.host_interface import IHost
|
||||
from libp2p.host.routed_host import RoutedHost
|
||||
from libp2p.network.network_interface import INetwork
|
||||
from libp2p.network.network_interface import INetwork, INetworkService
|
||||
from libp2p.network.swarm import Swarm
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peerstore import PeerStore
|
||||
|
@ -30,28 +33,27 @@ def generate_peer_id_from(key_pair: KeyPair) -> ID:
|
|||
|
||||
|
||||
def initialize_default_swarm(
|
||||
key_pair: KeyPair,
|
||||
id_opt: ID = None,
|
||||
transport_opt: Sequence[str] = None,
|
||||
key_pair: KeyPair = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
sec_opt: TSecurityOptions = None,
|
||||
peerstore_opt: IPeerStore = None,
|
||||
) -> Swarm:
|
||||
) -> INetworkService:
|
||||
"""
|
||||
initialize swarm when no swarm is passed in.
|
||||
|
||||
:param id_opt: optional id for host
|
||||
:param transport_opt: optional choice of transport upgrade
|
||||
:param key_pair: optional choice of the ``KeyPair``
|
||||
:param muxer_opt: optional choice of stream muxer
|
||||
:param sec_opt: optional choice of security upgrade
|
||||
:param peerstore_opt: optional peerstore
|
||||
:return: return a default swarm instance
|
||||
"""
|
||||
|
||||
if not id_opt:
|
||||
id_opt = generate_peer_id_from(key_pair)
|
||||
if not key_pair:
|
||||
key_pair = generate_new_rsa_identity()
|
||||
|
||||
# TODO: Parse `transport_opt` to determine transport
|
||||
id_opt = generate_peer_id_from(key_pair)
|
||||
|
||||
# TODO: Parse `listen_addrs` to determine transport
|
||||
transport = TCP()
|
||||
|
||||
muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex}
|
||||
|
@ -67,48 +69,17 @@ def initialize_default_swarm(
|
|||
# Store our key pair in peerstore
|
||||
peerstore.add_key_pair(id_opt, key_pair)
|
||||
|
||||
# TODO: Initialize discovery if not presented
|
||||
return Swarm(id_opt, peerstore, upgrader, transport)
|
||||
|
||||
|
||||
def new_node(
|
||||
key_pair: KeyPair = None,
|
||||
swarm_opt: INetwork = None,
|
||||
transport_opt: Sequence[str] = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
sec_opt: TSecurityOptions = None,
|
||||
peerstore_opt: IPeerStore = None,
|
||||
disc_opt: IPeerRouting = None,
|
||||
) -> BasicHost:
|
||||
def _new_host(swarm_opt: INetwork, disc_opt: IPeerRouting = None) -> IHost:
|
||||
"""
|
||||
create new libp2p node.
|
||||
create new libp2p host.
|
||||
|
||||
:param key_pair: key pair for deriving an identity
|
||||
:param swarm_opt: optional swarm
|
||||
:param id_opt: optional id for host
|
||||
:param transport_opt: optional choice of transport upgrade
|
||||
:param muxer_opt: optional choice of stream muxer
|
||||
:param sec_opt: optional choice of security upgrade
|
||||
:param peerstore_opt: optional peerstore
|
||||
:param disc_opt: optional discovery
|
||||
:return: return a host instance
|
||||
"""
|
||||
|
||||
if not key_pair:
|
||||
key_pair = generate_new_rsa_identity()
|
||||
|
||||
id_opt = generate_peer_id_from(key_pair)
|
||||
|
||||
if not swarm_opt:
|
||||
swarm_opt = initialize_default_swarm(
|
||||
key_pair=key_pair,
|
||||
id_opt=id_opt,
|
||||
transport_opt=transport_opt,
|
||||
muxer_opt=muxer_opt,
|
||||
sec_opt=sec_opt,
|
||||
peerstore_opt=peerstore_opt,
|
||||
)
|
||||
|
||||
# TODO enable support for other host type
|
||||
# TODO routing unimplemented
|
||||
host: IHost # If not explicitly typed, MyPy raises error
|
||||
|
@ -118,3 +89,28 @@ def new_node(
|
|||
host = BasicHost(swarm_opt)
|
||||
|
||||
return host
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def new_host_trio(
|
||||
listen_addrs: Sequence[str],
|
||||
key_pair: KeyPair = None,
|
||||
swarm_opt: INetwork = None,
|
||||
muxer_opt: TMuxerOptions = None,
|
||||
sec_opt: TSecurityOptions = None,
|
||||
peerstore_opt: IPeerStore = None,
|
||||
disc_opt: IPeerRouting = None,
|
||||
) -> AsyncIterator[IHost]:
|
||||
swarm = initialize_default_swarm(
|
||||
key_pair=key_pair,
|
||||
muxer_opt=muxer_opt,
|
||||
sec_opt=sec_opt,
|
||||
peerstore_opt=peerstore_opt,
|
||||
)
|
||||
async with background_trio_service(swarm):
|
||||
await swarm.listen(*listen_addrs)
|
||||
host = _new_host(swarm_opt=swarm, disc_opt=disc_opt)
|
||||
yield host
|
||||
|
||||
|
||||
# TODO: Support asyncio
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING, Dict, Sequence
|
||||
|
||||
from async_service import ServiceAPI
|
||||
from multiaddr import Multiaddr
|
||||
|
||||
from libp2p.network.connection.net_connection_interface import INetConn
|
||||
|
@ -70,3 +71,7 @@ class INetwork(ABC):
|
|||
@abstractmethod
|
||||
async def close_peer(self, peer_id: ID) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class INetworkService(INetwork, ServiceAPI):
|
||||
...
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import logging
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from async_service import Service
|
||||
from multiaddr import Multiaddr
|
||||
import trio
|
||||
|
||||
|
@ -25,14 +24,14 @@ from ..exceptions import MultiError
|
|||
from .connection.raw_connection import RawConnection
|
||||
from .connection.swarm_connection import SwarmConn
|
||||
from .exceptions import SwarmException
|
||||
from .network_interface import INetwork
|
||||
from .network_interface import INetworkService
|
||||
from .notifee_interface import INotifee
|
||||
from .stream.net_stream_interface import INetStream
|
||||
|
||||
logger = logging.getLogger("libp2p.network.swarm")
|
||||
|
||||
|
||||
class Swarm(INetwork, Service):
|
||||
class Swarm(INetworkService):
|
||||
|
||||
self_id: ID
|
||||
peerstore: IPeerStore
|
||||
|
|
|
@ -25,9 +25,6 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
|
|||
if not addr:
|
||||
raise InvalidAddrError("`addr` should not be `None`")
|
||||
|
||||
if not isinstance(addr, multiaddr.Multiaddr):
|
||||
raise InvalidAddrError(f"`addr`={addr} should be of type `Multiaddr`")
|
||||
|
||||
parts = addr.split()
|
||||
if not parts:
|
||||
raise InvalidAddrError(
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
from async_service import background_trio_service
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from libp2p import new_node
|
||||
from libp2p import new_host_trio
|
||||
from libp2p.crypto.rsa import create_new_key_pair
|
||||
from libp2p.security.insecure.transport import InsecureSession, InsecureTransport
|
||||
from libp2p.tools.constants import LISTEN_MADDR
|
||||
|
@ -30,16 +29,15 @@ async def perform_simple_test(
|
|||
# for testing, we do NOT want to communicate over a stream so we can't just create two nodes
|
||||
# and use their conn because our mplex will internally relay messages to a stream
|
||||
|
||||
node1 = new_node(key_pair=initiator_key_pair, sec_opt=transports_for_initiator)
|
||||
node2 = new_node(
|
||||
key_pair=noninitiator_key_pair, sec_opt=transports_for_noninitiator
|
||||
)
|
||||
swarm1 = node1.get_network()
|
||||
swarm2 = node2.get_network()
|
||||
async with background_trio_service(swarm1), background_trio_service(swarm2):
|
||||
await swarm1.listen(LISTEN_MADDR)
|
||||
await swarm2.listen(LISTEN_MADDR)
|
||||
|
||||
async with new_host_trio(
|
||||
listen_addrs=[LISTEN_MADDR],
|
||||
key_pair=initiator_key_pair,
|
||||
sec_opt=transports_for_initiator,
|
||||
) as node1, new_host_trio(
|
||||
listen_addrs=[LISTEN_MADDR],
|
||||
key_pair=noninitiator_key_pair,
|
||||
sec_opt=transports_for_noninitiator,
|
||||
) as node2:
|
||||
await connect(node1, node2)
|
||||
|
||||
# Wait a very short period to allow conns to be stored (since the functions
|
||||
|
|
Loading…
Reference in New Issue
Block a user