Remove initiator in Mplex
Besides, fix the wrong passed `multi_addr` to `mplex_stream`.
This commit is contained in:
parent
8217319c28
commit
7bc363f2fa
|
@ -145,6 +145,7 @@ class Swarm(INetwork):
|
||||||
# Use muxed conn to open stream, which returns
|
# Use muxed conn to open stream, which returns
|
||||||
# a muxed stream
|
# a muxed stream
|
||||||
# TODO: Remove protocol id from being passed into muxed_conn
|
# TODO: Remove protocol id from being passed into muxed_conn
|
||||||
|
# FIXME: Remove multiaddr from being passed into muxed_conn
|
||||||
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr)
|
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr)
|
||||||
|
|
||||||
# Perform protocol muxing to determine protocol to use
|
# Perform protocol muxing to determine protocol to use
|
||||||
|
|
|
@ -62,6 +62,7 @@ class IMuxedConn(ABC):
|
||||||
Read a message from `stream_id`'s buffer, non-blockingly.
|
Read a message from `stream_id`'s buffer, non-blockingly.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# FIXME: Remove multiaddr from being passed into muxed_conn
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
self, protocol_id: str, multi_addr: Multiaddr
|
self, protocol_id: str, multi_addr: Multiaddr
|
||||||
|
|
|
@ -25,7 +25,6 @@ class Mplex(IMuxedConn):
|
||||||
|
|
||||||
secured_conn: ISecureConn
|
secured_conn: ISecureConn
|
||||||
raw_conn: IRawConnection
|
raw_conn: IRawConnection
|
||||||
initiator: bool
|
|
||||||
peer_id: ID
|
peer_id: ID
|
||||||
# TODO: `dataIn` in go implementation. Should be size of 8.
|
# TODO: `dataIn` in go implementation. Should be size of 8.
|
||||||
# TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies
|
# TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies
|
||||||
|
@ -41,13 +40,12 @@ class Mplex(IMuxedConn):
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
create a new muxed connection
|
create a new muxed connection
|
||||||
:param conn: an instance of raw connection
|
:param secured_conn: an instance of ``ISecureConn``
|
||||||
:param generic_protocol_handler: generic protocol handler
|
:param generic_protocol_handler: generic protocol handler
|
||||||
for new muxed streams
|
for new muxed streams
|
||||||
:param peer_id: peer_id of peer the connection is to
|
:param peer_id: peer_id of peer the connection is to
|
||||||
"""
|
"""
|
||||||
self.conn = secured_conn
|
self.conn = secured_conn
|
||||||
self.initiator = secured_conn.initiator
|
|
||||||
|
|
||||||
# Store generic protocol handler
|
# Store generic protocol handler
|
||||||
self.generic_protocol_handler = generic_protocol_handler
|
self.generic_protocol_handler = generic_protocol_handler
|
||||||
|
@ -63,6 +61,10 @@ class Mplex(IMuxedConn):
|
||||||
# Kick off reading
|
# Kick off reading
|
||||||
asyncio.ensure_future(self.handle_incoming())
|
asyncio.ensure_future(self.handle_incoming())
|
||||||
|
|
||||||
|
@property
|
||||||
|
def initiator(self) -> bool:
|
||||||
|
return self.conn.initiator
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
"""
|
"""
|
||||||
close the stream muxer and underlying raw connection
|
close the stream muxer and underlying raw connection
|
||||||
|
@ -98,6 +100,7 @@ class Mplex(IMuxedConn):
|
||||||
return None
|
return None
|
||||||
return await self.buffers[stream_id].get()
|
return await self.buffers[stream_id].get()
|
||||||
|
|
||||||
|
# FIXME: Remove multiaddr from being passed into muxed_conn
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
self, protocol_id: str, multi_addr: Multiaddr
|
self, protocol_id: str, multi_addr: Multiaddr
|
||||||
) -> IMuxedStream:
|
) -> IMuxedStream:
|
||||||
|
@ -108,7 +111,7 @@ class Mplex(IMuxedConn):
|
||||||
:return: a new stream
|
:return: a new stream
|
||||||
"""
|
"""
|
||||||
stream_id = self.conn.next_stream_id()
|
stream_id = self.conn.next_stream_id()
|
||||||
stream = MplexStream(stream_id, multi_addr, self)
|
stream = MplexStream(stream_id, True, self)
|
||||||
self.buffers[stream_id] = asyncio.Queue()
|
self.buffers[stream_id] = asyncio.Queue()
|
||||||
await self.send_message(HeaderTags.NewStream, None, stream_id)
|
await self.send_message(HeaderTags.NewStream, None, stream_id)
|
||||||
return stream
|
return stream
|
||||||
|
|
Loading…
Reference in New Issue
Block a user