Update error handling of pubsub stream handler

This commit is contained in:
NIC619 2019-11-04 21:17:54 +08:00
parent 97b3aca535
commit d36e323703
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17

View File

@ -163,13 +163,7 @@ class Pubsub:
peer_id = stream.muxed_conn.peer_id peer_id = stream.muxed_conn.peer_id
while True: while True:
try:
incoming: bytes = await read_varint_prefixed_bytes(stream) incoming: bytes = await read_varint_prefixed_bytes(stream)
except (ParseError, IncompleteReadError) as error:
logger.debug(
"read corrupted data from peer %s, error=%s", peer_id, error
)
continue
rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming) rpc_incoming.ParseFromString(incoming)
if rpc_incoming.publish: if rpc_incoming.publish:
@ -252,13 +246,18 @@ class Pubsub:
:param stream: newly created stream :param stream: newly created stream
""" """
peer_id = stream.muxed_conn.peer_id
# Error handling pattern reference:
# https://github.com/libp2p/go-libp2p-pubsub/blob/534fe2f382d8dd75dab89ddb0760542546c9f24e/comm.go#L38-L46 # noqa: E501
try: try:
await self.continuously_read_stream(stream) await self.continuously_read_stream(stream)
except (StreamEOF, StreamReset) as error: except StreamEOF as error:
logger.debug("fail to read from stream, error=%s", error) stream.close()
await stream.reset() logger.debug("fail to read from peer %s, error=%s", peer_id, error)
# TODO: what to do when the stream is terminated? except (ParseError, IncompleteReadError, StreamReset) as error:
# disconnect the peer? stream.reset()
logger.debug("read corrupted data from peer %s, error=%s", peer_id, error)
async def _handle_new_peer(self, peer_id: ID) -> None: async def _handle_new_peer(self, peer_id: ID) -> None:
try: try: