Handle errors from
- `read_delim` - `read_varint_prefixed_bytes` - `decode_uvarint_from_stream`
This commit is contained in:
parent
905a473ac3
commit
879f193aa1
@ -1,6 +1,9 @@
|
||||
from libp2p.exceptions import ParseError
|
||||
from libp2p.io.abc import ReadWriteCloser
|
||||
from libp2p.io.exceptions import IOException
|
||||
from libp2p.utils import encode_delim, read_delim
|
||||
|
||||
from .exceptions import MultiselectCommunicatorError
|
||||
from .multiselect_communicator_interface import IMultiselectCommunicator
|
||||
|
||||
|
||||
@ -15,5 +18,11 @@ class MultiselectCommunicator(IMultiselectCommunicator):
|
||||
await self.read_writer.write(msg_bytes)
|
||||
|
||||
async def read(self) -> str:
|
||||
data = await read_delim(self.read_writer)
|
||||
try:
|
||||
data = await read_delim(self.read_writer)
|
||||
# `IOException` includes `IncompleteReadError` and `StreamError`
|
||||
except (ParseError, IOException, ValueError):
|
||||
raise MultiselectCommunicatorError(
|
||||
"fail to read from multiselect communicator"
|
||||
)
|
||||
return data.decode()
|
||||
|
@ -16,12 +16,14 @@ from typing import (
|
||||
import base58
|
||||
from lru import LRU
|
||||
|
||||
from libp2p.exceptions import ValidationError
|
||||
from libp2p.exceptions import ParseError, ValidationError
|
||||
from libp2p.host.host_interface import IHost
|
||||
from libp2p.io.exceptions import IncompleteReadError
|
||||
from libp2p.network.stream.net_stream_interface import INetStream
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.typing import TProtocol
|
||||
from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes
|
||||
from libp2p.network.stream.exceptions import StreamEOF, StreamReset
|
||||
|
||||
from .pb import rpc_pb2
|
||||
from .pubsub_notifee import PubsubNotifee
|
||||
@ -154,7 +156,13 @@ class Pubsub:
|
||||
peer_id = stream.mplex_conn.peer_id
|
||||
|
||||
while True:
|
||||
incoming: bytes = await read_varint_prefixed_bytes(stream)
|
||||
try:
|
||||
incoming: bytes = await read_varint_prefixed_bytes(stream)
|
||||
except (ParseError, IncompleteReadError) as error:
|
||||
logger.debug(
|
||||
"read corrupted data from peer %s, error=%s", peer_id, error
|
||||
)
|
||||
continue
|
||||
rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
|
||||
rpc_incoming.ParseFromString(incoming)
|
||||
if rpc_incoming.publish:
|
||||
@ -228,7 +236,13 @@ class Pubsub:
|
||||
on one of the supported pubsub protocols.
|
||||
:param stream: newly created stream
|
||||
"""
|
||||
await self.continuously_read_stream(stream)
|
||||
try:
|
||||
await self.continuously_read_stream(stream)
|
||||
except (StreamEOF, StreamReset) as error:
|
||||
logger.debug("fail to read from stream, error=%s", error)
|
||||
stream.reset()
|
||||
# TODO: what to do when the stream is terminated?
|
||||
# disconnect the peer?
|
||||
|
||||
async def _handle_new_peer(self, peer_id: ID) -> None:
|
||||
stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
|
||||
|
@ -2,6 +2,8 @@ import asyncio
|
||||
from typing import Any # noqa: F401
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from libp2p.exceptions import ParseError
|
||||
from libp2p.io.exceptions import IncompleteReadError
|
||||
from libp2p.network.typing import GenericProtocolHandlerFn
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.security.secure_conn_interface import ISecureConn
|
||||
@ -248,13 +250,15 @@ class Mplex(IMuxedConn):
|
||||
# FIXME: No timeout is used in Go implementation.
|
||||
# Timeout is set to a relatively small value to alleviate wait time to exit
|
||||
# loop in handle_incoming
|
||||
header = await decode_uvarint_from_stream(self.secured_conn)
|
||||
# TODO: Handle the case of EOF and other exceptions?
|
||||
try:
|
||||
header = await decode_uvarint_from_stream(self.secured_conn)
|
||||
except ParseError:
|
||||
return None, None, None
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
read_varint_prefixed_bytes(self.secured_conn), timeout=5
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
except (ParseError, IncompleteReadError, asyncio.TimeoutError):
|
||||
# TODO: Investigate what we should do if time is out.
|
||||
return None, None, None
|
||||
|
||||
|
@ -65,10 +65,6 @@ def encode_varint_prefixed(msg_bytes: bytes) -> bytes:
|
||||
async def read_varint_prefixed_bytes(reader: Reader) -> bytes:
|
||||
len_msg = await decode_uvarint_from_stream(reader)
|
||||
data = await read_exactly(reader, len_msg)
|
||||
if len(data) != len_msg:
|
||||
raise ValueError(
|
||||
f"failed to read enough bytes: len_msg={len_msg}, data={data!r}"
|
||||
)
|
||||
return data
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user