Add basic support for multiaddr addresses and improvement around peer id (#75)

* Improved peer ID construction and usage

* peer id object is directly passed to the network

no need to cast from a string to an ID

* don't base64 encode the peer id when loading from public key

* use proper multiaddr address

- keep multiaddr object into peerstore instead of string
- update network code to use new multiaddr lib
- update tests and example

* don't instanciate peerstore object in constructor

This has side effect where the same peerstore
is used for different instance of Libp2p

* add connect method to basic_host

* use zaibon's fork of sbuss/py-multiaddr

* lint
This commit is contained in:
Christophe de Carvalho 2018-11-29 16:06:40 +01:00 committed by Robert Zajac
parent 7fa674dee2
commit 611de28aca
15 changed files with 279 additions and 74 deletions

View File

@ -8,6 +8,8 @@ import asyncio
import click
from libp2p.libp2p import *
from network.multiaddr import MultiAddr
from peer.peerinfo import info_from_p2p_addr
# TODO: change once muxed_connection supports extracting protocol id from messages
PROTOCOL_ID = '/echo/1.0.0'
@ -37,7 +39,7 @@ async def write_data(stream):
async def run(port, destination):
if not destination:
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s/p2p/hostA" % port])
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s" % port])
async def stream_handler(stream):
asyncio.ensure_future(read_data(stream))
@ -48,38 +50,31 @@ async def run(port, destination):
port = None
for listener in host.network.listeners.values():
for addr in listener.get_addrs():
addr_dict = addr.to_options()
if addr_dict['transport'] == 'tcp':
port = addr_dict['port']
break
port = int(addr.value_for_protocol('tcp'))
if not port:
raise RuntimeError("was not able to find the actual local port")
print("Run './examples/chat/chat.py --port %s -d /ip4/127.0.0.1/tcp/%s/p2p/%s' on another console.\n" % (int(port)+1, port, host.get_id().pretty()))
print("Run './examples/chat/chat.py --port %s -d /ip4/127.0.0.1/tcp/%s/p2p/%s' on another console.\n" %
(int(port)+1, port, host.get_id().pretty()))
print("You can replace 127.0.0.1 with public IP as well.")
print("\nWaiting for incoming connection\n\n")
else:
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s/p2p/hostB" % port])
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s" % port])
# TODO: improve multiaddr module to have proper function to do this
multiaddr = MultiAddr(destination)
ss = multiaddr.get_multiaddr_string().split('/')
peer_id = ss[-1]
addr = '/'.join(ss[:-2])
# Associate the peer with local ip address (see default parameters of Libp2p())
host.get_peerstore().add_addr(peer_id, addr, 10)
m = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(m)
# Associate the peer with local ip address
await host.connect(info)
# Start a stream with the destination.
# Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
stream = await host.new_stream(peer_id, [PROTOCOL_ID])
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])
asyncio.ensure_future(read_data(stream))
asyncio.ensure_future(write_data(stream))
print("Already connected to peer %s" % addr)
print("Already connected to peer %s" % info.addrs[0])
@click.command()

View File

