71 lines
3.0 KiB
Python
71 lines
3.0 KiB
Python
from collections import OrderedDict
|
|
|
|
from libp2p.network.connection.raw_connection_interface import IRawConnection
|
|
from libp2p.peer.id import ID
|
|
from libp2p.protocol_muxer.multiselect import Multiselect
|
|
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
|
|
from libp2p.protocol_muxer.multiselect_communicator import MultiselectCommunicator
|
|
from libp2p.security.secure_conn_interface import ISecureConn
|
|
from libp2p.transport.typing import TMuxerClass, TMuxerOptions
|
|
from libp2p.typing import TProtocol
|
|
|
|
from .abc import IMuxedConn
|
|
|
|
# FIXME: add negotiate timeout to `MuxerMultistream`
|
|
DEFAULT_NEGOTIATE_TIMEOUT = 60
|
|
|
|
|
|
class MuxerMultistream:
|
|
"""
|
|
MuxerMultistream is a multistream stream muxed transport multiplexer.
|
|
go implementation: github.com/libp2p/go-stream-muxer-multistream/multistream.go
|
|
"""
|
|
|
|
# NOTE: Can be changed to `typing.OrderedDict` since Python 3.7.2.
|
|
transports: "OrderedDict[TProtocol, TMuxerClass]"
|
|
multiselect: Multiselect
|
|
multiselect_client: MultiselectClient
|
|
|
|
def __init__(self, muxer_transports_by_protocol: TMuxerOptions = None) -> None:
|
|
self.transports = OrderedDict()
|
|
self.multiselect = Multiselect()
|
|
self.multiselect_client = MultiselectClient()
|
|
if muxer_transports_by_protocol is not None:
|
|
for protocol, transport in muxer_transports_by_protocol.items():
|
|
self.add_transport(protocol, transport)
|
|
|
|
def add_transport(self, protocol: TProtocol, transport: TMuxerClass) -> None:
|
|
"""
|
|
Add a protocol and its corresponding transport to multistream-select(multiselect).
|
|
The order that a protocol is added is exactly the precedence it is negotiated in
|
|
multiselect.
|
|
:param protocol: the protocol name, which is negotiated in multiselect.
|
|
:param transport: the corresponding transportation to the ``protocol``.
|
|
"""
|
|
# If protocol is already added before, remove it and add it again.
|
|
if protocol in self.transports:
|
|
del self.transports[protocol]
|
|
self.transports[protocol] = transport
|
|
self.multiselect.add_handler(protocol, None)
|
|
|
|
async def select_transport(self, conn: IRawConnection) -> TMuxerClass:
|
|
"""
|
|
Select a transport that both us and the node on the
|
|
other end of conn support and agree on
|
|
:param conn: conn to choose a transport over
|
|
:return: selected muxer transport
|
|
"""
|
|
protocol: TProtocol
|
|
communicator = MultiselectCommunicator(conn)
|
|
if conn.initiator:
|
|
protocol = await self.multiselect_client.select_one_of(
|
|
tuple(self.transports.keys()), communicator
|
|
)
|
|
else:
|
|
protocol, _ = await self.multiselect.negotiate(communicator)
|
|
return self.transports[protocol]
|
|
|
|
async def new_conn(self, conn: ISecureConn, peer_id: ID) -> IMuxedConn:
|
|
transport_class = await self.select_transport(conn)
|
|
return transport_class(conn, peer_id)
|