From 19650d0f72f170e90c4721e26b061050ce5682dd Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 31 Oct 2018 22:31:00 +0100 Subject: [PATCH] muxer scaffolding --- connection/muxed_connection.py | 0 connection/muxed_connection_interface.py | 0 muxer/muxed_connection.py | 41 ++++++++++++ muxer/muxed_connection_interface.py | 32 +++++++++ muxer/muxed_stream.py | 33 ++++++++++ muxer/muxed_stream_interface.py | 25 +++++++ muxer/smux_multiplex.py | 44 +++++++++++++ network/tcp.py | 83 ++++++++++++++++++++++++ 8 files changed, 258 insertions(+) delete mode 100644 connection/muxed_connection.py delete mode 100644 connection/muxed_connection_interface.py create mode 100644 muxer/muxed_connection.py create mode 100644 muxer/muxed_connection_interface.py create mode 100644 muxer/muxed_stream.py create mode 100644 muxer/muxed_stream_interface.py create mode 100644 network/tcp.py diff --git a/connection/muxed_connection.py b/connection/muxed_connection.py deleted file mode 100644 index e69de29..0000000 diff --git a/connection/muxed_connection_interface.py b/connection/muxed_connection_interface.py deleted file mode 100644 index e69de29..0000000 diff --git a/muxer/muxed_connection.py b/muxer/muxed_connection.py new file mode 100644 index 0000000..26c80f5 --- /dev/null +++ b/muxer/muxed_connection.py @@ -0,0 +1,41 @@ +from .muxed_connection_interface import IMuxedConnection + +class MuxedConnection(IMuxedConnection): + """ + reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go + """ + def __init__(self, conn, initiator): + """ + create a new muxed connection + :param conn: an instance of raw connection + :param initiator: boolean to prevent multiplex with self + """ + self.raw_conn = conn + self.initiator = initiator + + def close(self): + """ + close the stream muxer and underlying raw connection + """ + pass + + def is_closed(self): + """ + check connection is fully closed + :return: true if successful + """ + pass + + def open_stream(self): + """ + creates a new muxed_stream + :return: a new stream + """ + pass + + def accept_stream(self): + """ + accepts a muxed stream opened by the other end + :return: the accepted stream + """ + pass diff --git a/muxer/muxed_connection_interface.py b/muxer/muxed_connection_interface.py new file mode 100644 index 0000000..cf12591 --- /dev/null +++ b/muxer/muxed_connection_interface.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod + +class IMuxedConn(ABC): + """ + reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go + """ + + # TODO closer + + @abstractmethod + def is_closed(self): + """ + check connection is fully closed + :return: true if successful + """ + pass + + @abstractmethod + def open_stream(self): + """ + creates a new muxed_stream + :return: a new stream + """ + pass + + @abstractmethod + def accept_stream(self): + """ + accepts a muxed stream opened by the other end + :return: the accepted stream + """ + pass diff --git a/muxer/muxed_stream.py b/muxer/muxed_stream.py new file mode 100644 index 0000000..be37d58 --- /dev/null +++ b/muxer/muxed_stream.py @@ -0,0 +1,33 @@ +from .muxed_stream_interface import IMuxedStream + +class MuxedStream(IMuxedStream): + """ + reference: https://github.com/libp2p/go-mplex/blob/master/stream.go + """ + def __init__(self, stream_id, stream_name): + self.id = stream_id + self.name = stream_name + + def read(self): + pass + + def write(self): + pass + + def close(self): + pass + + def reset(self): + """ + closes both ends of the stream + tells this remote side to hang up + :return: error/exception + """ + pass + + def set_deadline(self, ttl): + """ + set deadline for muxed stream + :return: a new stream + """ + pass diff --git a/muxer/muxed_stream_interface.py b/muxer/muxed_stream_interface.py new file mode 100644 index 0000000..0ac3ec9 --- /dev/null +++ b/muxer/muxed_stream_interface.py @@ -0,0 +1,25 @@ +from abc import ABC, abstractmethod +from datetime import time + +class IMuxedStream(ABC): + + # TODO Reader + # TODO Writer + # TODO Closer + + @abstractmethod + def reset(self): + """ + closes both ends of the stream + tells this remote side to hang up + :return: error/exception + """ + pass + + @abstractmethod + def set_deadline(self, ttl): + """ + set deadline for muxed stream + :return: a new stream + """ + pass diff --git a/muxer/smux_multiplex.py b/muxer/smux_multiplex.py index e69de29..ae7feb9 100644 --- a/muxer/smux_multiplex.py +++ b/muxer/smux_multiplex.py @@ -0,0 +1,44 @@ +from .muxed_stream import MuxedStream +from .muxed_connection import MuxedConn + +class Multiplex(object): + """ + reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go + """ + def __init__(self, conn, initiator): + """ + :param conn: an instance of raw connection + : param initiator: boolean to prevent multiplex with self + """ + self.muxed_conn = MuxedConn(conn, initiator) + + def close(self): + """ + close the stream muxer and underlying raw connection + """ + return self.muxed_conn.close() + + def is_closed(self): + """ + check connection is fully closed + :return: true if successful + """ + return self.muxed_conn.is_closed() + + def open_stream(self): + """ + creates a new muxed_stream + :return: a new stream + """ + return self.muxed_conn.open_stream() + + def accept_stream(self, _muxed_stream): + """ + accepts a muxed stream opened by the other end + :param _muxed_stream: stream to be accepted + :return: the accepted stream + """ + pass + + # def new_conn(raw_conn, is_server): + # pass diff --git a/network/tcp.py b/network/tcp.py new file mode 100644 index 0000000..6ea962d --- /dev/null +++ b/network/tcp.py @@ -0,0 +1,83 @@ +import asyncio +from .transport_interface import ITransport +from .listener_interface import IListener + +class TCP(ITransport): + + def __init__(self): + self.listener = self.Listener() + + class Listener(IListener): + + def __init__(self, handler_function=None): + self.multiaddrs = [] + self.server = None + self.handler = staticmethod(handler_function) + + def listen(self, multiaddr): + """ + put listener in listening mode and wait for incoming connections + :param multiaddr: multiaddr of peer + :return: return True if successful + """ + # TODO check for exceptions + _multiaddr = multiaddr + if "ipfs" in multiaddr.get_protocols(): + # ipfs_id = multiaddr.get_ipfs_id() + _multiaddr = multiaddr.remove_protocol("ipfs") + + self.multiaddrs.append(_multiaddr) + _multiaddr_dict = _multiaddr.to_dict() + _loop = asyncio.get_event_loop() + _coroutine = asyncio.start_server(self.handler, _multiaddr_dict.host,\ + _multiaddr_dict.port, loop=_loop) + self.server = _loop.run_until_complete(_coroutine) + return True + + def get_addrs(self): + """ + retrieve list of addresses the listener is listening on + :return: return list of addrs + """ + # TODO check if server is listening + return self.multiaddrs + + def close(self, options=None): + """ + close the listener such that no more connections + can be open on this transport instance + :param options: optional object potential with timeout + a timeout value in ms that fires and destroy all connections + :return: return True if successful + """ + if self.server is None: + return False + self.server.close() + _loop = asyncio.get_event_loop() + _loop.run_until_complete(self.server.wait_closed()) + _loop.close() + self.server = None + return True + + def dial(self, multiaddr, options=None): + """ + dial a transport to peer listening on multiaddr + :param multiaddr: multiaddr of peer + :param options: optional object + :return: True if successful + """ + _multiaddr_dict = multiaddr.to_dict() + reader, writer = await asyncio.open_connection(_multiaddr_dict.host,\ + _multiaddr_dict.port) + return False + # TODO dial behavior not fully understood + + def create_listener(self, handler_function, options=None): + """ + create listener on transport + :param options: optional object with properties the listener must have + :param handler_function: a function called when a new conntion is received + that takes a connection as argument which implements interface-connection + :return: a listener object that implements listener_interface.py + """ + return self.Listener(handler_function)