diff --git a/host/basic_host.py b/host/basic_host.py index 5e2bec9..2eb8ca0 100644 --- a/host/basic_host.py +++ b/host/basic_host.py @@ -11,7 +11,6 @@ class BasicHost(IHost): def __init__(self, _network): self.network = _network self.peerstore = self.network.peerstore - # self.stream_handlers = {} def get_id(self): """ @@ -49,13 +48,13 @@ class BasicHost(IHost): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on - def new_stream(self, peer_id, protocol_id): + async def new_stream(self, peer_id, protocol_id): """ :param peer_id: peer_id that host is connecting :param proto_id: protocol id that stream runs on :return: true if successful """ # TODO: host should return a mux stream not a raw stream - stream = self.network.new_stream(peer_id) + stream = await self.network.new_stream(peer_id, protocol_id) stream.set_protocol(protocol_id) return stream diff --git a/libp2p/libp2p.py b/libp2p/libp2p.py index a8f5d4b..7fe6b77 100644 --- a/libp2p/libp2p.py +++ b/libp2p/libp2p.py @@ -8,7 +8,7 @@ from Crypto.PublicKey import RSA class Libp2p(object): def __init__(self, idOpt = None, \ - transportOpt = ["/ip4/127.0.0.1/tcp/10000"], \ + transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \ muxerOpt = ["mplex/6.7.0"], \ secOpt = ["secio"], \ peerstore = PeerStore()): @@ -25,16 +25,15 @@ class Libp2p(object): self.secOpt = secOpt self.peerstore = peerstore - def new_node(self): + async def new_node(self): upgrader = TransportUpgrader(self.secOpt, self.transportOpt) swarm = Swarm(self.idOpt, self.peerstore, upgrader) tcp = TCP() swarm.add_transport(tcp) - swarm.listen(self.transportOpts) + await swarm.listen(self.transportOpt[0]) host = BasicHost(swarm) - # TODO MuxedConnection currently contains all muxing logic + # TODO MuxedConnection currently contains all muxing logic (move to a Muxer) # TODO routing unimplemented - return host diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index dc0bf8a..0ecb69e 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -17,8 +17,9 @@ class MuxedConn(IMuxedConn): self.initiator = initiator self.buffers = {} self.streams = {} + self.stream_queue = asyncio.Queue() - self.add_incoming_task() + asyncio.ensure_future(self.handle_incoming()) def close(self): """ @@ -33,7 +34,12 @@ class MuxedConn(IMuxedConn): """ pass - def read_buffer(self, stream_id): + async def read_buffer(self, stream_id): + # Empty buffer or nonexistent stream + # TODO: propagate up timeout exception and catch + if stream_id not in self.buffers or not self.buffers[stream_id]: + await self.handle_incoming() + data = self.buffers[stream_id] self.buffers[stream_id] = bytearray() return data @@ -43,37 +49,22 @@ class MuxedConn(IMuxedConn): creates a new muxed_stream :return: a new stream """ - stream = MuxedStream(peer_id, multi_addr, self) + stream = MuxedStream(stream_id, multi_addr, self) self.streams[stream_id] = stream - self.buffers[stream_id] = bytearray() return stream - def accept_stream(self): + async def accept_stream(self): """ accepts a muxed stream opened by the other end :return: the accepted stream """ - data = bytearray() - while True: - chunk = self.raw_conn.reader.read(100) - if not chunk: - break - data += chunk - header, end_index = decode_uvarint(data, 0) - length, end_index = decode_uvarint(data, end_index) - message = data[end_index, end_index + length] - - flag = header & 0x07 - stream_id = header >> 3 - # TODO update to pull out protocol_id from message protocol_id = "/echo/1.0.0" - + stream_id = await self.stream_queue.get() stream = MuxedStream(stream_id, False, self) - return stream, stream_id, protocol_id - def send_message(self, flag, data, stream_id): + async def send_message(self, flag, data, stream_id): """ sends a message over the connection :param header: header to use @@ -86,7 +77,8 @@ class MuxedConn(IMuxedConn): header = encode_uvarint(header) data_length = encode_uvarint(len(data)) _bytes = header + data_length + data - return self.write_to_stream(_bytes) + + return await self.write_to_stream(_bytes) async def write_to_stream(self, _bytes): self.raw_conn.writer.write(_bytes) @@ -95,25 +87,23 @@ class MuxedConn(IMuxedConn): async def handle_incoming(self): data = bytearray() - while True: - chunk = self.raw_conn.reader.read(100) - if not chunk: - break + try: + chunk = await asyncio.wait_for(self.raw_conn.reader.read(1024), timeout=5) data += chunk - header, end_index = decode_uvarint(data, 0) - length, end_index = decode_uvarint(data, end_index) - message = data[end_index, end_index + length] - # Deal with other types of messages - flag = header & 0x07 - stream_id = header >> 3 + header, end_index = decode_uvarint(data, 0) + length, end_index = decode_uvarint(data, end_index + 1) - self.buffers[stream_id] = self.buffers[stream_id] + message - # Read header - # Read message length - # Read message into corresponding buffer + message = data[-length:] - def add_incoming_task(self): - loop = asyncio.get_event_loop() - handle_incoming_task = loop.create_task(self.handle_incoming()) - handle_incoming_task.add_done_callback(self.add_incoming_task) + # Deal with other types of messages + flag = header & 0x07 + stream_id = header >> 3 + + if stream_id not in self.buffers: + self.buffers[stream_id] = message + await self.stream_queue.put(stream_id) + else: + self.buffers[stream_id] = self.buffers[stream_id] + message + except asyncio.TimeoutError: + print('timeout!') diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index a6b6cbf..ea8a606 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -36,19 +36,19 @@ class MuxedStream(IMuxedStream): else: return HEADER_TAGS[action] - 1 - def read(self): + async def read(self): """ read messages associated with stream from buffer til end of file :return: bytes of input """ - return self.muxed_conn.read_buffer(self.id) + return await self.muxed_conn.read_buffer(self.id) - def write(self, data): + async def write(self, data): """ write to stream :return: number of bytes written """ - return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) + return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) def close(self): """ diff --git a/network/multiaddr.py b/network/multiaddr.py index 77075a1..982a875 100644 --- a/network/multiaddr.py +++ b/network/multiaddr.py @@ -2,6 +2,8 @@ class MultiAddr: # Validates input string and constructs internal representation. def __init__(self, addr): + self.protocol_map = dict() + # Empty multiaddrs are valid. if not addr: self.protocol_map = dict() diff --git a/network/stream/net_stream.py b/network/stream/net_stream.py index 0dca44d..acf1f0e 100644 --- a/network/stream/net_stream.py +++ b/network/stream/net_stream.py @@ -19,19 +19,19 @@ class NetStream(INetStream): """ self.protocol_id = protocol_id - def read(self): + async def read(self): """ read from stream :return: bytes of input until EOF """ - return self.muxed_stream.read() + return await self.muxed_stream.read() - def write(self, bytes): + async def write(self, bytes): """ write to stream :return: number of bytes written """ - return self.muxed_stream.write(bytes) + return await self.muxed_stream.write(bytes) def close(self): """ diff --git a/network/swarm.py b/network/swarm.py index 2e24a53..bd98528 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -22,13 +22,20 @@ class Swarm(INetwork): """ self.stream_handlers[protocol_id] = stream_handler - def new_stream(self, peer_id, protocol_id): + async def new_stream(self, peer_id, protocol_id): """ :param peer_id: peer_id of destination :param protocol_id: protocol id :return: net stream instance """ - muxed_conn = None + # Get peer info from peer store + addrs = self.peerstore.addrs(peer_id) + + if not addrs: + raise SwarmException("No known addresses to peer") + + multiaddr = addrs[0] + if peer_id in self.connections: """ If muxed connection already exists for peer_id, @@ -37,14 +44,8 @@ class Swarm(INetwork): """ muxed_conn = self.connections[peer_id] else: - # Get peer info from peer store - addrs = self.peerstore.addrs(peer_id) - # Transport dials peer (gets back a raw conn) - if not addrs: - raise SwarmException("No known addresses to peer") - first_addr = addrs[0] - raw_conn = self.transport.dial(first_addr) + raw_conn = await self.transport.dial(MultiAddr(multiaddr)) # Use upgrader to upgrade raw conn to muxed conn muxed_conn = self.upgrader.upgrade_connection(raw_conn, True) @@ -54,28 +55,30 @@ class Swarm(INetwork): # Use muxed conn to open stream, which returns # a muxed stream - stream_id = str(uuid.uuid4()) - muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, first_addr) + # TODO: use better stream IDs + stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3 + muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr) # Create a net stream net_stream = NetStream(muxed_stream) return net_stream - def listen(self, *args): + async def listen(self, *args): """ :param *args: one or many multiaddrs to start listening on :return: true if at least one success """ - - # For each multiaddr in args - # Check if a listener for multiaddr exists already - # If listener already exists, continue - # Otherwise, do the following: - # Pass multiaddr into conn handler - # Have conn handler delegate to stream handler - # Call listener listen with the multiaddr - # Map multiaddr to listener + """ + For each multiaddr in args + Check if a listener for multiaddr exists already + If listener already exists, continue + Otherwise: + Capture multiaddr in conn handler + Have conn handler delegate to stream handler + Call listener listen with the multiaddr + Map multiaddr to listener + """ for multiaddr_str in args: if multiaddr_str in self.listeners: return True @@ -83,27 +86,28 @@ class Swarm(INetwork): multiaddr = MultiAddr(multiaddr_str) multiaddr_dict = multiaddr.to_options() - def conn_handler(reader, writer): + async def conn_handler(reader, writer): # Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr) - raw_conn = RawConnection(multiaddr_dict.host, multiaddr_dict.port, reader, writer) + raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer) muxed_conn = self.upgrader.upgrade_connection(raw_conn, False) - muxed_stream, stream_id, protocol_id = muxed_conn.accept_stream() + muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream() net_stream = NetStream(muxed_stream) net_stream.set_protocol(protocol_id) # Give to stream handler # TODO: handle case when stream handler is set - self.stream_handlers[protocol_id](net_stream) + # TODO: handle case of multiple protocols over same raw connection + await self.stream_handlers[protocol_id](net_stream) try: # Success listener = self.transport.create_listener(conn_handler) - listener.listen(multiaddr) + await listener.listen(multiaddr) return True except IOError: # Failed. Continue looping. - print("Failed to connect to: " + multiaddr) + print("Failed to connect to: " + str(multiaddr)) # No multiaddr succeeded return False diff --git a/requirements.txt b/requirements.txt index 95dfbe6..6a79713 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ asyncio pylint pytest pycryptodome +pytest-asyncio diff --git a/transport/tcp/tcp.py b/transport/tcp/tcp.py index 2984d22..fab9f9d 100644 --- a/transport/tcp/tcp.py +++ b/transport/tcp/tcp.py @@ -13,25 +13,26 @@ class TCP(ITransport): def __init__(self, handler_function=None): self.multiaddrs = [] self.server = None - self.handler = staticmethod(handler_function) + self.handler = handler_function - def listen(self, multiaddr): + async def listen(self, multiaddr): """ put listener in listening mode and wait for incoming connections :param multiaddr: multiaddr of peer :return: return True if successful """ - # TODO check for exceptions - if "ipfs" in multiaddr.get_protocols(): - # ipfs_id = multiaddr.get_ipfs_id() - _multiaddr = multiaddr.remove_protocol("ipfs") + _multiaddr = multiaddr - self.multiaddrs.append(multiaddr) + # TODO check for exceptions + if "ipfs" in _multiaddr.get_protocols(): + # ipfs_id = multiaddr.get_ipfs_id() + _multiaddr.remove_protocol("ipfs") + + self.multiaddrs.append(_multiaddr) multiaddr_dict = _multiaddr.to_options() - loop = asyncio.get_event_loop() - coroutine = asyncio.start_server(self.handler, multiaddr_dict.host,\ - multiaddr_dict.port, loop=loop) - self.server = loop.run_until_complete(coroutine) + coroutine = asyncio.start_server(self.handler, multiaddr_dict['host'],\ + multiaddr_dict['port']) + self.server = await coroutine return True def get_addrs(self): @@ -59,17 +60,19 @@ class TCP(ITransport): self.server = None return True - def dial(self, multiaddr, options=None): + async def dial(self, multiaddr, options=None): """ dial a transport to peer listening on multiaddr :param multiaddr: multiaddr of peer :param options: optional object :return: True if successful """ - _multiaddr_dict = multiaddr.to_dict() - host = _multiaddr_dict.host - port = _multiaddr_dict.port - reader, writer = open_conn(host, port) + _multiaddr_dict = multiaddr.to_options() + host = _multiaddr_dict['host'] + port = _multiaddr_dict['port'] + + reader, writer = await asyncio.open_connection(host, port) + return RawConnection(host, port, reader, writer) def create_listener(self, handler_function, options=None): @@ -81,7 +84,3 @@ class TCP(ITransport): :return: a listener object that implements listener_interface.py """ return self.Listener(handler_function) - -async def open_conn(host, port): - reader, writer = await asyncio.open_connection(host, port) - return reader, writer