Merge pull request #252 from mhchia/fix/add-negotiation-when-upgrading-to-mplex

Negotiate multiplexer protocol when upgrading to `MuxedConn`
This commit is contained in:
Kevin Mai-Husan Chia 2019-08-21 12:30:22 +08:00 committed by GitHub
commit 6b05d9ab8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 136 additions and 36 deletions

View File

@ -13,8 +13,10 @@ from libp2p.peer.peerstore import PeerStore
from libp2p.peer.peerstore_interface import IPeerStore from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.interfaces import IPeerRouting
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from libp2p.security.insecure.transport import InsecureTransport from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.secure_transport_interface import ISecureTransport
from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex
from libp2p.stream_muxer.muxer_multistream import MuxerClassType
from libp2p.transport.tcp.tcp import TCP from libp2p.transport.tcp.tcp import TCP
from libp2p.transport.upgrader import TransportUpgrader from libp2p.transport.upgrader import TransportUpgrader
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
@ -72,7 +74,7 @@ def initialize_default_swarm(
key_pair: KeyPair, key_pair: KeyPair,
id_opt: ID = None, id_opt: ID = None,
transport_opt: Sequence[str] = None, transport_opt: Sequence[str] = None,
muxer_opt: Sequence[str] = None, muxer_opt: Mapping[TProtocol, MuxerClassType] = None,
sec_opt: Mapping[TProtocol, ISecureTransport] = None, sec_opt: Mapping[TProtocol, ISecureTransport] = None,
peerstore_opt: IPeerStore = None, peerstore_opt: IPeerStore = None,
disc_opt: IPeerRouting = None, disc_opt: IPeerRouting = None,
@ -91,23 +93,20 @@ def initialize_default_swarm(
if not id_opt: if not id_opt:
id_opt = generate_peer_id_from_rsa_identity(key_pair) id_opt = generate_peer_id_from_rsa_identity(key_pair)
# TODO parse transport_opt to determine transport # TODO: Parse `transport_opt` to determine transport
transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
transport = TCP() transport = TCP()
# TODO TransportUpgrader is not doing anything really muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex}
# TODO parse muxer and sec to pass into TransportUpgrader
muxer = muxer_opt or ["mplex/6.7.0"]
security_transports_by_protocol = sec_opt or { security_transports_by_protocol = sec_opt or {
TProtocol("insecure/1.0.0"): InsecureTransport(key_pair) PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair)
} }
upgrader = TransportUpgrader(security_transports_by_protocol, muxer) upgrader = TransportUpgrader(
security_transports_by_protocol, muxer_transports_by_protocol
)
peerstore = peerstore_opt or PeerStore() peerstore = peerstore_opt or PeerStore()
# TODO: Initialize discovery if not presented # TODO: Initialize discovery if not presented
swarm_opt = Swarm(id_opt, peerstore, upgrader, transport, disc_opt) return Swarm(id_opt, peerstore, upgrader, transport, disc_opt)
return swarm_opt
async def new_node( async def new_node(
@ -115,7 +114,7 @@ async def new_node(
swarm_opt: INetwork = None, swarm_opt: INetwork = None,
id_opt: ID = None, id_opt: ID = None,
transport_opt: Sequence[str] = None, transport_opt: Sequence[str] = None,
muxer_opt: Sequence[str] = None, muxer_opt: Mapping[TProtocol, MuxerClassType] = None,
sec_opt: Mapping[TProtocol, ISecureTransport] = None, sec_opt: Mapping[TProtocol, ISecureTransport] = None,
peerstore_opt: IPeerStore = None, peerstore_opt: IPeerStore = None,
disc_opt: IPeerRouting = None, disc_opt: IPeerRouting = None,

View File

@ -111,7 +111,7 @@ class Swarm(INetwork):
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
# the conn and then mux the conn # the conn and then mux the conn
secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True)
muxed_conn = self.upgrader.upgrade_connection( muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id secured_conn, self.generic_protocol_handler, peer_id
) )
@ -204,7 +204,7 @@ class Swarm(INetwork):
secured_conn = await self.upgrader.upgrade_security( secured_conn = await self.upgrader.upgrade_security(
raw_conn, peer_id, False raw_conn, peer_id, False
) )
muxed_conn = self.upgrader.upgrade_connection( muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id secured_conn, self.generic_protocol_handler, peer_id
) )

View File

@ -456,8 +456,6 @@ class GossipSub(IPubsubRouter):
""" """
Checks the seen set and requests unknown messages with an IWANT message. Checks the seen set and requests unknown messages with an IWANT message.
""" """
# from_id_bytes = ihave_msg.from_id
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache
seen_seqnos_and_peers = [ seen_seqnos_and_peers = [
seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys() seqno_and_from for seqno_and_from in self.pubsub.seen_messages.keys()

View File

@ -3,6 +3,9 @@ from libp2p.peer.id import ID
from libp2p.security.base_session import BaseSession from libp2p.security.base_session import BaseSession
from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.typing import TProtocol
PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/1.0.0")
class InsecureSession(BaseSession): class InsecureSession(BaseSession):

View File

@ -14,8 +14,6 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa
class AbstractSecureConn(ABC): class AbstractSecureConn(ABC):
conn: IRawConnection
@abstractmethod @abstractmethod
def get_local_peer(self) -> ID: def get_local_peer(self) -> ID:
pass pass

View File

@ -1,5 +1,6 @@
from abc import ABC from abc import ABC
from typing import Dict, Mapping from collections import OrderedDict
from typing import Mapping
from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.peer.id import ID from libp2p.peer.id import ID
@ -20,14 +21,20 @@ Relevant go repo: https://github.com/libp2p/go-conn-security/blob/master/interfa
class SecurityMultistream(ABC): class SecurityMultistream(ABC):
transports: Dict[TProtocol, ISecureTransport] """
SSMuxer is a multistream stream security transport multiplexer.
Go implementation: github.com/libp2p/go-conn-security-multistream/ssms.go
"""
# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.
transports: "OrderedDict[TProtocol, ISecureTransport]"
multiselect: Multiselect multiselect: Multiselect
multiselect_client: MultiselectClient multiselect_client: MultiselectClient
def __init__( def __init__(
self, secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport] self, secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport]
) -> None: ) -> None:
self.transports = {} self.transports = OrderedDict()
self.multiselect = Multiselect() self.multiselect = Multiselect()
self.multiselect_client = MultiselectClient() self.multiselect_client = MultiselectClient()
@ -35,8 +42,17 @@ class SecurityMultistream(ABC):
self.add_transport(protocol, transport) self.add_transport(protocol, transport)
def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> None: def add_transport(self, protocol: TProtocol, transport: ISecureTransport) -> None:
"""
Add a protocol and its corresponding transport to multistream-select(multiselect).
The order that a protocol is added is exactly the precedence it is negotiated in
multiselect.
:param protocol: the protocol name, which is negotiated in multiselect.
:param transport: the corresponding transportation to the ``protocol``.
"""
# If protocol is already added before, remove it and add it again.
if protocol in self.transports:
del self.transports[protocol]
self.transports[protocol] = transport self.transports[protocol] = transport
# Note: None is added as the handler for the given protocol since # Note: None is added as the handler for the given protocol since
# we only care about selecting the protocol, not any handler function # we only care about selecting the protocol, not any handler function
self.multiselect.add_handler(protocol, None) self.multiselect.add_handler(protocol, None)

View File

@ -8,12 +8,15 @@ from libp2p.network.typing import GenericProtocolHandlerFn
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
from libp2p.typing import TProtocol
from .constants import HeaderTags from .constants import HeaderTags
from .exceptions import StreamNotFound from .exceptions import StreamNotFound
from .mplex_stream import MplexStream from .mplex_stream import MplexStream
from .utils import decode_uvarint_from_stream, encode_uvarint from .utils import decode_uvarint_from_stream, encode_uvarint
MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0")
class Mplex(IMuxedConn): class Mplex(IMuxedConn):
""" """

View File

@ -0,0 +1,79 @@
from collections import OrderedDict
from typing import Mapping, Type
from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.network.typing import GenericProtocolHandlerFn
from libp2p.peer.id import ID
from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator
from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.typing import TProtocol
from .abc import IMuxedConn
MuxerClassType = Type[IMuxedConn]
# FIXME: add negotiate timeout to `MuxerMultistream`
DEFAULT_NEGOTIATE_TIMEOUT = 60
class MuxerMultistream:
"""
MuxerMultistream is a multistream stream muxed transport multiplexer.
go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go
"""
# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.
transports: "OrderedDict[TProtocol, MuxerClassType]"
multiselect: Multiselect
multiselect_client: MultiselectClient
def __init__(
self, muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType]
) -> None:
self.transports = OrderedDict()
self.multiselect = Multiselect()
self.multiselect_client = MultiselectClient()
for protocol, transport in muxer_transports_by_protocol.items():
self.add_transport(protocol, transport)
def add_transport(self, protocol: TProtocol, transport: MuxerClassType) -> None:
"""
Add a protocol and its corresponding transport to multistream-select(multiselect).
The order that a protocol is added is exactly the precedence it is negotiated in
multiselect.
:param protocol: the protocol name, which is negotiated in multiselect.
:param transport: the corresponding transportation to the ``protocol``.
"""
# If protocol is already added before, remove it and add it again.
if protocol in self.transports:
del self.transports[protocol]
self.transports[protocol] = transport
self.multiselect.add_handler(protocol, None)
async def select_transport(self, conn: IRawConnection) -> MuxerClassType:
"""
Select a transport that both us and the node on the
other end of conn support and agree on
:param conn: conn to choose a transport over
:return: selected muxer transport
"""
protocol: TProtocol
communicator = RawConnectionCommunicator(conn)
if conn.initiator:
protocol = await self.multiselect_client.select_one_of(
tuple(self.transports.keys()), communicator
)
else:
protocol, _ = await self.multiselect.negotiate(communicator)
return self.transports[protocol]
async def new_conn(
self,
conn: ISecureConn,
generic_protocol_handler: GenericProtocolHandlerFn,
peer_id: ID,
) -> IMuxedConn:
transport_class = await self.select_transport(conn)
return transport_class(conn, generic_protocol_handler, peer_id)

