Handle MultiselectError
in stream_muxer.accept_stream
This commit is contained in:
parent
eaa74c4e26
commit
76af835af8
|
@ -6,6 +6,7 @@ from libp2p.exceptions import ParseError
|
||||||
from libp2p.io.exceptions import IncompleteReadError
|
from libp2p.io.exceptions import IncompleteReadError
|
||||||
from libp2p.network.typing import GenericProtocolHandlerFn
|
from libp2p.network.typing import GenericProtocolHandlerFn
|
||||||
from libp2p.peer.id import ID
|
from libp2p.peer.id import ID
|
||||||
|
from libp2p.protocol_muxer.exceptions import MultiselectError
|
||||||
from libp2p.security.secure_conn_interface import ISecureConn
|
from libp2p.security.secure_conn_interface import ISecureConn
|
||||||
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
|
||||||
from libp2p.typing import TProtocol
|
from libp2p.typing import TProtocol
|
||||||
|
@ -102,12 +103,6 @@ class Mplex(IMuxedConn):
|
||||||
self.next_channel_id += 1
|
self.next_channel_id += 1
|
||||||
return next_id
|
return next_id
|
||||||
|
|
||||||
async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream:
|
|
||||||
async with self.streams_lock:
|
|
||||||
stream = MplexStream(name, stream_id, self)
|
|
||||||
self.streams[stream_id] = stream
|
|
||||||
return stream
|
|
||||||
|
|
||||||
async def open_stream(self) -> IMuxedStream:
|
async def open_stream(self) -> IMuxedStream:
|
||||||
"""
|
"""
|
||||||
creates a new muxed_stream
|
creates a new muxed_stream
|
||||||
|
@ -117,17 +112,28 @@ class Mplex(IMuxedConn):
|
||||||
stream_id = StreamID(channel_id=channel_id, is_initiator=True)
|
stream_id = StreamID(channel_id=channel_id, is_initiator=True)
|
||||||
# Default stream name is the `channel_id`
|
# Default stream name is the `channel_id`
|
||||||
name = str(channel_id)
|
name = str(channel_id)
|
||||||
stream = await self._initialize_stream(stream_id, name)
|
async with self.streams_lock:
|
||||||
|
stream = MplexStream(name, stream_id, self)
|
||||||
await self.send_message(HeaderTags.NewStream, name.encode(), stream_id)
|
await self.send_message(HeaderTags.NewStream, name.encode(), stream_id)
|
||||||
|
# TODO: is there a way to know if the peer accepted the stream?
|
||||||
|
# then we can safely register the stream
|
||||||
|
self.streams[stream_id] = stream
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
async def accept_stream(self, stream_id: StreamID, name: str) -> None:
|
async def accept_stream(self, stream_id: StreamID, name: str) -> None:
|
||||||
"""
|
"""
|
||||||
accepts a muxed stream opened by the other end
|
accepts a muxed stream opened by the other end
|
||||||
"""
|
"""
|
||||||
stream = await self._initialize_stream(stream_id, name)
|
async with self.streams_lock:
|
||||||
|
stream = MplexStream(name, stream_id, self)
|
||||||
# Perform protocol negotiation for the stream.
|
# Perform protocol negotiation for the stream.
|
||||||
self._tasks.append(asyncio.ensure_future(self.generic_protocol_handler(stream)))
|
try:
|
||||||
|
await self.generic_protocol_handler(stream)
|
||||||
|
except MultiselectError:
|
||||||
|
# TODO: what to do when stream protocol negotiation fail?
|
||||||
|
return
|
||||||
|
|
||||||
|
self.streams[stream_id] = stream
|
||||||
|
|
||||||
async def send_message(
|
async def send_message(
|
||||||
self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID
|
self, flag: HeaderTags, data: Optional[bytes], stream_id: StreamID
|
||||||
|
@ -180,7 +186,11 @@ class Mplex(IMuxedConn):
|
||||||
# `NewStream` for the same id is received twice...
|
# `NewStream` for the same id is received twice...
|
||||||
# TODO: Shutdown
|
# TODO: Shutdown
|
||||||
pass
|
pass
|
||||||
await self.accept_stream(stream_id, message.decode())
|
self._tasks.append(
|
||||||
|
asyncio.ensure_future(
|
||||||
|
self.accept_stream(stream_id, message.decode())
|
||||||
|
)
|
||||||
|
)
|
||||||
elif flag in (
|
elif flag in (
|
||||||
HeaderTags.MessageInitiator.value,
|
HeaderTags.MessageInitiator.value,
|
||||||
HeaderTags.MessageReceiver.value,
|
HeaderTags.MessageReceiver.value,
|
||||||
|
|
Loading…
Reference in New Issue
Block a user