diff --git a/libp2p/libp2p.py b/libp2p/libp2p.py index f14e95e..809e204 100644 --- a/libp2p/libp2p.py +++ b/libp2p/libp2p.py @@ -6,8 +6,8 @@ from Crypto.PublicKey import RSA class Libp2p(object): - def __init__(self, idOpt, \ - transportOpt = ["/ip4/127.0.0.1/tcp/10000/app/1.0.0"], \ + def __init__(self, idOpt = None, \ + transportOpt = ["/ip4/127.0.0.1/tcp/10000"], \ muxerOpt = ["mplex/6.7.0"], \ secOpt = ["secio"], \ peerstore = PeerStore()): diff --git a/muxer/mplex/constants.py b/muxer/mplex/constants.py new file mode 100644 index 0000000..a6dfcab --- /dev/null +++ b/muxer/mplex/constants.py @@ -0,0 +1,6 @@ +HEADER_TAGS = { + "NEW_STREAM": 0, + "MESSAGE": 2, + "CLOSE": 4, + "RESET": 6 +} diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index fdae239..9603ab0 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -48,7 +48,6 @@ class MuxedConn(IMuxedConn): self.buffers[stream_id] = bytearray() return stream - def accept_stream(self): """ accepts a muxed stream opened by the other end diff --git a/muxer/mplex/muxed_connection_interface.py b/muxer/mplex/muxed_connection_interface.py index df1694a..9aa854c 100644 --- a/muxer/mplex/muxed_connection_interface.py +++ b/muxer/mplex/muxed_connection_interface.py @@ -1,11 +1,18 @@ from abc import ABC, abstractmethod + class IMuxedConn(ABC): """ reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ - # TODO closer + @abstractmethod + def close(self): + """ + close connection + :return: true if successful + """ + pass @abstractmethod def is_closed(self): diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index 92624e5..a6b6cbf 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -1,34 +1,113 @@ +import asyncio from .muxed_stream_interface import IMuxedStream +from .constants import HEADER_TAGS + class MuxedStream(IMuxedStream): """ reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, protocol_id, stream_name): - self.protocol_id = protocol_id - self.name = stream_name + def __init__(self, stream_id, initiator, muxed_conn): + """ + create new MuxedStream in muxer + :param stream_id: stream stream id + :param initiator: boolean if this is an initiator + :param muxed_conn: muxed connection of this muxed_stream + """ + self.id = stream_id + self.initiator = initiator + self.muxed_conn = muxed_conn + + # self.read_deadline = None + # self.write_deadline = None + + self.local_closed = False + self.remote_closed = False + + def get_flag(self, action): + """ + get header flag based on action for mplex + :param action: action type in str + :return: int flag + """ + if self.initiator: + return HEADER_TAGS[action] + else: + return HEADER_TAGS[action] - 1 def read(self): - pass + """ + read messages associated with stream from buffer til end of file + :return: bytes of input + """ + return self.muxed_conn.read_buffer(self.id) - def write(self): - pass + def write(self, data): + """ + write to stream + :return: number of bytes written + """ + return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) def close(self): - pass + """ + close stream + :return: true if successful + """ + + if self.local_closed and self.remote_closed: + return True + + self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.id) + self.muxed_conn.streams.pop(self.id) + + self.local_closed = True + self.remote_closed = True + + return True def reset(self): """ closes both ends of the stream tells this remote side to hang up - :return: error/exception + :return: true if successful """ + # TODO behavior not fully understood pass + # if self.local_closed and self.remote_closed: + # return True + # + # self.muxed_conn.send_message(self.get_flag("RESET"), None, self.id) + # self.muxed_conn.streams.pop(self.id, None) + # + # self.local_closed = True + # self.remote_closed = True + # + # return True + # TODO deadline not in use def set_deadline(self, ttl): """ set deadline for muxed stream - :return: a new stream + :return: True if successful """ - pass + self.read_deadline = ttl + self.write_deadline = ttl + return True + + def set_read_deadline(self, ttl): + """ + set read deadline for muxed stream + :return: True if successful + """ + self.read_deadline = ttl + return True + + def set_write_deadline(self, ttl): + """ + set write deadline for muxed stream + :return: True if successful + """ + self.write_deadline = ttl + return True diff --git a/muxer/mplex/muxed_stream_interface.py b/muxer/mplex/muxed_stream_interface.py index 2c9bc25..e95a2c8 100644 --- a/muxer/mplex/muxed_stream_interface.py +++ b/muxer/mplex/muxed_stream_interface.py @@ -1,11 +1,31 @@ from abc import ABC, abstractmethod -# from datetime import time + class IMuxedStream(ABC): - # TODO Reader - # TODO Writer - # TODO Closer + @abstractmethod + def read(self): + """ + read from stream + :return: bytes of input + """ + pass + + @abstractmethod + def write(self, _bytes): + """ + write to stream + :return: number of bytes written + """ + pass + + @abstractmethod + def close(self): + """ + close stream + :return: true if successful + """ + pass @abstractmethod def reset(self): diff --git a/muxer/mplex/smux_multiplex.py b/muxer/mplex/smux_multiplex.py index 6239ad7..60fd0ed 100644 --- a/muxer/mplex/smux_multiplex.py +++ b/muxer/mplex/smux_multiplex.py @@ -8,7 +8,7 @@ class Multiplex(object): def __init__(self, conn, initiator): """ :param conn: an instance of raw connection - : param initiator: boolean to prevent multiplex with self + :param initiator: boolean to prevent multiplex with self """ self.muxed_conn = MuxedConn(conn, initiator) diff --git a/network/swarm.py b/network/swarm.py index 9e060f9..cb20f99 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -47,7 +47,7 @@ class Swarm(INetwork): raw_conn = self.transport.dial(first_addr) # Use upgrader to upgrade raw conn to muxed conn - muxed_conn = self.upgrader.upgrade_connection(raw_conn) + muxed_conn = self.upgrader.upgrade_connection(raw_conn, True) # Store muxed connection in connections self.connections[peer_id] = muxed_conn diff --git a/transport/upgrader.py b/transport/upgrader.py index e70d437..05e2218 100644 --- a/transport/upgrader.py +++ b/transport/upgrader.py @@ -1,3 +1,5 @@ +from muxer.mplex.muxed_connection import MuxedConn + class TransportUpgrader(object): def __init__(self, secOpt, muxerOpt): @@ -14,11 +16,13 @@ class TransportUpgrader(object): def upgrade_security(self): pass - def upgrade_connection(self, conn): + def upgrade_connection(self, conn, initiator): """ upgrade raw connection to muxed connection """ # For PoC, no security # Default to mplex - pass - \ No newline at end of file + + # TODO do exchange to determine multiplexer + + return MuxedConn(conn, initiator)