diff --git a/muxer/mplex/constants.py b/muxer/mplex/constants.py new file mode 100644 index 0000000..a6dfcab --- /dev/null +++ b/muxer/mplex/constants.py @@ -0,0 +1,6 @@ +HEADER_TAGS = { + "NEW_STREAM": 0, + "MESSAGE": 2, + "CLOSE": 4, + "RESET": 6 +} diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index fdae239..9603ab0 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -48,7 +48,6 @@ class MuxedConn(IMuxedConn): self.buffers[stream_id] = bytearray() return stream - def accept_stream(self): """ accepts a muxed stream opened by the other end diff --git a/muxer/mplex/muxed_connection_interface.py b/muxer/mplex/muxed_connection_interface.py index df1694a..9aa854c 100644 --- a/muxer/mplex/muxed_connection_interface.py +++ b/muxer/mplex/muxed_connection_interface.py @@ -1,11 +1,18 @@ from abc import ABC, abstractmethod + class IMuxedConn(ABC): """ reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ - # TODO closer + @abstractmethod + def close(self): + """ + close connection + :return: true if successful + """ + pass @abstractmethod def is_closed(self): diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index 92624e5..a6b6cbf 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -1,34 +1,113 @@ +import asyncio from .muxed_stream_interface import IMuxedStream +from .constants import HEADER_TAGS + class MuxedStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, protocol_id, stream_name): - self.protocol_id = protocol_id - self.name = stream_name + def __init__(self, stream_id, initiator, muxed_conn): + """ + create new MuxedStream in muxer + :param stream_id: stream stream id + :param initiator: boolean if this is an initiator + :param muxed_conn: muxed connection of this muxed_stream + """ + self.id = stream_id + self.initiator = initiator + self.muxed_conn = muxed_conn + + # self.read_deadline = None + # self.write_deadline = None + + self.local_closed = False + self.remote_closed = False + + def get_flag(self, action): + """ + get header flag based on action for mplex + :param action: action type in str + :return: int flag + """ + if self.initiator: + return HEADER_TAGS[action] + else: + return HEADER_TAGS[action] - 1 def read(self): - pass + """ + read messages associated with stream from buffer til end of file + :return: bytes of input + """ + return self.muxed_conn.read_buffer(self.id) - def write(self): - pass + def write(self, data): + """ + write to stream + :return: number of bytes written + """ + return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) def close(self): - pass + """ + close stream + :return: true if successful + """ + + if self.local_closed and self.remote_closed: + return True + + self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.id) + self.muxed_conn.streams.pop(self.id) + + self.local_closed = True + self.remote_closed = True + + return True def reset(self): """ closes both ends of the stream tells this remote side to hang up - :return: error/exception + :return: true if successful """ + # TODO behavior not fully understood pass + # if self.local_closed and self.remote_closed: + # return True + # + # self.muxed_conn.send_message(self.get_flag("RESET"), None, self.id) + # self.muxed_conn.streams.pop(self.id, None) + # + # self.local_closed = True + # self.remote_closed = True + # + # return True + # TODO deadline not in use def set_deadline(self, ttl): """ set deadline for muxed stream - :return: a new stream + :return: True if successful """ - pass + self.read_deadline = ttl + self.write_deadline = ttl + return True + + def set_read_deadline(self, ttl): + """ + set read deadline for muxed stream + :return: True if successful + """ + self.read_deadline = ttl + return True + + def set_write_deadline(self, ttl): + """ + set write deadline for muxed stream + :return: True if successful + """ + self.write_deadline = ttl + return True diff --git a/muxer/mplex/muxed_stream_interface.py b/muxer/mplex/muxed_stream_interface.py index 2c9bc25..e95a2c8 100644 --- a/muxer/mplex/muxed_stream_interface.py +++ b/muxer/mplex/muxed_stream_interface.py @@ -1,11 +1,31 @@ from abc import ABC, abstractmethod -# from datetime import time + class IMuxedStream(ABC): - # TODO Reader - # TODO Writer - # TODO Closer + @abstractmethod + def read(self): + """ + read from stream + :return: bytes of input + """ + pass + + @abstractmethod + def write(self, _bytes): + """ + write to stream + :return: number of bytes written + """ + pass + + @abstractmethod + def close(self): + """ + close stream + :return: true if successful + """ + pass @abstractmethod def reset(self):