Swarm: change notify_xxx back to async func

This commit is contained in:
mhchia 2020-02-04 22:56:13 +08:00
parent 13930ae718
commit 12cb0d9ac4
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
2 changed files with 29 additions and 23 deletions

View File

@ -52,7 +52,7 @@ class SwarmConn(INetConn):
# before we cancel the stream handler tasks. # before we cancel the stream handler tasks.
await trio.sleep(0.1) await trio.sleep(0.1)
self._notify_disconnected() await self._notify_disconnected()
async def _handle_new_streams(self) -> None: async def _handle_new_streams(self) -> None:
self.event_started.set() self.event_started.set()
@ -67,7 +67,7 @@ class SwarmConn(INetConn):
nursery.start_soon(self._handle_muxed_stream, stream) nursery.start_soon(self._handle_muxed_stream, stream)
async def _handle_muxed_stream(self, muxed_stream: IMuxedStream) -> None: async def _handle_muxed_stream(self, muxed_stream: IMuxedStream) -> None:
net_stream = self._add_stream(muxed_stream) net_stream = await self._add_stream(muxed_stream)
try: try:
# Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427 # Ignore type here since mypy complains: https://github.com/python/mypy/issues/2427
await self.swarm.common_stream_handler(net_stream) # type: ignore await self.swarm.common_stream_handler(net_stream) # type: ignore
@ -75,21 +75,21 @@ class SwarmConn(INetConn):
# As long as `common_stream_handler`, remove the stream. # As long as `common_stream_handler`, remove the stream.
self.remove_stream(net_stream) self.remove_stream(net_stream)
def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream: async def _add_stream(self, muxed_stream: IMuxedStream) -> NetStream:
net_stream = NetStream(muxed_stream) net_stream = NetStream(muxed_stream)
self.streams.add(net_stream) self.streams.add(net_stream)
self.swarm.notify_opened_stream(net_stream) await self.swarm.notify_opened_stream(net_stream)
return net_stream return net_stream
def _notify_disconnected(self) -> None: async def _notify_disconnected(self) -> None:
self.swarm.notify_disconnected(self) await self.swarm.notify_disconnected(self)
async def start(self) -> None: async def start(self) -> None:
await self._handle_new_streams() await self._handle_new_streams()
async def new_stream(self) -> NetStream: async def new_stream(self) -> NetStream:
muxed_stream = await self.muxed_conn.open_stream() muxed_stream = await self.muxed_conn.open_stream()
return self._add_stream(muxed_stream) return await self._add_stream(muxed_stream)
def get_streams(self) -> Tuple[NetStream, ...]: def get_streams(self) -> Tuple[NetStream, ...]:
return tuple(self.streams) return tuple(self.streams)

View File

@ -274,7 +274,7 @@ class Swarm(Service, INetworkService):
await listener.listen(maddr, self.listener_nursery) await listener.listen(maddr, self.listener_nursery)
# Call notifiers since event occurred # Call notifiers since event occurred
self.notify_listen(maddr) await self.notify_listen(maddr)
return True return True
except IOError: except IOError:
@ -310,7 +310,7 @@ class Swarm(Service, INetworkService):
# Store muxed_conn with peer id # Store muxed_conn with peer id
self.connections[muxed_conn.peer_id] = swarm_conn self.connections[muxed_conn.peer_id] = swarm_conn
# Call notifiers since event occurred # Call notifiers since event occurred
self.notify_connected(swarm_conn) await self.notify_connected(swarm_conn)
return swarm_conn return swarm_conn
def remove_conn(self, swarm_conn: SwarmConn) -> None: def remove_conn(self, swarm_conn: SwarmConn) -> None:
@ -330,22 +330,28 @@ class Swarm(Service, INetworkService):
""" """
self.notifees.append(notifee) self.notifees.append(notifee)
def notify_opened_stream(self, stream: INetStream) -> None: async def notify_opened_stream(self, stream: INetStream) -> None:
async with trio.open_nursery() as nursery:
for notifee in self.notifees: for notifee in self.notifees:
self.manager.run_task(notifee.opened_stream, self, stream) nursery.start_soon(notifee.opened_stream, self, stream)
# TODO: `notify_closed_stream` async def notify_connected(self, conn: INetConn) -> None:
async with trio.open_nursery() as nursery:
def notify_connected(self, conn: INetConn) -> None:
for notifee in self.notifees: for notifee in self.notifees:
self.manager.run_task(notifee.connected, self, conn) nursery.start_soon(notifee.connected, self, conn)
def notify_disconnected(self, conn: INetConn) -> None: async def notify_disconnected(self, conn: INetConn) -> None:
async with trio.open_nursery() as nursery:
for notifee in self.notifees: for notifee in self.notifees:
self.manager.run_task(notifee.disconnected, self, conn) nursery.start_soon(notifee.disconnected, self, conn)
def notify_listen(self, multiaddr: Multiaddr) -> None: async def notify_listen(self, multiaddr: Multiaddr) -> None:
async with trio.open_nursery() as nursery:
for notifee in self.notifees: for notifee in self.notifees:
self.manager.run_task(notifee.listen, self, multiaddr) nursery.start_soon(notifee.listen, self, multiaddr)
# TODO: `notify_listen_close` async def notify_closed_stream(self, stream: INetStream) -> None:
raise NotImplementedError
async def notify_listen_close(self, multiaddr: Multiaddr) -> None:
raise NotImplementedError