diff --git a/libp2p/network/connection/swarm_connection.py b/libp2p/network/connection/swarm_connection.py index a4fc8be..90cb823 100644 --- a/libp2p/network/connection/swarm_connection.py +++ b/libp2p/network/connection/swarm_connection.py @@ -54,7 +54,7 @@ class SwarmConn(INetConn, Service): # before we cancel the stream handler tasks. await trio.sleep(0.1) - await self._notify_disconnected() + self._notify_disconnected() async def _handle_new_streams(self) -> None: while self.manager.is_running: @@ -68,7 +68,7 @@ class SwarmConn(INetConn, Service): await self.close() async def _handle_muxed_stream(self, muxed_stream: IMuxedStream) -> None: - net_stream = await self._add_stream(muxed_stream) + net_stream = self._add_stream(muxed_stream) if self.swarm.common_stream_handler is not None: try: await self.swarm.common_stream_handler(net_stream) @@ -78,14 +78,14 @@ class SwarmConn(INetConn, Service): # TODO: Clean up and remove the stream from SwarmConn if there is anything wrong. self.remove_stream(net_stream) - async def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream: + def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream: net_stream = NetStream(muxed_stream) self.streams.add(net_stream) - await self.swarm.notify_opened_stream(net_stream) + self.swarm.notify_opened_stream(net_stream) return net_stream - async def _notify_disconnected(self) -> None: - await self.swarm.notify_disconnected(self) + def _notify_disconnected(self) -> None: + self.swarm.notify_disconnected(self) async def run(self) -> None: self.manager.run_task(self._handle_new_streams) @@ -93,7 +93,7 @@ class SwarmConn(INetConn, Service): async def new_stream(self) -> NetStream: muxed_stream = await self.muxed_conn.open_stream() - return await self._add_stream(muxed_stream) + return self._add_stream(muxed_stream) async def get_streams(self) -> Tuple[NetStream, ...]: return tuple(self.streams) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 9d19717..0904774 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -250,7 +250,7 @@ class Swarm(INetworkService): await listener.listen(maddr, self.manager._task_nursery) # type: ignore # Call notifiers since event occurred - await self.notify_listen(maddr) + self.notify_listen(maddr) return True except IOError: @@ -297,7 +297,7 @@ class Swarm(INetworkService): # Store muxed_conn with peer id self.connections[muxed_conn.peer_id] = swarm_conn # Call notifiers since event occurred - self.manager.run_task(self.notify_connected, swarm_conn) + self.notify_connected(swarm_conn) await manager.wait_started() return swarm_conn @@ -320,27 +320,22 @@ class Swarm(INetworkService): """ self.notifees.append(notifee) - # TODO: Use `run_task`. - async def notify_opened_stream(self, stream: INetStream) -> None: - async with trio.open_nursery() as nursery: - for notifee in self.notifees: - nursery.start_soon(notifee.opened_stream, self, stream) + def notify_opened_stream(self, stream: INetStream) -> None: + for notifee in self.notifees: + self.manager.run_task(notifee.opened_stream, self, stream) # TODO: `notify_closed_stream` - async def notify_connected(self, conn: INetConn) -> None: - async with trio.open_nursery() as nursery: - for notifee in self.notifees: - nursery.start_soon(notifee.connected, self, conn) + def notify_connected(self, conn: INetConn) -> None: + for notifee in self.notifees: + self.manager.run_task(notifee.connected, self, conn) - async def notify_disconnected(self, conn: INetConn) -> None: - async with trio.open_nursery() as nursery: - for notifee in self.notifees: - nursery.start_soon(notifee.disconnected, self, conn) + def notify_disconnected(self, conn: INetConn) -> None: + for notifee in self.notifees: + self.manager.run_task(notifee.disconnected, self, conn) - async def notify_listen(self, multiaddr: Multiaddr) -> None: - async with trio.open_nursery() as nursery: - for notifee in self.notifees: - nursery.start_soon(notifee.listen, self, multiaddr) + def notify_listen(self, multiaddr: Multiaddr) -> None: + for notifee in self.notifees: + self.manager.run_task(notifee.listen, self, multiaddr) # TODO: `notify_listen_close`