From 2bde260f5f5b1b3a064d09aad7d22bb3fbbdb7ca Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 11 Nov 2018 09:56:44 -0500 Subject: [PATCH] refactor for sprint --- connection/raw_connection.py | 14 ---- connection/raw_connection_interface.py | 8 -- libp2p/libp2p.py | 17 ++++ {connection => muxer/mplex}/__init__.py | 0 muxer/{ => mplex}/muxed_connection.py | 0 .../{ => mplex}/muxed_connection_interface.py | 0 muxer/{ => mplex}/muxed_stream.py | 0 muxer/{ => mplex}/muxed_stream_interface.py | 0 muxer/{ => mplex}/smux_multiplex.py | 0 {stream => muxer/yamux}/__init__.py | 0 network/tcp.py | 83 ------------------- transport/connection/__init__.py | 0 transport/connection/raw_connection.py | 16 ++++ .../connection/raw_connection_interface.py | 15 ++++ {network => transport}/listener_interface.py | 0 transport/stream/__init__.py | 0 {stream => transport/stream}/stream.py | 0 .../stream}/stream_interface.py | 0 transport/tcp.py | 43 ++++++++-- transport/upgrader.py | 18 ++++ 20 files changed, 102 insertions(+), 112 deletions(-) delete mode 100644 connection/raw_connection.py delete mode 100644 connection/raw_connection_interface.py rename {connection => muxer/mplex}/__init__.py (100%) rename muxer/{ => mplex}/muxed_connection.py (100%) rename muxer/{ => mplex}/muxed_connection_interface.py (100%) rename muxer/{ => mplex}/muxed_stream.py (100%) rename muxer/{ => mplex}/muxed_stream_interface.py (100%) rename muxer/{ => mplex}/smux_multiplex.py (100%) rename {stream => muxer/yamux}/__init__.py (100%) delete mode 100644 network/tcp.py create mode 100644 transport/connection/__init__.py create mode 100644 transport/connection/raw_connection.py create mode 100644 transport/connection/raw_connection_interface.py rename {network => transport}/listener_interface.py (100%) create mode 100644 transport/stream/__init__.py rename {stream => transport/stream}/stream.py (100%) rename {stream => transport/stream}/stream_interface.py (100%) create mode 100644 transport/upgrader.py diff --git a/connection/raw_connection.py b/connection/raw_connection.py deleted file mode 100644 index 9c77edb..0000000 --- a/connection/raw_connection.py +++ /dev/null @@ -1,14 +0,0 @@ -import asyncio -from .raw_connection_interface import IRawConnection - -class RawConnection(IRawConnection): - - def __init__(self, ip, port): - self.conn_ip = ip - self.conn_port = port - self.reader = None - self.writer = None - - async def open_connection(self): - self.reader, self.writer = \ - await asyncio.open_connection(self.conn_ip, self.conn_port) diff --git a/connection/raw_connection_interface.py b/connection/raw_connection_interface.py deleted file mode 100644 index 30f0f32..0000000 --- a/connection/raw_connection_interface.py +++ /dev/null @@ -1,8 +0,0 @@ -from abc import ABC, abstractmethod - -class IRawConnection(ABC): - - @abstractmethod - async def open_connection(self): - pass - \ No newline at end of file diff --git a/libp2p/libp2p.py b/libp2p/libp2p.py index e69de29..cb8c78b 100644 --- a/libp2p/libp2p.py +++ b/libp2p/libp2p.py @@ -0,0 +1,17 @@ +from .config import Config +from ..peer.peerstore import PeerStore + +class Libp2p(object): + + def __init__(self, idOpt, \ + transportOpt = ["/ip4/0.0.0.0/tcp/0"], \ + muxerOpt = ["mplex/6.7.0"], \ + secOpt = ["secio"], \ + peerstoreOpt = new PeerStore()): + + if idOpt: + self.idOpt = idOpt + else: + # TODO generate RSA public key pair + + # TODO initialize diff --git a/connection/__init__.py b/muxer/mplex/__init__.py similarity index 100% rename from connection/__init__.py rename to muxer/mplex/__init__.py diff --git a/muxer/muxed_connection.py b/muxer/mplex/muxed_connection.py similarity index 100% rename from muxer/muxed_connection.py rename to muxer/mplex/muxed_connection.py diff --git a/muxer/muxed_connection_interface.py b/muxer/mplex/muxed_connection_interface.py similarity index 100% rename from muxer/muxed_connection_interface.py rename to muxer/mplex/muxed_connection_interface.py diff --git a/muxer/muxed_stream.py b/muxer/mplex/muxed_stream.py similarity index 100% rename from muxer/muxed_stream.py rename to muxer/mplex/muxed_stream.py diff --git a/muxer/muxed_stream_interface.py b/muxer/mplex/muxed_stream_interface.py similarity index 100% rename from muxer/muxed_stream_interface.py rename to muxer/mplex/muxed_stream_interface.py diff --git a/muxer/smux_multiplex.py b/muxer/mplex/smux_multiplex.py similarity index 100% rename from muxer/smux_multiplex.py rename to muxer/mplex/smux_multiplex.py diff --git a/stream/__init__.py b/muxer/yamux/__init__.py similarity index 100% rename from stream/__init__.py rename to muxer/yamux/__init__.py diff --git a/network/tcp.py b/network/tcp.py deleted file mode 100644 index 6ea962d..0000000 --- a/network/tcp.py +++ /dev/null @@ -1,83 +0,0 @@ -import asyncio -from .transport_interface import ITransport -from .listener_interface import IListener - -class TCP(ITransport): - - def __init__(self): - self.listener = self.Listener() - - class Listener(IListener): - - def __init__(self, handler_function=None): - self.multiaddrs = [] - self.server = None - self.handler = staticmethod(handler_function) - - def listen(self, multiaddr): - """ - put listener in listening mode and wait for incoming connections - :param multiaddr: multiaddr of peer - :return: return True if successful - """ - # TODO check for exceptions - _multiaddr = multiaddr - if "ipfs" in multiaddr.get_protocols(): - # ipfs_id = multiaddr.get_ipfs_id() - _multiaddr = multiaddr.remove_protocol("ipfs") - - self.multiaddrs.append(_multiaddr) - _multiaddr_dict = _multiaddr.to_dict() - _loop = asyncio.get_event_loop() - _coroutine = asyncio.start_server(self.handler, _multiaddr_dict.host,\ - _multiaddr_dict.port, loop=_loop) - self.server = _loop.run_until_complete(_coroutine) - return True - - def get_addrs(self): - """ - retrieve list of addresses the listener is listening on - :return: return list of addrs - """ - # TODO check if server is listening - return self.multiaddrs - - def close(self, options=None): - """ - close the listener such that no more connections - can be open on this transport instance - :param options: optional object potential with timeout - a timeout value in ms that fires and destroy all connections - :return: return True if successful - """ - if self.server is None: - return False - self.server.close() - _loop = asyncio.get_event_loop() - _loop.run_until_complete(self.server.wait_closed()) - _loop.close() - self.server = None - return True - - def dial(self, multiaddr, options=None): - """ - dial a transport to peer listening on multiaddr - :param multiaddr: multiaddr of peer - :param options: optional object - :return: True if successful - """ - _multiaddr_dict = multiaddr.to_dict() - reader, writer = await asyncio.open_connection(_multiaddr_dict.host,\ - _multiaddr_dict.port) - return False - # TODO dial behavior not fully understood - - def create_listener(self, handler_function, options=None): - """ - create listener on transport - :param options: optional object with properties the listener must have - :param handler_function: a function called when a new conntion is received - that takes a connection as argument which implements interface-connection - :return: a listener object that implements listener_interface.py - """ - return self.Listener(handler_function) diff --git a/transport/connection/__init__.py b/transport/connection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/transport/connection/raw_connection.py b/transport/connection/raw_connection.py new file mode 100644 index 0000000..e93e19e --- /dev/null +++ b/transport/connection/raw_connection.py @@ -0,0 +1,16 @@ +import asyncio +from .raw_connection_interface import IRawConnection + +class RawConnection(IRawConnection): + + def __init__(self, ip, port): + self.conn_ip = ip + self.conn_port = port + self.reader, self.writer = self.open_connection() + + async def open_connection(self): + """ + opens a connection on self.ip and self.port + :return: a raw connection + """ + return await asyncio.open_connection(self.conn_ip, self.conn_port) diff --git a/transport/connection/raw_connection_interface.py b/transport/connection/raw_connection_interface.py new file mode 100644 index 0000000..8c07a95 --- /dev/null +++ b/transport/connection/raw_connection_interface.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + +class IRawConnection(ABC): + """ + A Raw Connection provides a Reader and a Writer + open_connection should return such a connection + """ + + @abstractmethod + async def open_connection(self): + """ + opens a connection on ip and port + :return: a raw connection + """ + pass diff --git a/network/listener_interface.py b/transport/listener_interface.py similarity index 100% rename from network/listener_interface.py rename to transport/listener_interface.py diff --git a/transport/stream/__init__.py b/transport/stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stream/stream.py b/transport/stream/stream.py similarity index 100% rename from stream/stream.py rename to transport/stream/stream.py diff --git a/stream/stream_interface.py b/transport/stream/stream_interface.py similarity index 100% rename from stream/stream_interface.py rename to transport/stream/stream_interface.py diff --git a/transport/tcp.py b/transport/tcp.py index d2a0687..6ea962d 100644 --- a/transport/tcp.py +++ b/transport/tcp.py @@ -5,24 +5,42 @@ from .listener_interface import IListener class TCP(ITransport): def __init__(self): - self.multiaddr = None + self.listener = self.Listener() class Listener(IListener): + def __init__(self, handler_function=None): + self.multiaddrs = [] + self.server = None + self.handler = staticmethod(handler_function) + def listen(self, multiaddr): """ put listener in listening mode and wait for incoming connections :param multiaddr: multiaddr of peer :return: return True if successful """ - pass + # TODO check for exceptions + _multiaddr = multiaddr + if "ipfs" in multiaddr.get_protocols(): + # ipfs_id = multiaddr.get_ipfs_id() + _multiaddr = multiaddr.remove_protocol("ipfs") + + self.multiaddrs.append(_multiaddr) + _multiaddr_dict = _multiaddr.to_dict() + _loop = asyncio.get_event_loop() + _coroutine = asyncio.start_server(self.handler, _multiaddr_dict.host,\ + _multiaddr_dict.port, loop=_loop) + self.server = _loop.run_until_complete(_coroutine) + return True def get_addrs(self): """ retrieve list of addresses the listener is listening on :return: return list of addrs """ - pass + # TODO check if server is listening + return self.multiaddrs def close(self, options=None): """ @@ -32,16 +50,27 @@ class TCP(ITransport): a timeout value in ms that fires and destroy all connections :return: return True if successful """ - pass + if self.server is None: + return False + self.server.close() + _loop = asyncio.get_event_loop() + _loop.run_until_complete(self.server.wait_closed()) + _loop.close() + self.server = None + return True def dial(self, multiaddr, options=None): """ dial a transport to peer listening on multiaddr :param multiaddr: multiaddr of peer :param options: optional object - :return: list of multiaddrs + :return: True if successful """ - pass + _multiaddr_dict = multiaddr.to_dict() + reader, writer = await asyncio.open_connection(_multiaddr_dict.host,\ + _multiaddr_dict.port) + return False + # TODO dial behavior not fully understood def create_listener(self, handler_function, options=None): """ @@ -51,4 +80,4 @@ class TCP(ITransport): that takes a connection as argument which implements interface-connection :return: a listener object that implements listener_interface.py """ - pass + return self.Listener(handler_function) diff --git a/transport/upgrader.py b/transport/upgrader.py new file mode 100644 index 0000000..6401424 --- /dev/null +++ b/transport/upgrader.py @@ -0,0 +1,18 @@ +class TransportUpgrader(object): + + def __init__(self, secOpt, muxerOpt): + self.sec = secOpt + self.muxer = muxerOpt + + def upgrade_listener(self, transport, listeners): + """ + upgrade multiaddr listeners to libp2p-transport listeners + + """ + pass + + def upgrade_security(self): + pass + + def upgrade_muxer(self): + pass \ No newline at end of file