From e3a1dd62e40dc5807f671c9698f6532a7aaa2a98 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sun, 26 Jan 2020 23:56:19 +0800 Subject: [PATCH] Use new type hinting for trio channel --- libp2p/pubsub/pubsub.py | 14 +++----------- libp2p/stream_muxer/mplex/mplex.py | 9 ++------- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 3df0ac2..379361d 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -71,7 +71,6 @@ class Pubsub(IPubsub, Service): seen_messages: LRU - # TODO: Implement `trio.abc.Channel`? subscribed_topics_send: Dict[str, "trio.MemorySendChannel[rpc_pb2.Message]"] subscribed_topics_receive: Dict[str, "TrioSubscriptionAPI"] @@ -112,12 +111,8 @@ class Pubsub(IPubsub, Service): # Attach this new Pubsub object to the router self.router.attach(self) - peer_channels: Tuple[ - "trio.MemorySendChannel[ID]", "trio.MemoryReceiveChannel[ID]" - ] = trio.open_memory_channel(0) - dead_peer_channels: Tuple[ - "trio.MemorySendChannel[ID]", "trio.MemoryReceiveChannel[ID]" - ] = trio.open_memory_channel(0) + peer_channels = trio.open_memory_channel[ID](0) + dead_peer_channels = trio.open_memory_channel[ID](0) # Only keep the receive channels in `Pubsub`. # Therefore, we can only close from the receive side. self.peer_receive_channel = peer_channels[1] @@ -404,10 +399,7 @@ class Pubsub(IPubsub, Service): if topic_id in self.topic_ids: return self.subscribed_topics_receive[topic_id] - channels: Tuple[ - "trio.MemorySendChannel[rpc_pb2.Message]", - "trio.MemoryReceiveChannel[rpc_pb2.Message]", - ] = trio.open_memory_channel(math.inf) + channels = trio.open_memory_channel[rpc_pb2.Message](math.inf) send_channel, receive_channel = channels subscription = TrioSubscriptionAPI(receive_channel) self.subscribed_topics_send[topic_id] = send_channel diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 486fd3f..6f5f3fd 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -66,10 +66,7 @@ class Mplex(IMuxedConn): self.streams = {} self.streams_lock = trio.Lock() self.streams_msg_channels = {} - channels: Tuple[ - "trio.MemorySendChannel[IMuxedStream]", - "trio.MemoryReceiveChannel[IMuxedStream]", - ] = trio.open_memory_channel(math.inf) + channels = trio.open_memory_channel[IMuxedStream](math.inf) self.new_stream_send_channel, self.new_stream_receive_channel = channels self.event_shutting_down = trio.Event() self.event_closed = trio.Event() @@ -114,9 +111,7 @@ class Mplex(IMuxedConn): async def _initialize_stream(self, stream_id: StreamID, name: str) -> MplexStream: # Use an unbounded buffer, to avoid `handle_incoming` being blocked when doing # `send_channel.send`. - channels: Tuple[ - "trio.MemorySendChannel[bytes]", "trio.MemoryReceiveChannel[bytes]" - ] = trio.open_memory_channel(math.inf) + channels = trio.open_memory_channel[bytes](math.inf) stream = MplexStream(name, stream_id, self, channels[1]) async with self.streams_lock: self.streams[stream_id] = stream