adding additional bugfixed files
This commit is contained in:
parent
90cb0e903e
commit
61e11a2716
|
@ -11,7 +11,6 @@ class BasicHost(IHost):
|
||||||
def __init__(self, _network):
|
def __init__(self, _network):
|
||||||
self.network = _network
|
self.network = _network
|
||||||
self.peerstore = self.network.peerstore
|
self.peerstore = self.network.peerstore
|
||||||
# self.stream_handlers = {}
|
|
||||||
|
|
||||||
def get_id(self):
|
def get_id(self):
|
||||||
"""
|
"""
|
||||||
|
@ -49,13 +48,13 @@ class BasicHost(IHost):
|
||||||
|
|
||||||
# protocol_id can be a list of protocol_ids
|
# protocol_id can be a list of protocol_ids
|
||||||
# stream will decide which protocol_id to run on
|
# stream will decide which protocol_id to run on
|
||||||
def new_stream(self, peer_id, protocol_id):
|
async def new_stream(self, peer_id, protocol_id):
|
||||||
"""
|
"""
|
||||||
:param peer_id: peer_id that host is connecting
|
:param peer_id: peer_id that host is connecting
|
||||||
:param proto_id: protocol id that stream runs on
|
:param proto_id: protocol id that stream runs on
|
||||||
:return: true if successful
|
:return: true if successful
|
||||||
"""
|
"""
|
||||||
# TODO: host should return a mux stream not a raw stream
|
# TODO: host should return a mux stream not a raw stream
|
||||||
stream = self.network.new_stream(peer_id)
|
stream = await self.network.new_stream(peer_id, protocol_id)
|
||||||
stream.set_protocol(protocol_id)
|
stream.set_protocol(protocol_id)
|
||||||
return stream
|
return stream
|
||||||
|
|
|
@ -8,7 +8,7 @@ from Crypto.PublicKey import RSA
|
||||||
class Libp2p(object):
|
class Libp2p(object):
|
||||||
|
|
||||||
def __init__(self, idOpt = None, \
|
def __init__(self, idOpt = None, \
|
||||||
transportOpt = ["/ip4/127.0.0.1/tcp/10000"], \
|
transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \
|
||||||
muxerOpt = ["mplex/6.7.0"], \
|
muxerOpt = ["mplex/6.7.0"], \
|
||||||
secOpt = ["secio"], \
|
secOpt = ["secio"], \
|
||||||
peerstore = PeerStore()):
|
peerstore = PeerStore()):
|
||||||
|
@ -25,16 +25,15 @@ class Libp2p(object):
|
||||||
self.secOpt = secOpt
|
self.secOpt = secOpt
|
||||||
self.peerstore = peerstore
|
self.peerstore = peerstore
|
||||||
|
|
||||||
def new_node(self):
|
async def new_node(self):
|
||||||
|
|
||||||
upgrader = TransportUpgrader(self.secOpt, self.transportOpt)
|
upgrader = TransportUpgrader(self.secOpt, self.transportOpt)
|
||||||
swarm = Swarm(self.idOpt, self.peerstore, upgrader)
|
swarm = Swarm(self.idOpt, self.peerstore, upgrader)
|
||||||
tcp = TCP()
|
tcp = TCP()
|
||||||
swarm.add_transport(tcp)
|
swarm.add_transport(tcp)
|
||||||
swarm.listen(self.transportOpts)
|
await swarm.listen(self.transportOpt[0])
|
||||||
host = BasicHost(swarm)
|
host = BasicHost(swarm)
|
||||||
|
|
||||||
# TODO MuxedConnection currently contains all muxing logic
|
# TODO MuxedConnection currently contains all muxing logic (move to a Muxer)
|
||||||
# TODO routing unimplemented
|
# TODO routing unimplemented
|
||||||
|
|
||||||
return host
|
return host
|
||||||
|
|
|
@ -17,8 +17,9 @@ class MuxedConn(IMuxedConn):
|
||||||
self.initiator = initiator
|
self.initiator = initiator
|
||||||
self.buffers = {}
|
self.buffers = {}
|
||||||
self.streams = {}
|
self.streams = {}
|
||||||
|
self.stream_queue = asyncio.Queue()
|
||||||
|
|
||||||
self.add_incoming_task()
|
asyncio.ensure_future(self.handle_incoming())
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""
|
"""
|
||||||
|
@ -33,7 +34,12 @@ class MuxedConn(IMuxedConn):
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def read_buffer(self, stream_id):
|
async def read_buffer(self, stream_id):
|
||||||
|
# Empty buffer or nonexistent stream
|
||||||
|
# TODO: propagate up timeout exception and catch
|
||||||
|
if stream_id not in self.buffers or not self.buffers[stream_id]:
|
||||||
|
await self.handle_incoming()
|
||||||
|
|
||||||
data = self.buffers[stream_id]
|
data = self.buffers[stream_id]
|
||||||
self.buffers[stream_id] = bytearray()
|
self.buffers[stream_id] = bytearray()
|
||||||
return data
|
return data
|
||||||
|
@ -43,37 +49,22 @@ class MuxedConn(IMuxedConn):
|
||||||
creates a new muxed_stream
|
creates a new muxed_stream
|
||||||
:return: a new stream
|
:return: a new stream
|
||||||
"""
|
"""
|
||||||
stream = MuxedStream(peer_id, multi_addr, self)
|
stream = MuxedStream(stream_id, multi_addr, self)
|
||||||
self.streams[stream_id] = stream
|
self.streams[stream_id] = stream
|
||||||
self.buffers[stream_id] = bytearray()
|
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
def accept_stream(self):
|
async def accept_stream(self):
|
||||||
"""
|
"""
|
||||||
accepts a muxed stream opened by the other end
|
accepts a muxed stream opened by the other end
|
||||||
:return: the accepted stream
|
:return: the accepted stream
|
||||||
"""
|
"""
|
||||||
data = bytearray()
|
|
||||||
while True:
|
|
||||||
chunk = self.raw_conn.reader.read(100)
|
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
data += chunk
|
|
||||||
header, end_index = decode_uvarint(data, 0)
|
|
||||||
length, end_index = decode_uvarint(data, end_index)
|
|
||||||
message = data[end_index, end_index + length]
|
|
||||||
|
|
||||||
flag = header & 0x07
|
|
||||||
stream_id = header >> 3
|
|
||||||
|
|
||||||
# TODO update to pull out protocol_id from message
|
# TODO update to pull out protocol_id from message
|
||||||
protocol_id = "/echo/1.0.0"
|
protocol_id = "/echo/1.0.0"
|
||||||
|
stream_id = await self.stream_queue.get()
|
||||||
stream = MuxedStream(stream_id, False, self)
|
stream = MuxedStream(stream_id, False, self)
|
||||||
|
|
||||||
return stream, stream_id, protocol_id
|
return stream, stream_id, protocol_id
|
||||||
|
|
||||||
def send_message(self, flag, data, stream_id):
|
async def send_message(self, flag, data, stream_id):
|
||||||
"""
|
"""
|
||||||
sends a message over the connection
|
sends a message over the connection
|
||||||
:param header: header to use
|
:param header: header to use
|
||||||
|
@ -86,7 +77,8 @@ class MuxedConn(IMuxedConn):
|
||||||
header = encode_uvarint(header)
|
header = encode_uvarint(header)
|
||||||
data_length = encode_uvarint(len(data))
|
data_length = encode_uvarint(len(data))
|
||||||
_bytes = header + data_length + data
|
_bytes = header + data_length + data
|
||||||
return self.write_to_stream(_bytes)
|
|
||||||
|
return await self.write_to_stream(_bytes)
|
||||||
|
|
||||||
async def write_to_stream(self, _bytes):
|
async def write_to_stream(self, _bytes):
|
||||||
self.raw_conn.writer.write(_bytes)
|
self.raw_conn.writer.write(_bytes)
|
||||||
|
@ -95,25 +87,23 @@ class MuxedConn(IMuxedConn):
|
||||||
|
|
||||||
async def handle_incoming(self):
|
async def handle_incoming(self):
|
||||||
data = bytearray()
|
data = bytearray()
|
||||||
while True:
|
try:
|
||||||
chunk = self.raw_conn.reader.read(100)
|
chunk = await asyncio.wait_for(self.raw_conn.reader.read(1024), timeout=5)
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
data += chunk
|
data += chunk
|
||||||
header, end_index = decode_uvarint(data, 0)
|
|
||||||
length, end_index = decode_uvarint(data, end_index)
|
|
||||||
message = data[end_index, end_index + length]
|
|
||||||
|
|
||||||
# Deal with other types of messages
|
header, end_index = decode_uvarint(data, 0)
|
||||||
flag = header & 0x07
|
length, end_index = decode_uvarint(data, end_index + 1)
|
||||||
stream_id = header >> 3
|
|
||||||
|
|
||||||
self.buffers[stream_id] = self.buffers[stream_id] + message
|
message = data[-length:]
|
||||||
# Read header
|
|
||||||
# Read message length
|
|
||||||
# Read message into corresponding buffer
|
|
||||||
|
|
||||||
def add_incoming_task(self):
|
# Deal with other types of messages
|
||||||
loop = asyncio.get_event_loop()
|
flag = header & 0x07
|
||||||
handle_incoming_task = loop.create_task(self.handle_incoming())
|
stream_id = header >> 3
|
||||||
handle_incoming_task.add_done_callback(self.add_incoming_task)
|
|
||||||
|
if stream_id not in self.buffers:
|
||||||
|
self.buffers[stream_id] = message
|
||||||
|
await self.stream_queue.put(stream_id)
|
||||||
|
else:
|
||||||
|
self.buffers[stream_id] = self.buffers[stream_id] + message
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
print('timeout!')
|
||||||
|
|
|
@ -36,19 +36,19 @@ class MuxedStream(IMuxedStream):
|
||||||
else:
|
else:
|
||||||
return HEADER_TAGS[action] - 1
|
return HEADER_TAGS[action] - 1
|
||||||
|
|
||||||
def read(self):
|
async def read(self):
|
||||||
"""
|
"""
|
||||||
read messages associated with stream from buffer til end of file
|
read messages associated with stream from buffer til end of file
|
||||||
:return: bytes of input
|
:return: bytes of input
|
||||||
"""
|
"""
|
||||||
return self.muxed_conn.read_buffer(self.id)
|
return await self.muxed_conn.read_buffer(self.id)
|
||||||
|
|
||||||
def write(self, data):
|
async def write(self, data):
|
||||||
"""
|
"""
|
||||||
write to stream
|
write to stream
|
||||||
:return: number of bytes written
|
:return: number of bytes written
|
||||||
"""
|
"""
|
||||||
return self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id)
|
return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -2,6 +2,8 @@ class MultiAddr:
|
||||||
|
|
||||||
# Validates input string and constructs internal representation.
|
# Validates input string and constructs internal representation.
|
||||||
def __init__(self, addr):
|
def __init__(self, addr):
|
||||||
|
self.protocol_map = dict()
|
||||||
|
|
||||||
# Empty multiaddrs are valid.
|
# Empty multiaddrs are valid.
|
||||||
if not addr:
|
if not addr:
|
||||||
self.protocol_map = dict()
|
self.protocol_map = dict()
|
||||||
|
|
|
@ -19,19 +19,19 @@ class NetStream(INetStream):
|
||||||
"""
|
"""
|
||||||
self.protocol_id = protocol_id
|
self.protocol_id = protocol_id
|
||||||
|
|
||||||
def read(self):
|
async def read(self):
|
||||||
"""
|
"""
|
||||||
read from stream
|
read from stream
|
||||||
:return: bytes of input until EOF
|
:return: bytes of input until EOF
|
||||||
"""
|
"""
|
||||||
return self.muxed_stream.read()
|
return await self.muxed_stream.read()
|
||||||
|
|
||||||
def write(self, bytes):
|
async def write(self, bytes):
|
||||||
"""
|
"""
|
||||||
write to stream
|
write to stream
|
||||||
:return: number of bytes written
|
:return: number of bytes written
|
||||||
"""
|
"""
|
||||||
return self.muxed_stream.write(bytes)
|
return await self.muxed_stream.write(bytes)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -22,13 +22,20 @@ class Swarm(INetwork):
|
||||||
"""
|
"""
|
||||||
self.stream_handlers[protocol_id] = stream_handler
|
self.stream_handlers[protocol_id] = stream_handler
|
||||||
|
|
||||||
def new_stream(self, peer_id, protocol_id):
|
async def new_stream(self, peer_id, protocol_id):
|
||||||
"""
|
"""
|
||||||
:param peer_id: peer_id of destination
|
:param peer_id: peer_id of destination
|
||||||
:param protocol_id: protocol id
|
:param protocol_id: protocol id
|
||||||
:return: net stream instance
|
:return: net stream instance
|
||||||
"""
|
"""
|
||||||
muxed_conn = None
|
# Get peer info from peer store
|
||||||
|
addrs = self.peerstore.addrs(peer_id)
|
||||||
|
|
||||||
|
if not addrs:
|
||||||
|
raise SwarmException("No known addresses to peer")
|
||||||
|
|
||||||
|
multiaddr = addrs[0]
|
||||||
|
|
||||||
if peer_id in self.connections:
|
if peer_id in self.connections:
|
||||||
"""
|
"""
|
||||||
If muxed connection already exists for peer_id,
|
If muxed connection already exists for peer_id,
|
||||||
|
@ -37,14 +44,8 @@ class Swarm(INetwork):
|
||||||
"""
|
"""
|
||||||
muxed_conn = self.connections[peer_id]
|
muxed_conn = self.connections[peer_id]
|
||||||
else:
|
else:
|
||||||
# Get peer info from peer store
|
|
||||||
addrs = self.peerstore.addrs(peer_id)
|
|
||||||
|
|
||||||
# Transport dials peer (gets back a raw conn)
|
# Transport dials peer (gets back a raw conn)
|
||||||
if not addrs:
|
raw_conn = await self.transport.dial(MultiAddr(multiaddr))
|
||||||
raise SwarmException("No known addresses to peer")
|
|
||||||
first_addr = addrs[0]
|
|
||||||
raw_conn = self.transport.dial(first_addr)
|
|
||||||
|
|
||||||
# Use upgrader to upgrade raw conn to muxed conn
|
# Use upgrader to upgrade raw conn to muxed conn
|
||||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)
|
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)
|
||||||
|
@ -54,28 +55,30 @@ class Swarm(INetwork):
|
||||||
|
|
||||||
# Use muxed conn to open stream, which returns
|
# Use muxed conn to open stream, which returns
|
||||||
# a muxed stream
|
# a muxed stream
|
||||||
stream_id = str(uuid.uuid4())
|
# TODO: use better stream IDs
|
||||||
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, first_addr)
|
stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3
|
||||||
|
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr)
|
||||||
|
|
||||||
# Create a net stream
|
# Create a net stream
|
||||||
net_stream = NetStream(muxed_stream)
|
net_stream = NetStream(muxed_stream)
|
||||||
|
|
||||||
return net_stream
|
return net_stream
|
||||||
|
|
||||||
def listen(self, *args):
|
async def listen(self, *args):
|
||||||
"""
|
"""
|
||||||
:param *args: one or many multiaddrs to start listening on
|
:param *args: one or many multiaddrs to start listening on
|
||||||
:return: true if at least one success
|
:return: true if at least one success
|
||||||
"""
|
"""
|
||||||
|
"""
|
||||||
# For each multiaddr in args
|
For each multiaddr in args
|
||||||
# Check if a listener for multiaddr exists already
|
Check if a listener for multiaddr exists already
|
||||||
# If listener already exists, continue
|
If listener already exists, continue
|
||||||
# Otherwise, do the following:
|
Otherwise:
|
||||||
# Pass multiaddr into conn handler
|
Capture multiaddr in conn handler
|
||||||
# Have conn handler delegate to stream handler
|
Have conn handler delegate to stream handler
|
||||||
# Call listener listen with the multiaddr
|
Call listener listen with the multiaddr
|
||||||
# Map multiaddr to listener
|
Map multiaddr to listener
|
||||||
|
"""
|
||||||
for multiaddr_str in args:
|
for multiaddr_str in args:
|
||||||
if multiaddr_str in self.listeners:
|
if multiaddr_str in self.listeners:
|
||||||
return True
|
return True
|
||||||
|
@ -83,27 +86,28 @@ class Swarm(INetwork):
|
||||||
multiaddr = MultiAddr(multiaddr_str)
|
multiaddr = MultiAddr(multiaddr_str)
|
||||||
multiaddr_dict = multiaddr.to_options()
|
multiaddr_dict = multiaddr.to_options()
|
||||||
|
|
||||||
def conn_handler(reader, writer):
|
async def conn_handler(reader, writer):
|
||||||
# Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr)
|
# Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr)
|
||||||
raw_conn = RawConnection(multiaddr_dict.host, multiaddr_dict.port, reader, writer)
|
raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer)
|
||||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
|
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
|
||||||
|
|
||||||
muxed_stream, stream_id, protocol_id = muxed_conn.accept_stream()
|
muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream()
|
||||||
net_stream = NetStream(muxed_stream)
|
net_stream = NetStream(muxed_stream)
|
||||||
net_stream.set_protocol(protocol_id)
|
net_stream.set_protocol(protocol_id)
|
||||||
|
|
||||||
# Give to stream handler
|
# Give to stream handler
|
||||||
# TODO: handle case when stream handler is set
|
# TODO: handle case when stream handler is set
|
||||||
self.stream_handlers[protocol_id](net_stream)
|
# TODO: handle case of multiple protocols over same raw connection
|
||||||
|
await self.stream_handlers[protocol_id](net_stream)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Success
|
# Success
|
||||||
listener = self.transport.create_listener(conn_handler)
|
listener = self.transport.create_listener(conn_handler)
|
||||||
listener.listen(multiaddr)
|
await listener.listen(multiaddr)
|
||||||
return True
|
return True
|
||||||
except IOError:
|
except IOError:
|
||||||
# Failed. Continue looping.
|
# Failed. Continue looping.
|
||||||
print("Failed to connect to: " + multiaddr)
|
print("Failed to connect to: " + str(multiaddr))
|
||||||
|
|
||||||
# No multiaddr succeeded
|
# No multiaddr succeeded
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -2,3 +2,4 @@ asyncio
|
||||||
pylint
|
pylint
|
||||||
pytest
|
pytest
|
||||||
pycryptodome
|
pycryptodome
|
||||||
|
pytest-asyncio
|
||||||
|
|
|
@ -13,25 +13,26 @@ class TCP(ITransport):
|
||||||
def __init__(self, handler_function=None):
|
def __init__(self, handler_function=None):
|
||||||
self.multiaddrs = []
|
self.multiaddrs = []
|
||||||
self.server = None
|
self.server = None
|
||||||
self.handler = staticmethod(handler_function)
|
self.handler = handler_function
|
||||||
|
|
||||||
def listen(self, multiaddr):
|
async def listen(self, multiaddr):
|
||||||
"""
|
"""
|
||||||
put listener in listening mode and wait for incoming connections
|
put listener in listening mode and wait for incoming connections
|
||||||
:param multiaddr: multiaddr of peer
|
:param multiaddr: multiaddr of peer
|
||||||
:return: return True if successful
|
:return: return True if successful
|
||||||
"""
|
"""
|
||||||
# TODO check for exceptions
|
_multiaddr = multiaddr
|
||||||
if "ipfs" in multiaddr.get_protocols():
|
|
||||||
# ipfs_id = multiaddr.get_ipfs_id()
|
|
||||||
_multiaddr = multiaddr.remove_protocol("ipfs")
|
|
||||||
|
|
||||||
self.multiaddrs.append(multiaddr)
|
# TODO check for exceptions
|
||||||
|
if "ipfs" in _multiaddr.get_protocols():
|
||||||
|
# ipfs_id = multiaddr.get_ipfs_id()
|
||||||
|
_multiaddr.remove_protocol("ipfs")
|
||||||
|
|
||||||
|
self.multiaddrs.append(_multiaddr)
|
||||||
multiaddr_dict = _multiaddr.to_options()
|
multiaddr_dict = _multiaddr.to_options()
|
||||||
loop = asyncio.get_event_loop()
|
coroutine = asyncio.start_server(self.handler, multiaddr_dict['host'],\
|
||||||
coroutine = asyncio.start_server(self.handler, multiaddr_dict.host,\
|
multiaddr_dict['port'])
|
||||||
multiaddr_dict.port, loop=loop)
|
self.server = await coroutine
|
||||||
self.server = loop.run_until_complete(coroutine)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def get_addrs(self):
|
def get_addrs(self):
|
||||||
|
@ -59,17 +60,19 @@ class TCP(ITransport):
|
||||||
self.server = None
|
self.server = None
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def dial(self, multiaddr, options=None):
|
async def dial(self, multiaddr, options=None):
|
||||||
"""
|
"""
|
||||||
dial a transport to peer listening on multiaddr
|
dial a transport to peer listening on multiaddr
|
||||||
:param multiaddr: multiaddr of peer
|
:param multiaddr: multiaddr of peer
|
||||||
:param options: optional object
|
:param options: optional object
|
||||||
:return: True if successful
|
:return: True if successful
|
||||||
"""
|
"""
|
||||||
_multiaddr_dict = multiaddr.to_dict()
|
_multiaddr_dict = multiaddr.to_options()
|
||||||
host = _multiaddr_dict.host
|
host = _multiaddr_dict['host']
|
||||||
port = _multiaddr_dict.port
|
port = _multiaddr_dict['port']
|
||||||
reader, writer = open_conn(host, port)
|
|
||||||
|
reader, writer = await asyncio.open_connection(host, port)
|
||||||
|
|
||||||
return RawConnection(host, port, reader, writer)
|
return RawConnection(host, port, reader, writer)
|
||||||
|
|
||||||
def create_listener(self, handler_function, options=None):
|
def create_listener(self, handler_function, options=None):
|
||||||
|
@ -81,7 +84,3 @@ class TCP(ITransport):
|
||||||
:return: a listener object that implements listener_interface.py
|
:return: a listener object that implements listener_interface.py
|
||||||
"""
|
"""
|
||||||
return self.Listener(handler_function)
|
return self.Listener(handler_function)
|
||||||
|
|
||||||
async def open_conn(host, port):
|
|
||||||
reader, writer = await asyncio.open_connection(host, port)
|
|
||||||
return reader, writer
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user