Merge pull request #351 from NIC619/handle_stream_io_error

Handle stream io error
This commit is contained in:
NIC Lin 2019-11-17 15:09:45 +08:00 committed by GitHub
commit 64c49f809f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 53 additions and 13 deletions

View File

@ -1,7 +1,7 @@
import asyncio import asyncio
import logging import logging
from libp2p.network.stream.exceptions import StreamEOF, StreamReset from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset
from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.id import ID as PeerID from libp2p.peer.id import ID as PeerID
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
@ -35,7 +35,11 @@ async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool:
logger.debug("Received ping from %s with data: 0x%s", peer_id, payload.hex()) logger.debug("Received ping from %s with data: 0x%s", peer_id, payload.hex())
try:
await stream.write(payload) await stream.write(payload)
except StreamClosed:
logger.debug("Fail to respond to ping from %s: stream closed", peer_id)
raise
return True return True

View File

@ -3,6 +3,7 @@ import logging
from multiaddr import Multiaddr from multiaddr import Multiaddr
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.typing import StreamHandlerFn, TProtocol from libp2p.typing import StreamHandlerFn, TProtocol
@ -43,7 +44,11 @@ def identify_handler_for(host: IHost) -> StreamHandlerFn:
protobuf = _mk_identify_protobuf(host) protobuf = _mk_identify_protobuf(host)
response = protobuf.SerializeToString() response = protobuf.SerializeToString()
try:
await stream.write(response) await stream.write(response)
except StreamClosed:
logger.debug("Fail to respond to %s request: stream closed", ID)
else:
await stream.close() await stream.close()
logger.debug("successfully handled request for %s from %s", ID, peer_id) logger.debug("successfully handled request for %s from %s", ID, peer_id)

View File

@ -1,6 +1,7 @@
import logging import logging
from typing import Iterable, List, Sequence from typing import Iterable, List, Sequence
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed from libp2p.utils import encode_varint_prefixed
@ -89,7 +90,10 @@ class FloodSub(IPubsubRouter):
stream = self.pubsub.peers[peer_id] stream = self.pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages. # FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
async def join(self, topic: str) -> None: async def join(self, topic: str) -> None:
""" """

View File

@ -4,6 +4,7 @@ import logging
import random import random
from typing import Any, Dict, Iterable, List, Sequence, Set from typing import Any, Dict, Iterable, List, Sequence, Set
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.pubsub import floodsub from libp2p.pubsub import floodsub
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
@ -188,7 +189,11 @@ class GossipSub(IPubsubRouter):
# FIXME: We should add a `WriteMsg` similar to write delimited messages. # FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
self.pubsub._handle_dead_peer(peer_id)
def _get_peers_to_send( def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
@ -512,7 +517,14 @@ class GossipSub(IPubsubRouter):
peer_stream = self.pubsub.peers[sender_peer_id] peer_stream = self.pubsub.peers[sender_peer_id]
# 4) And write the packet to the stream # 4) And write the packet to the stream
try:
await peer_stream.write(encode_varint_prefixed(rpc_msg)) await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed:
logger.debug(
"Fail to responed to iwant request from %s: stream closed",
sender_peer_id,
)
self.pubsub._handle_dead_peer(sender_peer_id)
async def handle_graft( async def handle_graft(
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
@ -596,4 +608,8 @@ class GossipSub(IPubsubRouter):
peer_stream = self.pubsub.peers[to_peer] peer_stream = self.pubsub.peers[to_peer]
# Write rpc to stream # Write rpc to stream
try:
await peer_stream.write(encode_varint_prefixed(rpc_msg)) await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed:
logger.debug("Fail to emit control message to %s: stream closed", to_peer)
self.pubsub._handle_dead_peer(to_peer)

View File

@ -20,7 +20,7 @@ from libp2p.exceptions import ParseError, ValidationError
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.io.exceptions import IncompleteReadError from libp2p.io.exceptions import IncompleteReadError
from libp2p.network.exceptions import SwarmException from libp2p.network.exceptions import SwarmException
from libp2p.network.stream.exceptions import StreamEOF, StreamReset from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset
from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
@ -279,13 +279,19 @@ class Pubsub:
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
try:
await stream.write(encode_varint_prefixed(hello.SerializeToString())) await stream.write(encode_varint_prefixed(hello.SerializeToString()))
except StreamClosed:
logger.debug("Fail to add new peer %s: stream closed", peer_id)
del self.peers[peer_id]
return
# TODO: Check EOF of this stream. # TODO: Check EOF of this stream.
# TODO: Check if the peer in black list. # TODO: Check if the peer in black list.
try: try:
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
except Exception as error: except Exception as error:
logger.debug("fail to add new peer %s, error %s", peer_id, error) logger.debug("fail to add new peer %s, error %s", peer_id, error)
del self.peers[peer_id]
return return
logger.debug("added new peer %s", peer_id) logger.debug("added new peer %s", peer_id)
@ -429,7 +435,12 @@ class Pubsub:
# Broadcast message # Broadcast message
for stream in self.peers.values(): for stream in self.peers.values():
# Write message to stream # Write message to stream
try:
await stream.write(encode_varint_prefixed(raw_msg)) await stream.write(encode_varint_prefixed(raw_msg))
except StreamClosed:
peer_id = stream.muxed_conn.peer_id
logger.debug("Fail to message peer %s: stream closed", peer_id)
self._handle_dead_peer(peer_id)
async def publish(self, topic_id: str, data: bytes) -> None: async def publish(self, topic_id: str, data: bytes) -> None:
""" """

View File

@ -58,7 +58,7 @@ setuptools.setup(
+ py_classifiers, + py_classifiers,
python_requires=">=3.7,<4", python_requires=">=3.7,<4",
install_requires=[ install_requires=[
"pycryptodome>=3.8.2,<4.0.0", "pycryptodome>=3.9.2,<4.0.0",
"base58>=1.0.3,<2.0.0", "base58>=1.0.3,<2.0.0",
"pymultihash>=0.8.2", "pymultihash>=0.8.2",
"multiaddr>=0.0.8,<0.1.0", "multiaddr>=0.0.8,<0.1.0",