View File

@ -67,13 +67,14 @@ class TCP(ITransport):
dial a transport to peer listening on multiaddr dial a transport to peer listening on multiaddr
:param maddr: multiaddr of peer :param maddr: multiaddr of peer
:param self_id: peer_id of the dialer (to send to receiver) :param self_id: peer_id of the dialer (to send to receiver)
:return: True if successful :return: `RawConnection` if successful
""" """
host = maddr.value_for_protocol("ip4") host = maddr.value_for_protocol("ip4")
port = maddr.value_for_protocol("tcp") port = maddr.value_for_protocol("tcp")
reader, writer = await asyncio.open_connection(host, int(port)) reader, writer = await asyncio.open_connection(host, int(port))
# TODO: Change this `sending peer id` process to `/plaintext/2.0.0`
# First: send our peer ID so receiver knows it # First: send our peer ID so receiver knows it
writer.write(self_id.to_base58().encode()) writer.write(self_id.to_base58().encode())
await writer.drain() await writer.drain()

View File

@ -1,4 +1,4 @@
from typing import Mapping, Sequence from typing import Mapping
from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.network.typing import GenericProtocolHandlerFn
@ -6,7 +6,8 @@ from libp2p.peer.id import ID
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.secure_transport_interface import ISecureTransport
from libp2p.security.security_multistream import SecurityMultistream from libp2p.security.security_multistream import SecurityMultistream
from libp2p.stream_muxer.mplex.mplex import Mplex from libp2p.stream_muxer.abc import IMuxedConn
from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from .listener_interface import IListener from .listener_interface import IListener
@ -15,41 +16,43 @@ from .transport_interface import ITransport
class TransportUpgrader: class TransportUpgrader:
security_multistream: SecurityMultistream security_multistream: SecurityMultistream
muxer: Sequence[str] muxer_multistream: MuxerMultistream
def __init__( def __init__(
self, self,
secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport], secure_transports_by_protocol: Mapping[TProtocol, ISecureTransport],
muxerOpt: Sequence[str], muxer_transports_by_protocol: Mapping[TProtocol, MuxerClassType],
): ):
self.security_multistream = SecurityMultistream(secure_transports_by_protocol) self.security_multistream = SecurityMultistream(secure_transports_by_protocol)
self.muxer = muxerOpt self.muxer_multistream = MuxerMultistream(muxer_transports_by_protocol)
def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None: def upgrade_listener(self, transport: ITransport, listeners: IListener) -> None:
""" """
Upgrade multiaddr listeners to libp2p-transport listeners Upgrade multiaddr listeners to libp2p-transport listeners
""" """
# TODO: Figure out what to do with this function.
pass pass
async def upgrade_security( async def upgrade_security(
self, raw_conn: IRawConnection, peer_id: ID, initiator: bool self, raw_conn: IRawConnection, peer_id: ID, initiator: bool
) -> ISecureConn: ) -> ISecureConn:
""" """
Upgrade conn to be a secured connection Upgrade conn to a secured connection
""" """
if initiator: if initiator:
return await self.security_multistream.secure_outbound(raw_conn, peer_id) return await self.security_multistream.secure_outbound(raw_conn, peer_id)
return await self.security_multistream.secure_inbound(raw_conn) return await self.security_multistream.secure_inbound(raw_conn)
@staticmethod async def upgrade_connection(
def upgrade_connection( self,
conn: ISecureConn, conn: ISecureConn,
generic_protocol_handler: GenericProtocolHandlerFn, generic_protocol_handler: GenericProtocolHandlerFn,
peer_id: ID, peer_id: ID,
) -> Mplex: ) -> IMuxedConn:
""" """
Upgrade raw connection to muxed connection Upgrade secured connection to a muxed connection
""" """
# TODO do exchange to determine multiplexer return await self.muxer_multistream.new_conn(
return Mplex(conn, generic_protocol_handler, peer_id) conn, generic_protocol_handler, peer_id
)