diff --git a/libp2p/network/connection/net_connection_interface.py b/libp2p/network/connection/net_connection_interface.py index c2c6285..e308ad6 100644 --- a/libp2p/network/connection/net_connection_interface.py +++ b/libp2p/network/connection/net_connection_interface.py @@ -7,7 +7,7 @@ from libp2p.stream_muxer.abc import IMuxedConn class INetConn(Closer): - conn: IMuxedConn + muxed_conn: IMuxedConn @abstractmethod async def new_stream(self) -> INetStream: diff --git a/libp2p/network/connection/swarm_connection.py b/libp2p/network/connection/swarm_connection.py index cf1dc9e..e25d75f 100644 --- a/libp2p/network/connection/swarm_connection.py +++ b/libp2p/network/connection/swarm_connection.py @@ -16,15 +16,15 @@ Reference: https://github.com/libp2p/go-libp2p-swarm/blob/04c86bbdafd390651cb2ee class SwarmConn(INetConn): - conn: IMuxedConn + muxed_conn: IMuxedConn swarm: "Swarm" streams: Set[NetStream] event_closed: asyncio.Event _tasks: List["asyncio.Future[Any]"] - def __init__(self, conn: IMuxedConn, swarm: "Swarm") -> None: - self.conn = conn + def __init__(self, muxed_conn: IMuxedConn, swarm: "Swarm") -> None: + self.muxed_conn = muxed_conn self.swarm = swarm self.streams = set() self.event_closed = asyncio.Event() @@ -37,7 +37,7 @@ class SwarmConn(INetConn): self.event_closed.set() self.swarm.remove_conn(self) - await self.conn.close() + await self.muxed_conn.close() # This is just for cleaning up state. The connection has already been closed. # We *could* optimize this but it really isn't worth it. @@ -56,7 +56,7 @@ class SwarmConn(INetConn): async def _handle_new_streams(self) -> None: while True: try: - stream = await self.conn.accept_stream() + stream = await self.muxed_conn.accept_stream() except MuxedConnUnavailable: # If there is anything wrong in the MuxedConn, # we should break the loop and close the connection. @@ -96,7 +96,7 @@ class SwarmConn(INetConn): self._tasks.append(asyncio.ensure_future(coro)) async def new_stream(self) -> NetStream: - muxed_stream = await self.conn.open_stream() + muxed_stream = await self.muxed_conn.open_stream() return self._add_stream(muxed_stream) async def get_streams(self) -> Tuple[NetStream, ...]: diff --git a/libp2p/network/stream/net_stream.py b/libp2p/network/stream/net_stream.py index 625a288..0142721 100644 --- a/libp2p/network/stream/net_stream.py +++ b/libp2p/network/stream/net_stream.py @@ -1,3 +1,5 @@ +from typing import Optional + from libp2p.stream_muxer.abc import IMuxedStream from libp2p.stream_muxer.exceptions import ( MuxedStreamClosed, @@ -16,7 +18,7 @@ from .net_stream_interface import INetStream class NetStream(INetStream): muxed_stream: IMuxedStream - protocol_id: TProtocol + protocol_id: Optional[TProtocol] def __init__(self, muxed_stream: IMuxedStream) -> None: self.muxed_stream = muxed_stream diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index bed204e..9d507fb 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -279,7 +279,7 @@ class Swarm(INetwork): """ Simply remove the connection from Swarm's records, without closing the connection. """ - peer_id = swarm_conn.conn.peer_id + peer_id = swarm_conn.muxed_conn.peer_id if peer_id not in self.connections: return # TODO: Should be changed to remove the exact connection, diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 627152e..85c0bd8 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -36,7 +36,7 @@ class PubsubNotifee(INotifee): :param network: network the connection was opened on :param conn: connection that was opened """ - await self.initiator_peers_queue.put(conn.conn.peer_id) + await self.initiator_peers_queue.put(conn.muxed_conn.peer_id) async def disconnected(self, network: INetwork, conn: INetConn) -> None: pass diff --git a/tests/factories.py b/tests/factories.py index cecc656..b4e8be2 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -155,7 +155,7 @@ async def mplex_conn_pair_factory(is_secure: bool) -> Tuple[Mplex, Swarm, Mplex, conn_0, swarm_0, conn_1, swarm_1 = await swarm_conn_pair_factory( is_secure, muxer_opt=muxer_opt ) - return conn_0.conn, swarm_0, conn_1.conn, swarm_1 + return conn_0.muxed_conn, swarm_0, conn_1.muxed_conn, swarm_1 async def mplex_stream_pair_factory( diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index 26d3140..a9fe031 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -53,8 +53,8 @@ async def perform_simple_test( node2_conn = node2.get_network().connections[peer_id_for_node(node1)] # Perform assertion - assertion_func(node1_conn.conn.secured_conn) - assertion_func(node2_conn.conn.secured_conn) + assertion_func(node1_conn.muxed_conn.secured_conn) + assertion_func(node2_conn.muxed_conn.secured_conn) # Success, terminate pending tasks.