commit
1571bfac07
|
@ -8,3 +8,9 @@ class ValidationError(BaseLibp2pError):
|
||||||
|
|
||||||
class ParseError(BaseLibp2pError):
|
class ParseError(BaseLibp2pError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MultiError(BaseLibp2pError):
|
||||||
|
"""Raised with multiple exceptions."""
|
||||||
|
|
||||||
|
# todo: find some way for this to fancy-print all encapsulated errors
|
||||||
|
|
|
@ -19,6 +19,7 @@ from libp2p.transport.transport_interface import ITransport
|
||||||
from libp2p.transport.upgrader import TransportUpgrader
|
from libp2p.transport.upgrader import TransportUpgrader
|
||||||
from libp2p.typing import StreamHandlerFn
|
from libp2p.typing import StreamHandlerFn
|
||||||
|
|
||||||
|
from ..exceptions import MultiError
|
||||||
from .connection.raw_connection import RawConnection
|
from .connection.raw_connection import RawConnection
|
||||||
from .connection.swarm_connection import SwarmConn
|
from .connection.swarm_connection import SwarmConn
|
||||||
from .exceptions import SwarmException
|
from .exceptions import SwarmException
|
||||||
|
@ -87,21 +88,51 @@ class Swarm(INetwork):
|
||||||
try:
|
try:
|
||||||
# Get peer info from peer store
|
# Get peer info from peer store
|
||||||
addrs = self.peerstore.addrs(peer_id)
|
addrs = self.peerstore.addrs(peer_id)
|
||||||
except PeerStoreError:
|
except PeerStoreError as error:
|
||||||
raise SwarmException(f"No known addresses to peer {peer_id}")
|
raise SwarmException(f"No known addresses to peer {peer_id}") from error
|
||||||
|
|
||||||
if not addrs:
|
if not addrs:
|
||||||
raise SwarmException(f"No known addresses to peer {peer_id}")
|
raise SwarmException(f"No known addresses to peer {peer_id}")
|
||||||
|
|
||||||
multiaddr = addrs[0]
|
exceptions: List[SwarmException] = []
|
||||||
|
|
||||||
|
# Try all known addresses
|
||||||
|
for multiaddr in addrs:
|
||||||
|
try:
|
||||||
|
return await self.dial_addr(multiaddr, peer_id)
|
||||||
|
except SwarmException as e:
|
||||||
|
exceptions.append(e)
|
||||||
|
logger.debug(
|
||||||
|
"encountered swarm exception when trying to connect to %s, "
|
||||||
|
"trying next address...",
|
||||||
|
multiaddr,
|
||||||
|
exc_info=e,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Tried all addresses, raising exception.
|
||||||
|
raise SwarmException(
|
||||||
|
f"unable to connect to {peer_id}, no addresses established a successful connection "
|
||||||
|
"(with exceptions)"
|
||||||
|
) from MultiError(exceptions)
|
||||||
|
|
||||||
|
async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
|
||||||
|
"""
|
||||||
|
dial_addr try to create a connection to peer_id with addr.
|
||||||
|
|
||||||
|
:param addr: the address we want to connect with
|
||||||
|
:param peer_id: the peer we want to connect to
|
||||||
|
:raises SwarmException: raised when an error occurs
|
||||||
|
:return: network connection
|
||||||
|
"""
|
||||||
|
|
||||||
# Dial peer (connection to peer does not yet exist)
|
# Dial peer (connection to peer does not yet exist)
|
||||||
# Transport dials peer (gets back a raw conn)
|
# Transport dials peer (gets back a raw conn)
|
||||||
try:
|
try:
|
||||||
raw_conn = await self.transport.dial(multiaddr)
|
raw_conn = await self.transport.dial(addr)
|
||||||
except OpenConnectionError as error:
|
except OpenConnectionError as error:
|
||||||
logger.debug("fail to dial peer %s over base transport", peer_id)
|
logger.debug("fail to dial peer %s over base transport", peer_id)
|
||||||
raise SwarmException(
|
raise SwarmException(
|
||||||
"fail to open connection to peer %s", peer_id
|
f"fail to open connection to peer {peer_id}"
|
||||||
) from error
|
) from error
|
||||||
|
|
||||||
logger.debug("dialed peer %s over base transport", peer_id)
|
logger.debug("dialed peer %s over base transport", peer_id)
|
||||||
|
@ -137,7 +168,6 @@ class Swarm(INetwork):
|
||||||
async def new_stream(self, peer_id: ID) -> INetStream:
|
async def new_stream(self, peer_id: ID) -> INetStream:
|
||||||
"""
|
"""
|
||||||
:param peer_id: peer_id of destination
|
:param peer_id: peer_id of destination
|
||||||
:param protocol_id: protocol id
|
|
||||||
:raises SwarmException: raised when an error occurs
|
:raises SwarmException: raised when an error occurs
|
||||||
:return: net stream instance
|
:return: net stream instance
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
from multiaddr import Multiaddr
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from libp2p.network.exceptions import SwarmException
|
from libp2p.network.exceptions import SwarmException
|
||||||
|
@ -91,3 +92,59 @@ async def test_swarm_remove_conn(swarm_pair):
|
||||||
# Test: Remove twice. There should not be errors.
|
# Test: Remove twice. There should not be errors.
|
||||||
swarm_0.remove_conn(conn_0)
|
swarm_0.remove_conn(conn_0)
|
||||||
assert swarm_1.get_peer_id() not in swarm_0.connections
|
assert swarm_1.get_peer_id() not in swarm_0.connections
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_swarm_multiaddr(is_host_secure):
|
||||||
|
swarms = await SwarmFactory.create_batch_and_listen(is_host_secure, 3)
|
||||||
|
|
||||||
|
def clear():
|
||||||
|
swarms[0].peerstore.clear_addrs(swarms[1].get_peer_id())
|
||||||
|
|
||||||
|
clear()
|
||||||
|
# No addresses
|
||||||
|
with pytest.raises(SwarmException):
|
||||||
|
await swarms[0].dial_peer(swarms[1].get_peer_id())
|
||||||
|
|
||||||
|
clear()
|
||||||
|
# Wrong addresses
|
||||||
|
swarms[0].peerstore.add_addrs(
|
||||||
|
swarms[1].get_peer_id(), [Multiaddr("/ip4/0.0.0.0/tcp/9999")], 10000
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(SwarmException):
|
||||||
|
await swarms[0].dial_peer(swarms[1].get_peer_id())
|
||||||
|
|
||||||
|
clear()
|
||||||
|
# Multiple wrong addresses
|
||||||
|
swarms[0].peerstore.add_addrs(
|
||||||
|
swarms[1].get_peer_id(),
|
||||||
|
[Multiaddr("/ip4/0.0.0.0/tcp/9999"), Multiaddr("/ip4/0.0.0.0/tcp/9998")],
|
||||||
|
10000,
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(SwarmException):
|
||||||
|
await swarms[0].dial_peer(swarms[1].get_peer_id())
|
||||||
|
|
||||||
|
# Test one address
|
||||||
|
addrs = tuple(
|
||||||
|
addr
|
||||||
|
for transport in swarms[1].listeners.values()
|
||||||
|
for addr in transport.get_addrs()
|
||||||
|
)
|
||||||
|
|
||||||
|
swarms[0].peerstore.add_addrs(swarms[1].get_peer_id(), addrs[:1], 10000)
|
||||||
|
await swarms[0].dial_peer(swarms[1].get_peer_id())
|
||||||
|
|
||||||
|
# Test multiple addresses
|
||||||
|
addrs = tuple(
|
||||||
|
addr
|
||||||
|
for transport in swarms[1].listeners.values()
|
||||||
|
for addr in transport.get_addrs()
|
||||||
|
)
|
||||||
|
|
||||||
|
swarms[0].peerstore.add_addrs(swarms[1].get_peer_id(), addrs + addrs, 10000)
|
||||||
|
await swarms[0].dial_peer(swarms[1].get_peer_id())
|
||||||
|
|
||||||
|
for swarm in swarms:
|
||||||
|
await swarm.close()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user