diff --git a/muxer/mplex/smux_multiplex.py b/muxer/mplex/smux_multiplex.py deleted file mode 100644 index a99f266..0000000 --- a/muxer/mplex/smux_multiplex.py +++ /dev/null @@ -1,45 +0,0 @@ -from .muxed_connection import MuxedConn - - -class Multiplex(object): - """ - muxing logic currently lives in MuxedConn - reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go - """ - def __init__(self, conn, initiator): - """ - :param conn: an instance of raw connection - :param initiator: boolean to prevent multiplex with self - """ - self.muxed_conn = MuxedConn(conn, initiator) - - def close(self): - """ - close the stream muxer and underlying raw connection - """ - return self.muxed_conn.close() - - def is_closed(self): - """ - check connection is fully closed - :return: true if successful - """ - return self.muxed_conn.is_closed() - - def open_stream(self, protocol_id, stream_name): - """ - creates a new muxed_stream - :return: a new stream - """ - return self.muxed_conn.open_stream(protocol_id, stream_name) - - def accept_stream(self, _muxed_stream): - """ - accepts a muxed stream opened by the other end - :param _muxed_stream: stream to be accepted - :return: the accepted stream - """ - pass - - # def new_conn(raw_conn, is_server): - # pass diff --git a/network/connection/raw_connection.py b/network/connection/raw_connection.py index 6612854..1569f22 100644 --- a/network/connection/raw_connection.py +++ b/network/connection/raw_connection.py @@ -9,6 +9,5 @@ class RawConnection(IRawConnection): self.reader = reader self.writer = writer - def close(self): self.writer.close() diff --git a/muxer/__init__.py b/stream_muxer/__init__.py similarity index 100% rename from muxer/__init__.py rename to stream_muxer/__init__.py diff --git a/muxer/mplex/__init__.py b/stream_muxer/mplex/__init__.py similarity index 100% rename from muxer/mplex/__init__.py rename to stream_muxer/mplex/__init__.py diff --git a/muxer/mplex/constants.py b/stream_muxer/mplex/constants.py similarity index 100% rename from muxer/mplex/constants.py rename to stream_muxer/mplex/constants.py diff --git a/muxer/mplex/muxed_connection.py b/stream_muxer/mplex/mplex.py similarity index 93% rename from muxer/mplex/muxed_connection.py rename to stream_muxer/mplex/mplex.py index 0a43640..516dce2 100644 --- a/muxer/mplex/muxed_connection.py +++ b/stream_muxer/mplex/mplex.py @@ -1,10 +1,10 @@ import asyncio from .utils import encode_uvarint, decode_uvarint -from .muxed_connection_interface import IMuxedConn -from .muxed_stream import MuxedStream +from .mplex_stream import MplexStream +from ..muxed_connection_interface import IMuxedConn -class MuxedConn(IMuxedConn): +class Mplex(IMuxedConn): """ reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ @@ -19,6 +19,7 @@ class MuxedConn(IMuxedConn): self.buffers = {} self.streams = {} self.stream_queue = asyncio.Queue() + self.conn_lock = asyncio.Lock() # The initiator need not read upon construction time. # It should read when the user decides that it wants to read from the constructed stream. @@ -57,7 +58,7 @@ class MuxedConn(IMuxedConn): :param multi_addr: multi_addr that stream connects to :return: a new stream """ - stream = MuxedStream(stream_id, multi_addr, self) + stream = MplexStream(stream_id, multi_addr, self) self.streams[stream_id] = stream return stream @@ -69,7 +70,7 @@ class MuxedConn(IMuxedConn): # TODO update to pull out protocol_id from message protocol_id = "/echo/1.0.0" stream_id = await self.stream_queue.get() - stream = MuxedStream(stream_id, False, self) + stream = MplexStream(stream_id, False, self) return stream, stream_id, protocol_id async def send_message(self, flag, data, stream_id): diff --git a/muxer/mplex/muxed_stream.py b/stream_muxer/mplex/mplex_stream.py similarity index 54% rename from muxer/mplex/muxed_stream.py rename to stream_muxer/mplex/mplex_stream.py index 11e04f4..417ef64 100644 --- a/muxer/mplex/muxed_stream.py +++ b/stream_muxer/mplex/mplex_stream.py @@ -1,28 +1,28 @@ -from .muxed_stream_interface import IMuxedStream +import asyncio from .constants import HEADER_TAGS +from ..muxed_stream_interface import IMuxedStream -class MuxedStream(IMuxedStream): +class MplexStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator, muxed_conn): + def __init__(self, stream_id, initiator, mplex_conn): """ create new MuxedStream in muxer :param stream_id: stream stream id :param initiator: boolean if this is an initiator - :param muxed_conn: muxed connection of this muxed_stream + :param mplex_conn: muxed connection of this muxed_stream """ self.stream_id = stream_id self.initiator = initiator - self.muxed_conn = muxed_conn - + self.mplex_conn = mplex_conn self.read_deadline = None self.write_deadline = None - self.local_closed = False self.remote_closed = False + self.stream_lock = asyncio.Lock() def get_flag(self, action): """ @@ -40,50 +40,60 @@ class MuxedStream(IMuxedStream): read messages associated with stream from buffer til end of file :return: bytes of input """ - return await self.muxed_conn.read_buffer(self.stream_id) + return await self.mplex_conn.read_buffer(self.stream_id) async def write(self, data): """ write to stream :return: number of bytes written """ - return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) + return await self.mplex_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) async def close(self): """ - close stream + Closing a stream closes it for writing and closes the remote end for reading + but allows writing in the other direction. :return: true if successful """ + # TODO error handling with timeout + # TODO understand better how mutexes are used from go repo + await self.mplex_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) - if self.local_closed and self.remote_closed: - return True + remote_lock = "" + async with self.stream_lock: + if self.local_closed: + return True + self.local_closed = True + remote_lock = self.remote_closed - await self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) - self.muxed_conn.streams.pop(self.stream_id) - - self.local_closed = True - self.remote_closed = True + if remote_lock: + async with self.mplex_conn.conn_lock: + self.mplex_conn.streams.pop(self.stream_id) return True - def reset(self): + async def reset(self): """ closes both ends of the stream tells this remote side to hang up :return: true if successful """ - # TODO behavior not fully understood - pass - # if self.local_closed and self.remote_closed: - # return True - # - # self.muxed_conn.send_message(self.get_flag("RESET"), None, self.id) - # self.muxed_conn.streams.pop(self.id, None) - # - # self.local_closed = True - # self.remote_closed = True - # - # return True + # TODO understand better how mutexes are used here + # TODO understand the difference between close and reset + async with self.stream_lock: + if self.remote_closed and self.local_closed: + return True + + if not self.remote_closed: + await self.mplex_conn.send_message(self.get_flag("RESET"), None, self.stream_id) + + self.local_closed = True + self.remote_closed = True + + async with self.mplex_conn.conn_lock: + self.mplex_conn.streams.pop(self.stream_id, None) + + return True # TODO deadline not in use def set_deadline(self, ttl): diff --git a/muxer/mplex/utils.py b/stream_muxer/mplex/utils.py similarity index 100% rename from muxer/mplex/utils.py rename to stream_muxer/mplex/utils.py diff --git a/muxer/mplex/muxed_connection_interface.py b/stream_muxer/muxed_connection_interface.py similarity index 100% rename from muxer/mplex/muxed_connection_interface.py rename to stream_muxer/muxed_connection_interface.py diff --git a/muxer/mplex/muxed_stream_interface.py b/stream_muxer/muxed_stream_interface.py similarity index 100% rename from muxer/mplex/muxed_stream_interface.py rename to stream_muxer/muxed_stream_interface.py diff --git a/muxer/yamux/__init__.py b/stream_muxer/yamux/__init__.py similarity index 100% rename from muxer/yamux/__init__.py rename to stream_muxer/yamux/__init__.py diff --git a/tests/network/__init__.py b/tests/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/network/test_connection.py b/tests/network/test_connection.py new file mode 100644 index 0000000..d7ddd61 --- /dev/null +++ b/tests/network/test_connection.py @@ -0,0 +1,28 @@ +import asyncio +import pytest + +# from network.connection.raw_connection import RawConnection + + +async def handle_echo(reader, writer): + data = await reader.read(100) + writer.write(data) + await writer.drain() + + writer.close() + +@pytest.mark.asyncio +# TODO: this test should develop out into a fuller test between MPlex modules communicating with each other. +async def test_simple_echo(): + server_ip = '127.0.0.1' + server_port = 8888 + await asyncio.start_server(handle_echo, server_ip, server_port) + + reader, writer = await asyncio.open_connection(server_ip, server_port) + # raw_connection = RawConnection(server_ip, server_port, reader, writer) + + test_message = "hello world" + writer.write(test_message.encode()) + response = (await reader.read()).decode() + + assert response == (test_message) diff --git a/transport/upgrader.py b/transport/upgrader.py index 66cf21c..eeec0f6 100644 --- a/transport/upgrader.py +++ b/transport/upgrader.py @@ -1,4 +1,4 @@ -from muxer.mplex.muxed_connection import MuxedConn +from stream_muxer.mplex.mplex import Mplex class TransportUpgrader(): @@ -24,4 +24,4 @@ class TransportUpgrader(): # For PoC, no security, default to mplex # TODO do exchange to determine multiplexer - return MuxedConn(conn, initiator) + return Mplex(conn, initiator)