Handle StreamClosed in pub/gossip/flood-sub

This commit is contained in:
NIC619 2019-11-12 18:10:41 +08:00
parent c4f9ce6bb3
commit 9be9b4bbfc
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17
3 changed files with 27 additions and 5 deletions

View File

@ -1,6 +1,7 @@
import logging import logging
from typing import Iterable, List, Sequence from typing import Iterable, List, Sequence
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from libp2p.utils import encode_varint_prefixed from libp2p.utils import encode_varint_prefixed
@ -89,7 +90,10 @@ class FloodSub(IPubsubRouter):
stream = self.pubsub.peers[peer_id] stream = self.pubsub.peers[peer_id]
# FIXME: We should add a `WriteMsg` similar to write delimited messages. # FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
async def join(self, topic: str) -> None: async def join(self, topic: str) -> None:
""" """

View File

@ -4,6 +4,7 @@ import logging
import random import random
from typing import Any, Dict, Iterable, List, Sequence, Set from typing import Any, Dict, Iterable, List, Sequence, Set
from libp2p.network.stream.exceptions import StreamClosed
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.pubsub import floodsub from libp2p.pubsub import floodsub
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
@ -188,7 +189,12 @@ class GossipSub(IPubsubRouter):
# FIXME: We should add a `WriteMsg` similar to write delimited messages. # FIXME: We should add a `WriteMsg` similar to write delimited messages.
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages. # TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) try:
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id)
# TODO: also remove peer info from pubsub
self.remove_peer(peer_id)
def _get_peers_to_send( def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID

View File

@ -20,7 +20,7 @@ from libp2p.exceptions import ParseError, ValidationError
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.io.exceptions import IncompleteReadError from libp2p.io.exceptions import IncompleteReadError
from libp2p.network.exceptions import SwarmException from libp2p.network.exceptions import SwarmException
from libp2p.network.stream.exceptions import StreamEOF, StreamReset from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset
from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
@ -279,13 +279,19 @@ class Pubsub:
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(encode_varint_prefixed(hello.SerializeToString())) try:
await stream.write(encode_varint_prefixed(hello.SerializeToString()))
except StreamClosed:
logger.debug("Fail to add new peer %s: stream closed", peer_id)
del self.peers[peer_id]
return
# TODO: Check EOF of this stream. # TODO: Check EOF of this stream.
# TODO: Check if the peer in black list. # TODO: Check if the peer in black list.
try: try:
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
except Exception as error: except Exception as error:
logger.debug("fail to add new peer %s, error %s", peer_id, error) logger.debug("fail to add new peer %s, error %s", peer_id, error)
del self.peers[peer_id]
return return
logger.debug("added new peer %s", peer_id) logger.debug("added new peer %s", peer_id)
@ -429,7 +435,13 @@ class Pubsub:
# Broadcast message # Broadcast message
for stream in self.peers.values(): for stream in self.peers.values():
# Write message to stream # Write message to stream
await stream.write(encode_varint_prefixed(raw_msg)) try:
await stream.write(encode_varint_prefixed(raw_msg))
except StreamClosed:
peer_id = stream.muxed_conn.peer_id
logger.debug("Fail to message peer %s: stream closed", peer_id)
del self.peers[peer_id]
self.router.remove_peer(peer_id)
async def publish(self, topic_id: str, data: bytes) -> None: async def publish(self, topic_id: str, data: bytes) -> None:
""" """