From 3a0c7d06d1b5f5114aa1b082017c32e3e1973930 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sun, 3 Nov 2019 12:31:20 +0800 Subject: [PATCH 01/11] Update comment for `connection.close()` --- libp2p/network/swarm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index a697d7d..7bb40ce 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -248,8 +248,8 @@ class Swarm(INetwork): # TODO: Should be changed to close multisple connections, # if we have several connections per peer in the future. connection = self.connections[peer_id] - # NOTE: `connection.close` performs `del self.connections[peer_id]` for us, - # so we don't need to remove the entry here. + # NOTE: `connection.close` will perform `del self.connections[peer_id]` + # and `notify_disconnected` for us. await connection.close() logger.debug("successfully close the connection to peer %s", peer_id) From c6c9393f2bf65ab07f01ee9bd2966961060603bb Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 14:22:24 +0800 Subject: [PATCH 02/11] Add `dead_peer_queue` to pubsub --- libp2p/pubsub/pubsub.py | 6 +++++- libp2p/pubsub/pubsub_notifee.py | 10 +++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7be2d28..5c71757 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -60,6 +60,7 @@ class Pubsub: router: "IPubsubRouter" peer_queue: "asyncio.Queue[ID]" + dead_peer_queue: "asyncio.Queue[ID]" protocols: List[TProtocol] @@ -100,7 +101,10 @@ class Pubsub: # Register a notifee self.peer_queue = asyncio.Queue() - self.host.get_network().register_notifee(PubsubNotifee(self.peer_queue)) + self.dead_peer_queue = asyncio.Queue() + self.host.get_network().register_notifee( + PubsubNotifee(self.peer_queue, self.dead_peer_queue) + ) # Register stream handlers for each pubsub router protocol to handle # the pubsub streams opened on those protocols diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 19be612..2553124 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -15,13 +15,21 @@ if TYPE_CHECKING: class PubsubNotifee(INotifee): initiator_peers_queue: "asyncio.Queue[ID]" + dead_peers_queue: "asyncio.Queue[ID]" - def __init__(self, initiator_peers_queue: "asyncio.Queue[ID]") -> None: + def __init__( + self, + initiator_peers_queue: "asyncio.Queue[ID]", + dead_peers_queue: "asyncio.Queue[ID]", + ) -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them + :param dead_peers_queue: queue to add dead peers to so that pubsub + can process dead peers after we disconnect from each other """ self.initiator_peers_queue = initiator_peers_queue + self.dead_peers_queue = dead_peers_queue async def opened_stream(self, network: INetwork, stream: INetStream) -> None: pass From 4b15cb1af5f5e50f641ac7964e7e2a41551721e8 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 14:23:00 +0800 Subject: [PATCH 03/11] Implement `PubsubNotifee.disconnected` --- libp2p/pubsub/pubsub_notifee.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 2553124..6afa9ad 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -49,7 +49,14 @@ class PubsubNotifee(INotifee): await self.initiator_peers_queue.put(conn.muxed_conn.peer_id) async def disconnected(self, network: INetwork, conn: INetConn) -> None: - pass + """ + Add peer_id to dead_peers_queue, so that pubsub and its router can + remove this peer_id and close the stream inbetween. + + :param network: network the connection was opened on + :param conn: connection that was opened + """ + await self.dead_peers_queue.put(conn.muxed_conn.peer_id) async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None: pass From 84f5210220ffb581cb788b32fa93514a5b064276 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 14:23:42 +0800 Subject: [PATCH 04/11] Implement `handle_dead_peer_queue` --- libp2p/pubsub/pubsub.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5c71757..3aeb10a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -281,6 +281,17 @@ class Pubsub: logger.debug("added new peer %s", peer_id) + async def _handle_dead_peer(self, peer_id: ID) -> None: + del self.peers[peer_id] + + for topic in self.peer_topics: + if peer_id in self.peer_topics[topic]: + self.peer_topics[topic].remove(peer_id) + + self.router.remove_peer(peer_id) + + logger.debug("removed dead peer %s", peer_id) + async def handle_peer_queue(self) -> None: """ Continuously read from peer queue and each time a new peer is found, @@ -298,6 +309,21 @@ class Pubsub: # Force context switch await asyncio.sleep(0) + async def handle_dead_peer_queue(self) -> None: + """ + Continuously read from dead peer queue and close the stream between that peer and + remove peer info from pubsub and pubsub router. + """ + while True: + + peer_id: ID = await self.dead_peer_queue.get() + + # Remove Peer + asyncio.ensure_future(self._handle_dead_peer(peer_id)) + + # Force context switch + await asyncio.sleep(0) + def handle_subscription( self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts ) -> None: From a8d9536b086d1fcef87d9bdee6d4dd29d2d21a1f Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 18:26:38 +0800 Subject: [PATCH 05/11] Spin up `handle_dead_peer_queue` task --- libp2p/pubsub/pubsub.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3aeb10a..5e51877 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -141,6 +141,7 @@ class Pubsub: # Call handle peer to keep waiting for updates to peer queue asyncio.ensure_future(self.handle_peer_queue()) + asyncio.ensure_future(self.handle_dead_peer_queue()) def get_hello_packet(self) -> rpc_pb2.RPC: """Generate subscription message with all topics we are subscribed to From 97b3aca535e4d38b7bbed99267011327bf624ba8 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 18:27:31 +0800 Subject: [PATCH 06/11] Fix: Force context switch before canceling swarm connection tasks --- libp2p/network/connection/swarm_connection.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libp2p/network/connection/swarm_connection.py b/libp2p/network/connection/swarm_connection.py index e25d75f..29d544e 100644 --- a/libp2p/network/connection/swarm_connection.py +++ b/libp2p/network/connection/swarm_connection.py @@ -43,6 +43,9 @@ class SwarmConn(INetConn): # We *could* optimize this but it really isn't worth it. for stream in self.streams: await stream.reset() + # Force context switch for stream handlers to process the stream reset event we just emit + # before we cancel the stream handler tasks. + await asyncio.sleep(0.1) for task in self._tasks: task.cancel() From d36e32370354c81095fd2fb291907cefe13aa8bf Mon Sep 17 00:00:00 2001 From: NIC619 Date: Mon, 4 Nov 2019 21:17:54 +0800 Subject: [PATCH 07/11] Update error handling of pubsub stream handler --- libp2p/pubsub/pubsub.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 5e51877..dd0bed3 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -163,13 +163,7 @@ class Pubsub: peer_id = stream.muxed_conn.peer_id while True: - try: - incoming: bytes = await read_varint_prefixed_bytes(stream) - except (ParseError, IncompleteReadError) as error: - logger.debug( - "read corrupted data from peer %s, error=%s", peer_id, error - ) - continue + incoming: bytes = await read_varint_prefixed_bytes(stream) rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: @@ -252,13 +246,18 @@ class Pubsub: :param stream: newly created stream """ + peer_id = stream.muxed_conn.peer_id + + # Error handling pattern reference: + # https://github.com/libp2p/go-libp2p-pubsub/blob/534fe2f382d8dd75dab89ddb0760542546c9f24e/comm.go#L38-L46 # noqa: E501 try: await self.continuously_read_stream(stream) - except (StreamEOF, StreamReset) as error: - logger.debug("fail to read from stream, error=%s", error) - await stream.reset() - # TODO: what to do when the stream is terminated? - # disconnect the peer? + except StreamEOF as error: + stream.close() + logger.debug("fail to read from peer %s, error=%s", peer_id, error) + except (ParseError, IncompleteReadError, StreamReset) as error: + stream.reset() + logger.debug("read corrupted data from peer %s, error=%s", peer_id, error) async def _handle_new_peer(self, peer_id: ID) -> None: try: From eeb87848affbd6886a5d2b382ed287d39d2a56e8 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 5 Nov 2019 14:27:06 +0800 Subject: [PATCH 08/11] Apply PR feedback: - fix await stream close/reset - make `_handle_dead_peer` a sync function --- libp2p/pubsub/pubsub.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index dd0bed3..7a453e7 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -253,10 +253,10 @@ class Pubsub: try: await self.continuously_read_stream(stream) except StreamEOF as error: - stream.close() + await stream.close() logger.debug("fail to read from peer %s, error=%s", peer_id, error) except (ParseError, IncompleteReadError, StreamReset) as error: - stream.reset() + await stream.reset() logger.debug("read corrupted data from peer %s, error=%s", peer_id, error) async def _handle_new_peer(self, peer_id: ID) -> None: @@ -281,7 +281,7 @@ class Pubsub: logger.debug("added new peer %s", peer_id) - async def _handle_dead_peer(self, peer_id: ID) -> None: + def _handle_dead_peer(self, peer_id: ID) -> None: del self.peers[peer_id] for topic in self.peer_topics: @@ -300,14 +300,9 @@ class Pubsub: pubsub protocols we support """ while True: - peer_id: ID = await self.peer_queue.get() - # Add Peer - asyncio.ensure_future(self._handle_new_peer(peer_id)) - # Force context switch - await asyncio.sleep(0) async def handle_dead_peer_queue(self) -> None: """ @@ -315,14 +310,9 @@ class Pubsub: remove peer info from pubsub and pubsub router. """ while True: - peer_id: ID = await self.dead_peer_queue.get() - # Remove Peer - asyncio.ensure_future(self._handle_dead_peer(peer_id)) - - # Force context switch - await asyncio.sleep(0) + self._handle_dead_peer(peer_id) def handle_subscription( self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts From 93ef36bd8627e6fe43dd5e29ebb8eb778bf21fb8 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 5 Nov 2019 15:09:53 +0800 Subject: [PATCH 09/11] Clean up peer record if pubsub stream fail --- libp2p/pubsub/pubsub.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7a453e7..54de625 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -248,16 +248,17 @@ class Pubsub: """ peer_id = stream.muxed_conn.peer_id - # Error handling pattern reference: - # https://github.com/libp2p/go-libp2p-pubsub/blob/534fe2f382d8dd75dab89ddb0760542546c9f24e/comm.go#L38-L46 # noqa: E501 try: await self.continuously_read_stream(stream) - except StreamEOF as error: - await stream.close() - logger.debug("fail to read from peer %s, error=%s", peer_id, error) - except (ParseError, IncompleteReadError, StreamReset) as error: + except (StreamEOF, StreamReset, ParseError, IncompleteReadError) as error: + logger.debug( + "fail to read from peer %s, error=%s," + "closing the stream and remove the peer from record", + peer_id, + error, + ) await stream.reset() - logger.debug("read corrupted data from peer %s, error=%s", peer_id, error) + self._handle_dead_peer(peer_id) async def _handle_new_peer(self, peer_id: ID) -> None: try: @@ -282,6 +283,8 @@ class Pubsub: logger.debug("added new peer %s", peer_id) def _handle_dead_peer(self, peer_id: ID) -> None: + if peer_id not in self.peers: + return del self.peers[peer_id] for topic in self.peer_topics: From 5dfa29a0df32953c841c5568797f718025c1edf7 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 5 Nov 2019 15:22:31 +0800 Subject: [PATCH 10/11] Track tasks created in pubsub and add `close()` --- libp2p/pubsub/pubsub.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 54de625..bbfcd41 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -79,6 +79,8 @@ class Pubsub: # TODO: Be sure it is increased atomically everytime. counter: int # uint64 + _tasks: List["asyncio.Future[Any]"] + def __init__( self, host: IHost, router: "IPubsubRouter", my_id: ID, cache_size: int = None ) -> None: @@ -139,9 +141,10 @@ class Pubsub: self.counter = time.time_ns() + self._tasks = [] # Call handle peer to keep waiting for updates to peer queue - asyncio.ensure_future(self.handle_peer_queue()) - asyncio.ensure_future(self.handle_dead_peer_queue()) + self._tasks.append(asyncio.ensure_future(self.handle_peer_queue())) + self._tasks.append(asyncio.ensure_future(self.handle_dead_peer_queue())) def get_hello_packet(self) -> rpc_pb2.RPC: """Generate subscription message with all topics we are subscribed to @@ -174,7 +177,7 @@ class Pubsub: logger.debug( "received `publish` message %s from peer %s", msg, peer_id ) - asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) + self._tasks.append(asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg))) if rpc_incoming.subscriptions: # deal with RPC.subscriptions @@ -305,7 +308,7 @@ class Pubsub: while True: peer_id: ID = await self.peer_queue.get() # Add Peer - asyncio.ensure_future(self._handle_new_peer(peer_id)) + self._tasks.append(asyncio.ensure_future(self._handle_new_peer(peer_id))) async def handle_dead_peer_queue(self) -> None: """ @@ -537,3 +540,11 @@ class Pubsub: if not self.my_topics: return False return any(topic in self.my_topics for topic in msg.topicIDs) + + async def close(self) -> None: + for task in self._tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass From b8c7f0cfff7a22a1e528f1434972b5d1a7d45873 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Sat, 9 Nov 2019 23:55:35 +0800 Subject: [PATCH 11/11] Fix lint --- libp2p/pubsub/pubsub.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index bbfcd41..d5e0267 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -32,6 +32,7 @@ from .validators import signature_validator if TYPE_CHECKING: from .pubsub_router_interface import IPubsubRouter # noqa: F401 + from typing import Any # noqa: F401 logger = logging.getLogger("libp2p.pubsub") @@ -177,7 +178,11 @@ class Pubsub: logger.debug( "received `publish` message %s from peer %s", msg, peer_id ) - self._tasks.append(asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg))) + self._tasks.append( + asyncio.ensure_future( + self.push_msg(msg_forwarder=peer_id, msg=msg) + ) + ) if rpc_incoming.subscriptions: # deal with RPC.subscriptions @@ -311,10 +316,8 @@ class Pubsub: self._tasks.append(asyncio.ensure_future(self._handle_new_peer(peer_id))) async def handle_dead_peer_queue(self) -> None: - """ - Continuously read from dead peer queue and close the stream between that peer and - remove peer info from pubsub and pubsub router. - """ + """Continuously read from dead peer queue and close the stream between + that peer and remove peer info from pubsub and pubsub router.""" while True: peer_id: ID = await self.dead_peer_queue.get() # Remove Peer