From 7bc363f2fa98bf07dc089edec386e017513acc67 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 16 Aug 2019 17:01:27 +0800 Subject: [PATCH] Remove initiator in `Mplex` Besides, fix the wrong passed `multi_addr` to `mplex_stream`. --- libp2p/network/swarm.py | 1 + libp2p/stream_muxer/abc.py | 1 + libp2p/stream_muxer/mplex/mplex.py | 11 +++++++---- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 9dd1c15..0bf91a3 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -145,6 +145,7 @@ class Swarm(INetwork): # Use muxed conn to open stream, which returns # a muxed stream # TODO: Remove protocol id from being passed into muxed_conn + # FIXME: Remove multiaddr from being passed into muxed_conn muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) # Perform protocol muxing to determine protocol to use diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 7084d7a..26ad509 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -62,6 +62,7 @@ class IMuxedConn(ABC): Read a message from `stream_id`'s buffer, non-blockingly. """ + # FIXME: Remove multiaddr from being passed into muxed_conn @abstractmethod async def open_stream( self, protocol_id: str, multi_addr: Multiaddr diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index aa64b69..2aa59c9 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -25,7 +25,6 @@ class Mplex(IMuxedConn): secured_conn: ISecureConn raw_conn: IRawConnection - initiator: bool peer_id: ID # TODO: `dataIn` in go implementation. Should be size of 8. # TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies @@ -41,13 +40,12 @@ class Mplex(IMuxedConn): ) -> None: """ create a new muxed connection - :param conn: an instance of raw connection + :param secured_conn: an instance of ``ISecureConn`` :param generic_protocol_handler: generic protocol handler for new muxed streams :param peer_id: peer_id of peer the connection is to """ self.conn = secured_conn - self.initiator = secured_conn.initiator # Store generic protocol handler self.generic_protocol_handler = generic_protocol_handler @@ -63,6 +61,10 @@ class Mplex(IMuxedConn): # Kick off reading asyncio.ensure_future(self.handle_incoming()) + @property + def initiator(self) -> bool: + return self.conn.initiator + def close(self) -> None: """ close the stream muxer and underlying raw connection @@ -98,6 +100,7 @@ class Mplex(IMuxedConn): return None return await self.buffers[stream_id].get() + # FIXME: Remove multiaddr from being passed into muxed_conn async def open_stream( self, protocol_id: str, multi_addr: Multiaddr ) -> IMuxedStream: @@ -108,7 +111,7 @@ class Mplex(IMuxedConn): :return: a new stream """ stream_id = self.conn.next_stream_id() - stream = MplexStream(stream_id, multi_addr, self) + stream = MplexStream(stream_id, True, self) self.buffers[stream_id] = asyncio.Queue() await self.send_message(HeaderTags.NewStream, None, stream_id) return stream