From 55dae3ed1efa2c32650dc3e81d619c747ee4d366 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 11 Nov 2018 16:16:21 -0500 Subject: [PATCH 1/3] mplex constants --- muxer/mplex/constants.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 muxer/mplex/constants.py diff --git a/muxer/mplex/constants.py b/muxer/mplex/constants.py new file mode 100644 index 0000000..dc3e613 --- /dev/null +++ b/muxer/mplex/constants.py @@ -0,0 +1,6 @@ +HEADER_TAGS = { + "NEW_STREAM": 0, + "MESSAGE": 2, + "CLOSE": 4, + "RESET": 6 +} \ No newline at end of file From fe20d258825d5216213124d52dc078b0209ce56d Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 11 Nov 2018 18:03:04 -0500 Subject: [PATCH 2/3] muxed stream --- muxer/mplex/constants.py | 2 +- muxer/mplex/muxed_connection.py | 2 - muxer/mplex/muxed_connection_interface.py | 9 ++- muxer/mplex/muxed_stream.py | 99 ++++++++++++++++++++--- muxer/mplex/muxed_stream_interface.py | 28 ++++++- 5 files changed, 122 insertions(+), 18 deletions(-) diff --git a/muxer/mplex/constants.py b/muxer/mplex/constants.py index dc3e613..a6dfcab 100644 --- a/muxer/mplex/constants.py +++ b/muxer/mplex/constants.py @@ -3,4 +3,4 @@ HEADER_TAGS = { "MESSAGE": 2, "CLOSE": 4, "RESET": 6 -} \ No newline at end of file +} diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index b7caa62..c58a7c3 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -32,10 +32,8 @@ class MuxedConn(IMuxedConn): creates a new muxed_stream :return: a new stream """ - return Stream(peer_id, multi_addr, self) - def accept_stream(self): """ accepts a muxed stream opened by the other end diff --git a/muxer/mplex/muxed_connection_interface.py b/muxer/mplex/muxed_connection_interface.py index df1694a..9aa854c 100644 --- a/muxer/mplex/muxed_connection_interface.py +++ b/muxer/mplex/muxed_connection_interface.py @@ -1,11 +1,18 @@ from abc import ABC, abstractmethod + class IMuxedConn(ABC): """ reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ - # TODO closer + @abstractmethod + def close(self): + """ + close connection + :return: true if successful + """ + pass @abstractmethod def is_closed(self): diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index 92624e5..23d531b 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -1,34 +1,113 @@ +import asyncio from .muxed_stream_interface import IMuxedStream +from .constants import HEADER_TAGS + class MuxedStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, protocol_id, stream_name): - self.protocol_id = protocol_id - self.name = stream_name + def __init__(self, stream_id, initiator, muxed_conn): + """ + create new MuxedStream in muxer + :param stream_id: stream stream id + :param initiator: boolean if this is an initiator + :param muxed_conn: muxed connection of this muxed_stream + """ + self.id = stream_id + self.initiator = initiator + self.muxed_conn = muxed_conn + + # self.read_deadline = None + # self.write_deadline = None + + self.local_closed = False + self.remote_closed = False + + def get_flag(self, action): + """ + get header flag based on action for mplex + :param action: action type in str + :return: int flag + """ + if self.initiator: + return HEADER_TAGS[action] + else: + return HEADER_TAGS[action] - 1 def read(self): - pass + """ + read from stream til end of file + :return: bytes of input + """ + return self.muxed_conn.read_buffer(self.id) - def write(self): - pass + def write(self, data): + """ + write to stream + :return: number of bytes written + """ + return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) def close(self): - pass + """ + close stream + :return: true if successful + """ + + if self.local_closed and self.remote_closed: + return True + + self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.id) + self.muxed_conn.streams.pop(self.id) + + self.local_closed = True + self.remote_closed = True + + return True def reset(self): """ closes both ends of the stream tells this remote side to hang up - :return: error/exception + :return: true if successful """ + # TODO behavior not fully understood pass + # if self.local_closed and self.remote_closed: + # return True + # + # self.muxed_conn.send_message(self.get_flag("RESET"), None, self.id) + # self.muxed_conn.streams.pop(self.id, None) + # + # self.local_closed = True + # self.remote_closed = True + # + # return True + # TODO deadline not in use def set_deadline(self, ttl): """ set deadline for muxed stream - :return: a new stream + :return: True if successful """ - pass + self.read_deadline = ttl + self.write_deadline = ttl + return True + + def set_read_deadline(self, ttl): + """ + set read deadline for muxed stream + :return: True if successful + """ + self.read_deadline = ttl + return True + + def set_write_deadline(self, ttl): + """ + set write deadline for muxed stream + :return: True if successful + """ + self.write_deadline = ttl + return True diff --git a/muxer/mplex/muxed_stream_interface.py b/muxer/mplex/muxed_stream_interface.py index 2c9bc25..e95a2c8 100644 --- a/muxer/mplex/muxed_stream_interface.py +++ b/muxer/mplex/muxed_stream_interface.py @@ -1,11 +1,31 @@ from abc import ABC, abstractmethod -# from datetime import time + class IMuxedStream(ABC): - # TODO Reader - # TODO Writer - # TODO Closer + @abstractmethod + def read(self): + """ + read from stream + :return: bytes of input + """ + pass + + @abstractmethod + def write(self, _bytes): + """ + write to stream + :return: number of bytes written + """ + pass + + @abstractmethod + def close(self): + """ + close stream + :return: true if successful + """ + pass @abstractmethod def reset(self): From 7a05170df4891e8a6c2def0ccbd4861deb4e20d7 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 11 Nov 2018 19:17:38 -0500 Subject: [PATCH 3/3] update docstring --- muxer/mplex/muxed_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index 23d531b..a6b6cbf 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -38,7 +38,7 @@ class MuxedStream(IMuxedStream): def read(self): """ - read from stream til end of file + read messages associated with stream from buffer til end of file :return: bytes of input """ return self.muxed_conn.read_buffer(self.id)