diff --git a/host/basic_host.py b/host/basic_host.py index 2eb8ca0..15215f1 100644 --- a/host/basic_host.py +++ b/host/basic_host.py @@ -1,5 +1,6 @@ from .host_interface import IHost + # Upon host creation, host takes in options, # including the list of addresses on which to listen. # Host then parses these options and delegates to its Network instance, @@ -51,7 +52,7 @@ class BasicHost(IHost): async def new_stream(self, peer_id, protocol_id): """ :param peer_id: peer_id that host is connecting - :param proto_id: protocol id that stream runs on + :param protocol_id: protocol id that stream runs on :return: true if successful """ # TODO: host should return a mux stream not a raw stream diff --git a/host/host_interface.py b/host/host_interface.py index eaca0f8..f366489 100644 --- a/host/host_interface.py +++ b/host/host_interface.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + class IHost(ABC): @abstractmethod @@ -36,9 +37,8 @@ class IHost(ABC): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on @abstractmethod - def new_stream(self, context, peer_id, protocol_id): + def new_stream(self, peer_id, protocol_id): """ - :param context: a context instance :param peer_id: peer_id that host is connecting :param proto_id: protocol id that stream runs on :return: true if successful diff --git a/libp2p/libp2p.py b/libp2p/libp2p.py index 7fe6b77..6695200 100644 --- a/libp2p/libp2p.py +++ b/libp2p/libp2p.py @@ -1,37 +1,35 @@ +from Crypto.PublicKey import RSA from peer.peerstore import PeerStore from network.swarm import Swarm from host.basic_host import BasicHost from transport.upgrader import TransportUpgrader from transport.tcp.tcp import TCP -from Crypto.PublicKey import RSA -class Libp2p(object): - def __init__(self, idOpt = None, \ - transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \ - muxerOpt = ["mplex/6.7.0"], \ - secOpt = ["secio"], \ - peerstore = PeerStore()): - - if idOpt: - self.idOpt = idOpt +class Libp2p(): + + def __init__(self, id_opt=None, transport_opt=["/ip4/127.0.0.1/tcp/8001"], \ + muxer_opt=["mplex/6.7.0"], sec_opt=["secio"], peerstore=PeerStore()): + + if id_opt: + self.id_opt = id_opt else: new_key = RSA.generate(2048, e=65537) - self.idOpt = new_key.publickey().exportKey("PEM") + self.id_opt = new_key.publickey().exportKey("PEM") self.private_key = new_key.exportKey("PEM") - - self.transportOpt = transportOpt - self.muxerOpt = muxerOpt - self.secOpt = secOpt + + self.transport_opt = transport_opt + self.muxer_opt = muxer_opt + self.sec_opt = sec_opt self.peerstore = peerstore async def new_node(self): - upgrader = TransportUpgrader(self.secOpt, self.transportOpt) - swarm = Swarm(self.idOpt, self.peerstore, upgrader) + upgrader = TransportUpgrader(self.sec_opt, self.transport_opt) + swarm = Swarm(self.id_opt, self.peerstore, upgrader) tcp = TCP() swarm.add_transport(tcp) - await swarm.listen(self.transportOpt[0]) + await swarm.listen(self.transport_opt[0]) host = BasicHost(swarm) # TODO MuxedConnection currently contains all muxing logic (move to a Muxer) diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index 5794289..61bd6fe 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -3,6 +3,7 @@ from .utils import encode_uvarint, decode_uvarint from .muxed_connection_interface import IMuxedConn from .muxed_stream import MuxedStream + class MuxedConn(IMuxedConn): """ reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go @@ -47,6 +48,10 @@ class MuxedConn(IMuxedConn): def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): """ creates a new muxed_stream + :param protocol_id: protocol_id of stream + :param stream_id: stream_id of stream + :param peer_id: peer_id that stream connects to + :param multi_addr: multi_addr that stream connects to :return: a new stream """ stream = MuxedStream(stream_id, multi_addr, self) diff --git a/muxer/mplex/muxed_connection_interface.py b/muxer/mplex/muxed_connection_interface.py index 9aa854c..7a22970 100644 --- a/muxer/mplex/muxed_connection_interface.py +++ b/muxer/mplex/muxed_connection_interface.py @@ -23,11 +23,13 @@ class IMuxedConn(ABC): pass @abstractmethod - def open_stream(self, protocol_id, stream_name): + def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): """ creates a new muxed_stream - :param protocol_id: id to be associated with stream - :param stream_name: name as part of identifier + :param protocol_id: protocol_id of stream + :param stream_id: stream_id of stream + :param peer_id: peer_id that stream connects to + :param multi_addr: multi_addr that stream connects to :return: a new stream """ pass diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index ea8a606..1d6bd5f 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -1,4 +1,3 @@ -import asyncio from .muxed_stream_interface import IMuxedStream from .constants import HEADER_TAGS @@ -15,12 +14,12 @@ class MuxedStream(IMuxedStream): :param initiator: boolean if this is an initiator :param muxed_conn: muxed connection of this muxed_stream """ - self.id = stream_id + self.stream_id = stream_id self.initiator = initiator self.muxed_conn = muxed_conn - # self.read_deadline = None - # self.write_deadline = None + self.read_deadline = None + self.write_deadline = None self.local_closed = False self.remote_closed = False @@ -33,22 +32,22 @@ class MuxedStream(IMuxedStream): """ if self.initiator: return HEADER_TAGS[action] - else: - return HEADER_TAGS[action] - 1 + + return HEADER_TAGS[action] - 1 async def read(self): """ read messages associated with stream from buffer til end of file :return: bytes of input """ - return await self.muxed_conn.read_buffer(self.id) + return await self.muxed_conn.read_buffer(self.stream_id) async def write(self, data): """ write to stream :return: number of bytes written """ - return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) + return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) def close(self): """ @@ -59,8 +58,8 @@ class MuxedStream(IMuxedStream): if self.local_closed and self.remote_closed: return True - self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.id) - self.muxed_conn.streams.pop(self.id) + self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) + self.muxed_conn.streams.pop(self.stream_id) self.local_closed = True self.remote_closed = True diff --git a/muxer/mplex/smux_multiplex.py b/muxer/mplex/smux_multiplex.py index 60fd0ed..a99f266 100644 --- a/muxer/mplex/smux_multiplex.py +++ b/muxer/mplex/smux_multiplex.py @@ -1,8 +1,9 @@ -from .muxed_stream import MuxedStream from .muxed_connection import MuxedConn + class Multiplex(object): """ + muxing logic currently lives in MuxedConn reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go """ def __init__(self, conn, initiator): diff --git a/network/connection/raw_connection.py b/network/connection/raw_connection.py index 4947b1c..6612854 100644 --- a/network/connection/raw_connection.py +++ b/network/connection/raw_connection.py @@ -1,6 +1,6 @@ -import asyncio from .raw_connection_interface import IRawConnection + class RawConnection(IRawConnection): def __init__(self, ip, port, reader, writer): @@ -12,15 +12,3 @@ class RawConnection(IRawConnection): def close(self): self.writer.close() - - # def __init__(self, ip, port): - # self.conn_ip = ip - # self.conn_port = port - # self.reader, self.writer = self.open_connection() - - # async def open_connection(self): - # """ - # opens a connection on self.ip and self.port - # :return: a raw connection - # """ - # return await asyncio.open_connection(self.conn_ip, self.conn_port) diff --git a/network/connection/raw_connection_interface.py b/network/connection/raw_connection_interface.py index 2684355..0cb1ca9 100644 --- a/network/connection/raw_connection_interface.py +++ b/network/connection/raw_connection_interface.py @@ -1,15 +1,7 @@ -from abc import ABC, abstractmethod +from abc import ABC + class IRawConnection(ABC): """ A Raw Connection provides a Reader and a Writer - open_connection should return such a connection """ - - # @abstractmethod - # async def open_connection(self): - # """ - # opens a connection on ip and port - # :return: a raw connection - # """ - # pass diff --git a/network/network_interface.py b/network/network_interface.py index 7ba7641..0a30473 100644 --- a/network/network_interface.py +++ b/network/network_interface.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + class INetwork(ABC): @abstractmethod diff --git a/network/stream/net_stream.py b/network/stream/net_stream.py index acf1f0e..0377c32 100644 --- a/network/stream/net_stream.py +++ b/network/stream/net_stream.py @@ -1,10 +1,11 @@ -import asyncio from .net_stream_interface import INetStream + class NetStream(INetStream): def __init__(self, muxed_stream): self.muxed_stream = muxed_stream + self.protocol_id = None def get_protocol(self): """ @@ -26,12 +27,12 @@ class NetStream(INetStream): """ return await self.muxed_stream.read() - async def write(self, bytes): + async def write(self, data): """ write to stream :return: number of bytes written """ - return await self.muxed_stream.write(bytes) + return await self.muxed_stream.write(data) def close(self): """ diff --git a/network/stream/net_stream_interface.py b/network/stream/net_stream_interface.py index 920e2fa..33d838b 100644 --- a/network/stream/net_stream_interface.py +++ b/network/stream/net_stream_interface.py @@ -1,11 +1,7 @@ from abc import ABC, abstractmethod -class INetStream(ABC): - def __init__(self, peer_id, multi_addr, connection): - self.peer_id = peer_id - self.multi_addr = multi_addr - self.connection = connection +class INetStream(ABC): @abstractmethod def get_protocol(self): diff --git a/network/swarm.py b/network/swarm.py index bd98528..9a5dc3b 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -4,6 +4,7 @@ from .stream.net_stream import NetStream from .multiaddr import MultiAddr from .connection.raw_connection import RawConnection + class Swarm(INetwork): def __init__(self, my_peer_id, peerstore, upgrader): @@ -13,6 +14,7 @@ class Swarm(INetwork): self.connections = dict() self.listeners = dict() self.stream_handlers = dict() + self.transport = None def set_stream_handler(self, protocol_id, stream_handler): """ @@ -37,11 +39,8 @@ class Swarm(INetwork): multiaddr = addrs[0] if peer_id in self.connections: - """ - If muxed connection already exists for peer_id, - set muxed connection equal to - existing muxed connection - """ + # If muxed connection already exists for peer_id, + # set muxed connection equal to existing muxed connection muxed_conn = self.connections[peer_id] else: # Transport dials peer (gets back a raw conn) @@ -68,8 +67,7 @@ class Swarm(INetwork): """ :param *args: one or many multiaddrs to start listening on :return: true if at least one success - """ - """ + For each multiaddr in args Check if a listener for multiaddr exists already If listener already exists, continue @@ -87,8 +85,10 @@ class Swarm(INetwork): multiaddr_dict = multiaddr.to_options() async def conn_handler(reader, writer): - # Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr) - raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer) + # Upgrade reader/write to a net_stream and pass \ + # to appropriate stream handler (using multiaddr) + raw_conn = RawConnection(multiaddr_dict['host'], \ + multiaddr_dict['port'], reader, writer) muxed_conn = self.upgrader.upgrade_connection(raw_conn, False) muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream() diff --git a/network/walkthrough.txt b/network/walkthrough.txt deleted file mode 100644 index 3183d2a..0000000 --- a/network/walkthrough.txt +++ /dev/null @@ -1,4 +0,0 @@ -host.go --> config.go - config.go: newNode --> swarm.go: newSwarm - newSwarm | initializes data stores - diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index 7bad422..1a8ddc3 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -4,8 +4,8 @@ from libp2p.libp2p import Libp2p @pytest.mark.asyncio async def test_simple_messages(): - libA = Libp2p(transportOpt=["/ip4/127.0.0.1/tcp/8001/ipfs/hostA"]) - libB = Libp2p(transportOpt=["/ip4/127.0.0.1/tcp/8000/ipfs/hostB"]) + libA = Libp2p(transport_opt=["/ip4/127.0.0.1/tcp/8001/ipfs/hostA"]) + libB = Libp2p(transport_opt=["/ip4/127.0.0.1/tcp/8000/ipfs/hostB"]) hostA = await libA.new_node() hostB = await libB.new_node() diff --git a/transport/listener_interface.py b/transport/listener_interface.py index 9398bf9..9c4125b 100644 --- a/transport/listener_interface.py +++ b/transport/listener_interface.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + class IListener(ABC): @abstractmethod diff --git a/transport/transport_interface.py b/transport/transport_interface.py index 324ea43..e2ade45 100644 --- a/transport/transport_interface.py +++ b/transport/transport_interface.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + class ITransport(ABC): @abstractmethod diff --git a/transport/upgrader.py b/transport/upgrader.py index 6cd5b2e..66cf21c 100644 --- a/transport/upgrader.py +++ b/transport/upgrader.py @@ -1,6 +1,7 @@ from muxer.mplex.muxed_connection import MuxedConn -class TransportUpgrader(object): + +class TransportUpgrader(): def __init__(self, secOpt, muxerOpt): self.sec = secOpt @@ -12,7 +13,7 @@ class TransportUpgrader(object): """ pass - + def upgrade_security(self): pass @@ -20,8 +21,7 @@ class TransportUpgrader(object): """ upgrade raw connection to muxed connection """ - # For PoC, no security - # Default to mplex - + + # For PoC, no security, default to mplex # TODO do exchange to determine multiplexer return MuxedConn(conn, initiator)