From e047752d82965164413af2f1ef8c92c4a2d3d03d Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 20 Nov 2018 20:28:41 -0500 Subject: [PATCH 1/6] rename muxed_conn --- muxer/mplex/smux_multiplex.py | 45 ------------------- {muxer => stream_muxer}/__init__.py | 0 {muxer => stream_muxer}/mplex/__init__.py | 0 {muxer => stream_muxer}/mplex/constants.py | 0 .../mplex/mplex.py | 10 ++--- .../mplex/mplex_stream.py | 4 +- {muxer => stream_muxer}/mplex/utils.py | 0 .../muxed_connection_interface.py | 0 .../muxed_stream_interface.py | 0 {muxer => stream_muxer}/yamux/__init__.py | 0 transport/upgrader.py | 4 +- 11 files changed, 9 insertions(+), 54 deletions(-) delete mode 100644 muxer/mplex/smux_multiplex.py rename {muxer => stream_muxer}/__init__.py (100%) rename {muxer => stream_muxer}/mplex/__init__.py (100%) rename {muxer => stream_muxer}/mplex/constants.py (100%) rename muxer/mplex/muxed_connection.py => stream_muxer/mplex/mplex.py (94%) rename muxer/mplex/muxed_stream.py => stream_muxer/mplex/mplex_stream.py (97%) rename {muxer => stream_muxer}/mplex/utils.py (100%) rename {muxer/mplex => stream_muxer}/muxed_connection_interface.py (100%) rename {muxer/mplex => stream_muxer}/muxed_stream_interface.py (100%) rename {muxer => stream_muxer}/yamux/__init__.py (100%) diff --git a/muxer/mplex/smux_multiplex.py b/muxer/mplex/smux_multiplex.py deleted file mode 100644 index a99f266..0000000 --- a/muxer/mplex/smux_multiplex.py +++ /dev/null @@ -1,45 +0,0 @@ -from .muxed_connection import MuxedConn - - -class Multiplex(object): - """ - muxing logic currently lives in MuxedConn - reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go - """ - def __init__(self, conn, initiator): - """ - :param conn: an instance of raw connection - :param initiator: boolean to prevent multiplex with self - """ - self.muxed_conn = MuxedConn(conn, initiator) - - def close(self): - """ - close the stream muxer and underlying raw connection - """ - return self.muxed_conn.close() - - def is_closed(self): - """ - check connection is fully closed - :return: true if successful - """ - return self.muxed_conn.is_closed() - - def open_stream(self, protocol_id, stream_name): - """ - creates a new muxed_stream - :return: a new stream - """ - return self.muxed_conn.open_stream(protocol_id, stream_name) - - def accept_stream(self, _muxed_stream): - """ - accepts a muxed stream opened by the other end - :param _muxed_stream: stream to be accepted - :return: the accepted stream - """ - pass - - # def new_conn(raw_conn, is_server): - # pass diff --git a/muxer/__init__.py b/stream_muxer/__init__.py similarity index 100% rename from muxer/__init__.py rename to stream_muxer/__init__.py diff --git a/muxer/mplex/__init__.py b/stream_muxer/mplex/__init__.py similarity index 100% rename from muxer/mplex/__init__.py rename to stream_muxer/mplex/__init__.py diff --git a/muxer/mplex/constants.py b/stream_muxer/mplex/constants.py similarity index 100% rename from muxer/mplex/constants.py rename to stream_muxer/mplex/constants.py diff --git a/muxer/mplex/muxed_connection.py b/stream_muxer/mplex/mplex.py similarity index 94% rename from muxer/mplex/muxed_connection.py rename to stream_muxer/mplex/mplex.py index 0a43640..02e6c3d 100644 --- a/muxer/mplex/muxed_connection.py +++ b/stream_muxer/mplex/mplex.py @@ -1,10 +1,10 @@ import asyncio from .utils import encode_uvarint, decode_uvarint -from .muxed_connection_interface import IMuxedConn -from .muxed_stream import MuxedStream +from .mplex_stream import MplexStream +from ..muxed_connection_interface import IMuxedConn -class MuxedConn(IMuxedConn): +class Mplex(IMuxedConn): """ reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go """ @@ -57,7 +57,7 @@ class MuxedConn(IMuxedConn): :param multi_addr: multi_addr that stream connects to :return: a new stream """ - stream = MuxedStream(stream_id, multi_addr, self) + stream = MplexStream(stream_id, multi_addr, self) self.streams[stream_id] = stream return stream @@ -69,7 +69,7 @@ class MuxedConn(IMuxedConn): # TODO update to pull out protocol_id from message protocol_id = "/echo/1.0.0" stream_id = await self.stream_queue.get() - stream = MuxedStream(stream_id, False, self) + stream = MplexStream(stream_id, False, self) return stream, stream_id, protocol_id async def send_message(self, flag, data, stream_id): diff --git a/muxer/mplex/muxed_stream.py b/stream_muxer/mplex/mplex_stream.py similarity index 97% rename from muxer/mplex/muxed_stream.py rename to stream_muxer/mplex/mplex_stream.py index 11e04f4..ebf77e2 100644 --- a/muxer/mplex/muxed_stream.py +++ b/stream_muxer/mplex/mplex_stream.py @@ -1,8 +1,8 @@ -from .muxed_stream_interface import IMuxedStream from .constants import HEADER_TAGS +from ..muxed_stream_interface import IMuxedStream -class MuxedStream(IMuxedStream): +class MplexStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ diff --git a/muxer/mplex/utils.py b/stream_muxer/mplex/utils.py similarity index 100% rename from muxer/mplex/utils.py rename to stream_muxer/mplex/utils.py diff --git a/muxer/mplex/muxed_connection_interface.py b/stream_muxer/muxed_connection_interface.py similarity index 100% rename from muxer/mplex/muxed_connection_interface.py rename to stream_muxer/muxed_connection_interface.py diff --git a/muxer/mplex/muxed_stream_interface.py b/stream_muxer/muxed_stream_interface.py similarity index 100% rename from muxer/mplex/muxed_stream_interface.py rename to stream_muxer/muxed_stream_interface.py diff --git a/muxer/yamux/__init__.py b/stream_muxer/yamux/__init__.py similarity index 100% rename from muxer/yamux/__init__.py rename to stream_muxer/yamux/__init__.py diff --git a/transport/upgrader.py b/transport/upgrader.py index 66cf21c..eeec0f6 100644 --- a/transport/upgrader.py +++ b/transport/upgrader.py @@ -1,4 +1,4 @@ -from muxer.mplex.muxed_connection import MuxedConn +from stream_muxer.mplex.mplex import Mplex class TransportUpgrader(): @@ -24,4 +24,4 @@ class TransportUpgrader(): # For PoC, no security, default to mplex # TODO do exchange to determine multiplexer - return MuxedConn(conn, initiator) + return Mplex(conn, initiator) From d83b6ef459865d7141e5b8cfc3dd292cd1bd6147 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 20 Nov 2018 21:46:18 -0500 Subject: [PATCH 2/6] mplex stream close reset done --- stream_muxer/mplex/mplex.py | 1 + stream_muxer/mplex/mplex_stream.py | 63 +++++++++++++++++------------- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/stream_muxer/mplex/mplex.py b/stream_muxer/mplex/mplex.py index 02e6c3d..516dce2 100644 --- a/stream_muxer/mplex/mplex.py +++ b/stream_muxer/mplex/mplex.py @@ -19,6 +19,7 @@ class Mplex(IMuxedConn): self.buffers = {} self.streams = {} self.stream_queue = asyncio.Queue() + self.conn_lock = asyncio.Lock() # The initiator need not read upon construction time. # It should read when the user decides that it wants to read from the constructed stream. diff --git a/stream_muxer/mplex/mplex_stream.py b/stream_muxer/mplex/mplex_stream.py index ebf77e2..f310a34 100644 --- a/stream_muxer/mplex/mplex_stream.py +++ b/stream_muxer/mplex/mplex_stream.py @@ -1,3 +1,4 @@ +import asyncio from .constants import HEADER_TAGS from ..muxed_stream_interface import IMuxedStream @@ -7,22 +8,21 @@ class MplexStream(IMuxedStream): reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator, muxed_conn): + def __init__(self, stream_id, initiator, mplex_conn): """ create new MuxedStream in muxer :param stream_id: stream stream id :param initiator: boolean if this is an initiator - :param muxed_conn: muxed connection of this muxed_stream + :param mplex_conn: muxed connection of this muxed_stream """ self.stream_id = stream_id self.initiator = initiator - self.muxed_conn = muxed_conn - + self.mplex_conn = mplex_conn self.read_deadline = None self.write_deadline = None - self.local_closed = False self.remote_closed = False + self.stream_lock = asyncio.Lock() def get_flag(self, action): """ @@ -40,50 +40,57 @@ class MplexStream(IMuxedStream): read messages associated with stream from buffer til end of file :return: bytes of input """ - return await self.muxed_conn.read_buffer(self.stream_id) + return await self.mplex_conn.read_buffer(self.stream_id) async def write(self, data): """ write to stream :return: number of bytes written """ - return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) + return await self.mplex_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) async def close(self): """ - close stream + Closing a stream closes it for writing and closes the remote end for reading + but allows writing in the other direction. :return: true if successful """ + #TODO error handling with timeout + await self.mplex_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) - if self.local_closed and self.remote_closed: - return True + remote_lock = "" + async with self.stream_lock: + if self.local_closed: + return True + self.local_closed = True + remote_lock = self.remote_closed - await self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) - self.muxed_conn.streams.pop(self.stream_id) - - self.local_closed = True - self.remote_closed = True + if remote_lock: + async with self.mplex_conn.conn_lock: + self.mplex_conn.streams.pop(self.stream_id) return True - def reset(self): + async def reset(self): """ closes both ends of the stream tells this remote side to hang up :return: true if successful """ - # TODO behavior not fully understood - pass - # if self.local_closed and self.remote_closed: - # return True - # - # self.muxed_conn.send_message(self.get_flag("RESET"), None, self.id) - # self.muxed_conn.streams.pop(self.id, None) - # - # self.local_closed = True - # self.remote_closed = True - # - # return True + async with self.stream_lock: + if self.remote_closed and self.local_closed: + return True + + if not self.remote_closed: + await self.mplex_conn.send_message(self.get_flag("RESET"), None, self.stream_id) + + self.local_closed = True + self.remote_closed = True + + async with self.mplex_conn.conn_lock: + self.mplex_conn.streams.pop(self.stream_id, None) + + return True # TODO deadline not in use def set_deadline(self, ttl): From daf9f69b6258e4ecdead322585728f5dee7b16d4 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 20 Nov 2018 23:25:49 -0500 Subject: [PATCH 3/6] raw_connection echo test --- tests/network/__init__.py | 0 tests/network/test_connection.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 tests/network/__init__.py create mode 100644 tests/network/test_connection.py diff --git a/tests/network/__init__.py b/tests/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/network/test_connection.py b/tests/network/test_connection.py new file mode 100644 index 0000000..64c1a91 --- /dev/null +++ b/tests/network/test_connection.py @@ -0,0 +1,29 @@ +import pytest +import asyncio + +from network.connection.raw_connection import RawConnection + + +async def handle_echo(reader, writer): + data = await reader.read(100) + message = data.decode() + + writer.write(data) + await writer.drain() + + writer.close() + +@pytest.mark.asyncio +async def test_echo(): + server_ip = '127.0.0.1' + server_port = 8888 + await asyncio.start_server(handle_echo, server_ip, server_port) + + reader, writer = await asyncio.open_connection(server_ip, server_port) + raw_connection = RawConnection(server_ip, server_port, reader, writer) + + test_message = "hello world" + writer.write(test_message.encode()) + response = (await reader.read()).decode() + + assert response == (test_message) From 2569eac3a071761c078799db40bce725fe3e4832 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 20 Nov 2018 23:49:37 -0500 Subject: [PATCH 4/6] lint --- network/connection/raw_connection.py | 1 - tests/network/test_connection.py | 14 ++++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/network/connection/raw_connection.py b/network/connection/raw_connection.py index 6612854..1569f22 100644 --- a/network/connection/raw_connection.py +++ b/network/connection/raw_connection.py @@ -9,6 +9,5 @@ class RawConnection(IRawConnection): self.reader = reader self.writer = writer - def close(self): self.writer.close() diff --git a/tests/network/test_connection.py b/tests/network/test_connection.py index 64c1a91..56f4cc2 100644 --- a/tests/network/test_connection.py +++ b/tests/network/test_connection.py @@ -1,29 +1,27 @@ -import pytest import asyncio +import pytest -from network.connection.raw_connection import RawConnection +# from network.connection.raw_connection import RawConnection async def handle_echo(reader, writer): data = await reader.read(100) - message = data.decode() - writer.write(data) await writer.drain() writer.close() @pytest.mark.asyncio -async def test_echo(): +async def test_simple_echo(): server_ip = '127.0.0.1' server_port = 8888 await asyncio.start_server(handle_echo, server_ip, server_port) reader, writer = await asyncio.open_connection(server_ip, server_port) - raw_connection = RawConnection(server_ip, server_port, reader, writer) - + # raw_connection = RawConnection(server_ip, server_port, reader, writer) + test_message = "hello world" writer.write(test_message.encode()) response = (await reader.read()).decode() - + assert response == (test_message) From 276aa97f21efcdbfdf696152df3639955b229da0 Mon Sep 17 00:00:00 2001 From: Robert Zajac Date: Wed, 21 Nov 2018 00:13:02 -0500 Subject: [PATCH 5/6] adding todo to connection test --- tests/network/test_connection.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/network/test_connection.py b/tests/network/test_connection.py index 56f4cc2..d7ddd61 100644 --- a/tests/network/test_connection.py +++ b/tests/network/test_connection.py @@ -12,6 +12,7 @@ async def handle_echo(reader, writer): writer.close() @pytest.mark.asyncio +# TODO: this test should develop out into a fuller test between MPlex modules communicating with each other. async def test_simple_echo(): server_ip = '127.0.0.1' server_port = 8888 From 7df6424ff1b090f6a580aa3245236a5e0f9e5bbf Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 21 Nov 2018 00:41:13 -0500 Subject: [PATCH 6/6] todo for close and reset --- stream_muxer/mplex/mplex_stream.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/stream_muxer/mplex/mplex_stream.py b/stream_muxer/mplex/mplex_stream.py index f310a34..417ef64 100644 --- a/stream_muxer/mplex/mplex_stream.py +++ b/stream_muxer/mplex/mplex_stream.py @@ -55,7 +55,8 @@ class MplexStream(IMuxedStream): but allows writing in the other direction. :return: true if successful """ - #TODO error handling with timeout + # TODO error handling with timeout + # TODO understand better how mutexes are used from go repo await self.mplex_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) remote_lock = "" @@ -77,6 +78,8 @@ class MplexStream(IMuxedStream): tells this remote side to hang up :return: true if successful """ + # TODO understand better how mutexes are used here + # TODO understand the difference between close and reset async with self.stream_lock: if self.remote_closed and self.local_closed: return True