@ -1,11 +1,13 @@
from .host_interface import IHost
import multiaddr
from .host_interface import IHost
# Upon host creation, host takes in options,
# including the list of addresses on which to listen.
# Host then parses these options and delegates to its Network instance,
# telling it to listen on the given listen addresses.
class BasicHost(IHost):
# default options constructor
@ -36,6 +38,18 @@ class BasicHost(IHost):
:return: mux instance of host
"""
def get_addrs(self):
"""
:return: all the multiaddr addresses this host is listening too
"""
p2p_part = multiaddr.Multiaddr('/ipfs/{}'.format(self.get_id().pretty()))
addrs = []
for transport in self.network.listeners.values():
for addr in transport.get_addrs():
addrs.append(addr.encapsulate(p2p_part))
return addrs
def set_stream_handler(self, protocol_id, stream_handler):
"""
set stream handler for host
@ -45,7 +59,6 @@ class BasicHost(IHost):
"""
return self.network.set_stream_handler(protocol_id, stream_handler)
# protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on
async def new_stream(self, peer_id, protocol_ids):
@ -56,3 +69,22 @@ class BasicHost(IHost):
"""
stream = await self.network.new_stream(peer_id, protocol_ids)
return stream
async def connect(self, peer_info):
"""
connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal
peerstore. If there is not an active connection, connect will issue a
dial, and block until a connection is open, or an error is
returned.
:param peer_info: peer_info of the host we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)
# there is already a connection to this peer
if peer_info.peer_id in self.network.connections:
return
await self.network.dial_peer(peer_info.peer_id)

View File

@ -21,6 +21,12 @@ class IHost(ABC):
:return: mux instance of host
"""
@abstractmethod
def get_addrs(self):
"""
:return: all the multiaddr addresses this host is listening too
"""
@abstractmethod
def set_stream_handler(self, protocol_id, stream_handler):
"""
@ -39,3 +45,16 @@ class IHost(ABC):
:param protocol_ids: protocol ids that stream can run on
:return: true if successful
"""
@abstractmethod
def connect(self, peer_info):
"""
connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal
peerstore. If there is not an active connection, connect will issue a
dial, and block until a connection is open, or an error is
returned.
:param peer_info: peer_info of the host we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""

View File

@ -1,5 +1,7 @@
from Crypto.PublicKey import RSA
import multiaddr
from peer.peerstore import PeerStore
from peer.id import id_from_public_key
from network.swarm import Swarm
from host.basic_host import BasicHost
from transport.upgrader import TransportUpgrader
@ -11,10 +13,11 @@ async def new_node(id_opt=None, transport_opt=None, \
if id_opt is None:
new_key = RSA.generate(2048, e=65537)
id_opt = new_key.publickey().exportKey("PEM")
id_opt = id_from_public_key(new_key.publickey())
# private_key = new_key.exportKey("PEM")
transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
transport_opt = [multiaddr.Multiaddr(t) for t in transport_opt]
muxer_opt = muxer_opt or ["mplex/6.7.0"]
sec_opt = sec_opt or ["secio"]
peerstore = peerstore or PeerStore()

View File

@ -9,6 +9,16 @@ class INetwork(ABC):
:return: the peer id
"""
@abstractmethod
def dial_peer(self, peer_id):
"""
dial_peer try to create a connection to peer_id
:param peer_id: peer if we want to dial
:raises SwarmException: raised when no address if found for peer_id
:return: muxed connection
"""
@abstractmethod
def set_stream_handler(self, protocol_id, stream_handler):
"""

View File

@ -1,18 +1,15 @@
from peer.id import ID
from protocol_muxer.multiselect_client import MultiselectClient
from protocol_muxer.multiselect import Multiselect
from .network_interface import INetwork
from .stream.net_stream import NetStream
from .multiaddr import MultiAddr
from .connection.raw_connection import RawConnection
class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop
def __init__(self, my_peer_id, peerstore, upgrader):
self._my_peer_id = my_peer_id
self.self_id = ID(my_peer_id)
def __init__(self, peer_id, peerstore, upgrader):
self.self_id = peer_id
self.peerstore = peerstore
self.upgrader = upgrader
self.connections = dict()
@ -36,6 +33,39 @@ class Swarm(INetwork):
self.multiselect.add_handler(protocol_id, stream_handler)
return True
async def dial_peer(self, peer_id):
"""
dial_peer try to create a connection to peer_id
:param peer_id: peer if we want to dial
:raises SwarmException: raised when no address if found for peer_id
:return: muxed connection
"""
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)
if not addrs:
raise SwarmException("No known addresses to peer")
# TODO: define logic to choose which address to use, or try them all ?
multiaddr = addrs[0]
if peer_id in self.connections:
# If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
muxed_conn = self.connections[peer_id]
else:
# Transport dials peer (gets back a raw conn)
raw_conn = await self.transport.dial(multiaddr)
# Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)
# Store muxed connection in connections
self.connections[peer_id] = muxed_conn
return muxed_conn
async def new_stream(self, peer_id, protocol_ids):
"""
:param peer_id: peer_id of destination
@ -50,19 +80,7 @@ class Swarm(INetwork):
multiaddr = addrs[0]
if peer_id in self.connections:
# If muxed connection already exists for peer_id,
# set muxed connection equal to existing muxed connection
muxed_conn = self.connections[peer_id]
else:
# Transport dials peer (gets back a raw conn)
raw_conn = await self.transport.dial(MultiAddr(multiaddr))
# Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)
# Store muxed connection in connections
self.connections[peer_id] = muxed_conn
muxed_conn = await self.dial_peer(peer_id)
# Use muxed conn to open stream, which returns
# a muxed stream
@ -92,18 +110,15 @@ class Swarm(INetwork):
Call listener listen with the multiaddr
Map multiaddr to listener
"""
for multiaddr_str in args:
if multiaddr_str in self.listeners:
for multiaddr in args:
if str(multiaddr) in self.listeners:
return True
multiaddr = MultiAddr(multiaddr_str)
multiaddr_dict = multiaddr.to_options()
async def conn_handler(reader, writer):
# Upgrade reader/write to a net_stream and pass \
# to appropriate stream handler (using multiaddr)
raw_conn = RawConnection(multiaddr_dict['host'], \
multiaddr_dict['port'], reader, writer)
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
multiaddr.value_for_protocol('tcp'), reader, writer)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
# TODO: Remove protocol id from muxed_conn accept stream or
@ -124,7 +139,7 @@ class Swarm(INetwork):
try:
# Success
listener = self.transport.create_listener(conn_handler)
self.listeners[multiaddr_str] = listener
self.listeners[str(multiaddr)] = listener
await listener.listen(multiaddr)
return True
except IOError:
@ -138,5 +153,6 @@ class Swarm(INetwork):
# TODO: Support more than one transport
self.transport = transport
class SwarmException(Exception):
pass

View File

@ -1,4 +1,14 @@
import base58
import multihash
# MaxInlineKeyLength is the maximum length a key can be for it to be inlined in
# the peer ID.
# * When `len(pubKey.Bytes()) <= MaxInlineKeyLength`, the peer ID is the
# identity multihash hash of the public key.
# * When `len(pubKey.Bytes()) > MaxInlineKeyLength`, the peer ID is the
# sha2-256 multihash of the public key.
MAX_INLINE_KEY_LENGTH = 42
class ID:
@ -15,3 +25,42 @@ class ID:
return "<peer.ID %s*%s>" % (pid[:2], pid[len(pid)-6:])
__repr__ = __str__
def __eq__(self, other):
#pylint: disable=protected-access
return self._id_str == other._id_str
def __hash__(self):
return hash(self._id_str)
def id_b58_encode(peer_id):
"""
return a b58-encoded string
"""
#pylint: disable=protected-access
return base58.b58encode(peer_id._id_str).decode()
def id_b58_decode(peer_id_str):
"""
return a base58-decoded peer ID
"""
return ID(base58.b58decode(peer_id_str))
def id_from_public_key(key):
# export into binary format
key_bin = key.exportKey("DER")
algo = multihash.Func.sha2_256
# TODO: seems identity is not yet supported in pymultihash
# if len(b) <= MAX_INLINE_KEY_LENGTH:
# algo multihash.func.identity
mh_digest = multihash.digest(key_bin, algo)
return ID(mh_digest.encode())
def id_from_private_key(key):
return id_from_public_key(key.publickey())

View File

@ -1,5 +1,42 @@
import multiaddr
import multiaddr.util
from peer.id import id_b58_decode
from peer.peerdata import PeerData
class PeerInfo:
# pylint: disable=too-few-public-methods
def __init__(self, peer_id, peer_data):
self.peer_id = peer_id
self.addrs = peer_data.get_addrs()
def info_from_p2p_addr(addr):
if not addr:
raise InvalidAddrError()
parts = multiaddr.util.split(addr)
if not parts:
raise InvalidAddrError()
ipfspart = parts[-1]
if ipfspart.protocols()[0].code != multiaddr.protocols.P_IPFS:
raise InvalidAddrError()
# make sure the /ipfs value parses as a peer.ID
peer_id_str = ipfspart.value_for_protocol(multiaddr.protocols.P_IPFS)
peer_id = id_b58_decode(peer_id_str)
# we might have received just an / ipfs part, which means there's no addr.
if len(parts) > 1:
addr = multiaddr.util.join(parts[:-1])
peer_data = PeerData()
peer_data.addrs = [addr]
peer_data.protocols = [p.code for p in addr.protocols()]
return PeerInfo(peer_id, peer_data)
class InvalidAddrError(ValueError):
pass

View File

@ -42,7 +42,7 @@ class PeerStore(IPeerStore):
peer.set_protocols(protocols)
def peers(self):
return self.peer_map.keys()
return list(self.peer_map.keys())
def get(self, peer_id, key):
if peer_id in self.peer_map:

View File

@ -4,4 +4,6 @@ pytest
pycryptodome
pytest-asyncio
click
base58
base58
pymultihash
py-multiaddr

View File

@ -1,11 +1,14 @@
import multiaddr
import pytest
from libp2p.libp2p import new_node
from peer.peerinfo import info_from_p2p_addr
@pytest.mark.asyncio
async def test_simple_messages():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8001/ipfs/node_a"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8000/ipfs/node_b"])
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8001"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8000"])
async def stream_handler(stream):
while True:
@ -19,10 +22,9 @@ async def test_simple_messages():
node_b.set_stream_handler("/echo/1.0.0", stream_handler)
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addr("node_b", "/ip4/127.0.0.1/tcp/8000", 10)
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
print("node_a about to open stream")
stream = await node_a.new_stream("node_b", ["/echo/1.0.0"])
stream = await node_a.new_stream(node_b.get_id(), ["/echo/1.0.0"])
messages = ["hello" + str(x) for x in range(10)]
for message in messages:
@ -36,10 +38,11 @@ async def test_simple_messages():
# Success, terminate pending tasks.
return
@pytest.mark.asyncio
async def test_double_response():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8002/ipfs/node_a"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8003/ipfs/node_b"])
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8002"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8003"])
async def stream_handler(stream):
while True:
@ -57,9 +60,9 @@ async def test_double_response():
node_b.set_stream_handler("/echo/1.0.0", stream_handler)
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addr("node_b", "/ip4/127.0.0.1/tcp/8003", 10)
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
print("node_a about to open stream")
stream = await node_a.new_stream("node_b", ["/echo/1.0.0"])
stream = await node_a.new_stream(node_b.get_id(), ["/echo/1.0.0"])
messages = ["hello" + str(x) for x in range(10)]
for message in messages:
await stream.write(message.encode())
@ -76,3 +79,26 @@ async def test_double_response():
# Success, terminate pending tasks.
return
@pytest.mark.asyncio
async def test_host_connect():
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8001/"])
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/8000/"])
assert not node_a.get_peerstore().peers()
addr = node_b.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node_a.connect(info)
assert len(node_a.get_peerstore().peers()) == 1
await node_a.connect(info)
# make sure we don't do double connection
assert len(node_a.get_peerstore().peers()) == 1
assert node_b.get_id() in node_a.get_peerstore().peers()
ma_node_b = multiaddr.Multiaddr('/ipfs/%s' % node_b.get_id().pretty())
for addr in node_a.get_peerstore().addrs(node_b.get_id()):
assert addr.encapsulate(ma_node_b) in node_b.get_addrs()

View File

@ -0,0 +1,9 @@
from Crypto.PublicKey import RSA
from peer.id import id_from_private_key, id_from_public_key
def test_id_from_private_key():
key = RSA.generate(2048, e=65537)
id_from_pub = id_from_public_key(key.publickey())
id_from_priv = id_from_private_key(key)
assert id_from_pub == id_from_priv

View File

@ -0,0 +1,11 @@
import multiaddr
from peer.peerinfo import info_from_p2p_addr
def test_info_from_p2p_addr():
# pylint: disable=line-too-long
m_addr = multiaddr.Multiaddr('/ip4/127.0.0.1/tcp/8000/ipfs/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ')
info = info_from_p2p_addr(m_addr)
assert info.peer_id.pretty() == '3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ'
assert len(info.addrs) == 1
assert str(info.addrs[0]) == '/ip4/127.0.0.1/tcp/8000'

View File

@ -14,8 +14,8 @@ from protocol_muxer.multiselect_client import MultiselectClientError
async def perform_simple_test(expected_selected_protocol, \
protocols_for_client, protocols_with_handlers, \
node_a_port, node_b_port):
transport_opt_a = ["/ip4/127.0.0.1/tcp/" + str(node_a_port) + "/ipfs/node_a"]
transport_opt_b = ["/ip4/127.0.0.1/tcp/" + str(node_b_port) + "/ipfs/node_b"]
transport_opt_a = ["/ip4/127.0.0.1/tcp/" + str(node_a_port)]
transport_opt_b = ["/ip4/127.0.0.1/tcp/" + str(node_b_port)]
node_a = await new_node(\
transport_opt=transport_opt_a)
node_b = await new_node(\
@ -34,9 +34,9 @@ async def perform_simple_test(expected_selected_protocol, \
node_b.set_stream_handler(protocol, stream_handler)
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addr("node_b", "/ip4/127.0.0.1/tcp/" + str(node_b_port), 10)
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
stream = await node_a.new_stream("node_b", protocols_for_client)
stream = await node_a.new_stream(node_b.get_id(), protocols_for_client)
messages = ["hello" + str(x) for x in range(10)]
for message in messages:
await stream.write(message.encode())

View File

@ -23,15 +23,12 @@ class TCP(ITransport):
"""
_multiaddr = multiaddr
# TODO check for exceptions
if "ipfs" in _multiaddr.get_protocols():
# ipfs_id = multiaddr.get_ipfs_id()
_multiaddr.remove_protocol("ipfs")
_multiaddr = _multiaddr.decapsulate('/ipfs')
self.multiaddrs.append(_multiaddr)
multiaddr_dict = _multiaddr.to_options()
coroutine = asyncio.start_server(self.handler, multiaddr_dict['host'],\
multiaddr_dict['port'])
coroutine = asyncio.start_server(self.handler,
_multiaddr.value_for_protocol('ip4'),
_multiaddr.value_for_protocol('tcp'))
self.server = await coroutine
return True
@ -67,9 +64,8 @@ class TCP(ITransport):
:param options: optional object
:return: True if successful
"""
_multiaddr_dict = multiaddr.to_options()
host = _multiaddr_dict['host']
port = _multiaddr_dict['port']
host = multiaddr.value_for_protocol('ip4')
port = int(multiaddr.value_for_protocol('tcp'))
reader, writer = await asyncio.open_connection(host, port)
@ -79,7 +75,7 @@ class TCP(ITransport):
"""
create listener on transport
:param options: optional object with properties the listener must have
:param handler_function: a function called when a new conntion is received
:param handler_function: a function called when a new connection is received
that takes a connection as argument which implements interface-connection
:return: a listener object that implements listener_interface.py
"""