Apply PR feedback:

handle pubsub dead peer when stream closed in gossipsub
This commit is contained in:
NIC619 2019-11-16 17:03:04 +08:00
parent ccc7879422
commit ace5ef69a8
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17
2 changed files with 8 additions and 5 deletions

View File

@ -193,8 +193,7 @@ class GossipSub(IPubsubRouter):
await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString())) await stream.write(encode_varint_prefixed(rpc_msg.SerializeToString()))
except StreamClosed: except StreamClosed:
logger.debug("Fail to publish message to %s: stream closed", peer_id) logger.debug("Fail to publish message to %s: stream closed", peer_id)
# TODO: also remove peer info from pubsub self.pubsub._handle_dead_peer(peer_id)
self.remove_peer(peer_id)
def _get_peers_to_send( def _get_peers_to_send(
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
@ -521,7 +520,11 @@ class GossipSub(IPubsubRouter):
try: try:
await peer_stream.write(encode_varint_prefixed(rpc_msg)) await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed: except StreamClosed:
logger.debug("Fail to responed to iwant request from %s: stream closed", sender_peer_id) logger.debug(
"Fail to responed to iwant request from %s: stream closed",
sender_peer_id,
)
self.pubsub._handle_dead_peer(sender_peer_id)
async def handle_graft( async def handle_graft(
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
@ -609,3 +612,4 @@ class GossipSub(IPubsubRouter):
await peer_stream.write(encode_varint_prefixed(rpc_msg)) await peer_stream.write(encode_varint_prefixed(rpc_msg))
except StreamClosed: except StreamClosed:
logger.debug("Fail to emit control message to %s: stream closed", to_peer) logger.debug("Fail to emit control message to %s: stream closed", to_peer)
self.pubsub._handle_dead_peer(to_peer)

View File

@ -440,8 +440,7 @@ class Pubsub:
except StreamClosed: except StreamClosed:
peer_id = stream.muxed_conn.peer_id peer_id = stream.muxed_conn.peer_id
logger.debug("Fail to message peer %s: stream closed", peer_id) logger.debug("Fail to message peer %s: stream closed", peer_id)
del self.peers[peer_id] self._handle_dead_peer(peer_id)
self.router.remove_peer(peer_id)
async def publish(self, topic_id: str, data: bytes) -> None: async def publish(self, topic_id: str, data: bytes) -> None:
""" """