This commit is contained in:
zixuanzh 2018-11-12 13:02:49 -05:00
parent b4272918d9
commit c5c9d3e5c9
17 changed files with 66 additions and 84 deletions

View File

@ -1,5 +1,6 @@
from .host_interface import IHost from .host_interface import IHost
# Upon host creation, host takes in options, # Upon host creation, host takes in options,
# including the list of addresses on which to listen. # including the list of addresses on which to listen.
# Host then parses these options and delegates to its Network instance, # Host then parses these options and delegates to its Network instance,
@ -51,7 +52,7 @@ class BasicHost(IHost):
async def new_stream(self, peer_id, protocol_id): async def new_stream(self, peer_id, protocol_id):
""" """
:param peer_id: peer_id that host is connecting :param peer_id: peer_id that host is connecting
:param proto_id: protocol id that stream runs on :param protocol_id: protocol id that stream runs on
:return: true if successful :return: true if successful
""" """
# TODO: host should return a mux stream not a raw stream # TODO: host should return a mux stream not a raw stream

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
class IHost(ABC): class IHost(ABC):
@abstractmethod @abstractmethod
@ -36,9 +37,8 @@ class IHost(ABC):
# protocol_id can be a list of protocol_ids # protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on # stream will decide which protocol_id to run on
@abstractmethod @abstractmethod
def new_stream(self, context, peer_id, protocol_id): def new_stream(self, peer_id, protocol_id):
""" """
:param context: a context instance
:param peer_id: peer_id that host is connecting :param peer_id: peer_id that host is connecting
:param proto_id: protocol id that stream runs on :param proto_id: protocol id that stream runs on
:return: true if successful :return: true if successful

View File

@ -1,37 +1,35 @@
from Crypto.PublicKey import RSA
from peer.peerstore import PeerStore from peer.peerstore import PeerStore
from network.swarm import Swarm from network.swarm import Swarm
from host.basic_host import BasicHost from host.basic_host import BasicHost
from transport.upgrader import TransportUpgrader from transport.upgrader import TransportUpgrader
from transport.tcp.tcp import TCP from transport.tcp.tcp import TCP
from Crypto.PublicKey import RSA
class Libp2p(object):
def __init__(self, idOpt = None, \ class Libp2p():
transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \
muxerOpt = ["mplex/6.7.0"], \
secOpt = ["secio"], \
peerstore = PeerStore()):
if idOpt: def __init__(self, id_opt=None, transport_opt=["/ip4/127.0.0.1/tcp/8001"], \
self.idOpt = idOpt muxer_opt=["mplex/6.7.0"], sec_opt=["secio"], peerstore=PeerStore()):
if id_opt:
self.id_opt = id_opt
else: else:
new_key = RSA.generate(2048, e=65537) new_key = RSA.generate(2048, e=65537)
self.idOpt = new_key.publickey().exportKey("PEM") self.id_opt = new_key.publickey().exportKey("PEM")
self.private_key = new_key.exportKey("PEM") self.private_key = new_key.exportKey("PEM")
self.transportOpt = transportOpt self.transport_opt = transport_opt
self.muxerOpt = muxerOpt self.muxer_opt = muxer_opt
self.secOpt = secOpt self.sec_opt = sec_opt
self.peerstore = peerstore self.peerstore = peerstore
async def new_node(self): async def new_node(self):
upgrader = TransportUpgrader(self.secOpt, self.transportOpt) upgrader = TransportUpgrader(self.sec_opt, self.transport_opt)
swarm = Swarm(self.idOpt, self.peerstore, upgrader) swarm = Swarm(self.id_opt, self.peerstore, upgrader)
tcp = TCP() tcp = TCP()
swarm.add_transport(tcp) swarm.add_transport(tcp)
await swarm.listen(self.transportOpt[0]) await swarm.listen(self.transport_opt[0])
host = BasicHost(swarm) host = BasicHost(swarm)
# TODO MuxedConnection currently contains all muxing logic (move to a Muxer) # TODO MuxedConnection currently contains all muxing logic (move to a Muxer)

View File

@ -3,6 +3,7 @@ from .utils import encode_uvarint, decode_uvarint
from .muxed_connection_interface import IMuxedConn from .muxed_connection_interface import IMuxedConn
from .muxed_stream import MuxedStream from .muxed_stream import MuxedStream
class MuxedConn(IMuxedConn): class MuxedConn(IMuxedConn):
""" """
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
@ -47,6 +48,10 @@ class MuxedConn(IMuxedConn):
def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): def open_stream(self, protocol_id, stream_id, peer_id, multi_addr):
""" """
creates a new muxed_stream creates a new muxed_stream
:param protocol_id: protocol_id of stream
:param stream_id: stream_id of stream
:param peer_id: peer_id that stream connects to
:param multi_addr: multi_addr that stream connects to
:return: a new stream :return: a new stream
""" """
stream = MuxedStream(stream_id, multi_addr, self) stream = MuxedStream(stream_id, multi_addr, self)

View File

@ -23,11 +23,13 @@ class IMuxedConn(ABC):
pass pass
@abstractmethod @abstractmethod
def open_stream(self, protocol_id, stream_name): def open_stream(self, protocol_id, stream_id, peer_id, multi_addr):
""" """
creates a new muxed_stream creates a new muxed_stream
:param protocol_id: id to be associated with stream :param protocol_id: protocol_id of stream
:param stream_name: name as part of identifier :param stream_id: stream_id of stream
:param peer_id: peer_id that stream connects to
:param multi_addr: multi_addr that stream connects to
:return: a new stream :return: a new stream
""" """
pass pass

View File

@ -1,4 +1,3 @@
import asyncio
from .muxed_stream_interface import IMuxedStream from .muxed_stream_interface import IMuxedStream
from .constants import HEADER_TAGS from .constants import HEADER_TAGS
@ -15,12 +14,12 @@ class MuxedStream(IMuxedStream):
:param initiator: boolean if this is an initiator :param initiator: boolean if this is an initiator
:param muxed_conn: muxed connection of this muxed_stream :param muxed_conn: muxed connection of this muxed_stream
""" """
self.id = stream_id self.stream_id = stream_id
self.initiator = initiator self.initiator = initiator
self.muxed_conn = muxed_conn self.muxed_conn = muxed_conn
# self.read_deadline = None self.read_deadline = None
# self.write_deadline = None self.write_deadline = None
self.local_closed = False self.local_closed = False
self.remote_closed = False self.remote_closed = False
@ -33,7 +32,7 @@ class MuxedStream(IMuxedStream):
""" """
if self.initiator: if self.initiator:
return HEADER_TAGS[action] return HEADER_TAGS[action]
else:
return HEADER_TAGS[action] - 1 return HEADER_TAGS[action] - 1
async def read(self): async def read(self):
@ -41,14 +40,14 @@ class MuxedStream(IMuxedStream):
read messages associated with stream from buffer til end of file read messages associated with stream from buffer til end of file
:return: bytes of input :return: bytes of input
""" """
return await self.muxed_conn.read_buffer(self.id) return await self.muxed_conn.read_buffer(self.stream_id)
async def write(self, data): async def write(self, data):
""" """
write to stream write to stream
:return: number of bytes written :return: number of bytes written
""" """
return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id) return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id)
def close(self): def close(self):
""" """
@ -59,8 +58,8 @@ class MuxedStream(IMuxedStream):
if self.local_closed and self.remote_closed: if self.local_closed and self.remote_closed:
return True return True
self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.id) self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id)
self.muxed_conn.streams.pop(self.id) self.muxed_conn.streams.pop(self.stream_id)
self.local_closed = True self.local_closed = True
self.remote_closed = True self.remote_closed = True

View File

@ -1,8 +1,9 @@
from .muxed_stream import MuxedStream
from .muxed_connection import MuxedConn from .muxed_connection import MuxedConn
class Multiplex(object): class Multiplex(object):
""" """
muxing logic currently lives in MuxedConn
reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go
""" """
def __init__(self, conn, initiator): def __init__(self, conn, initiator):

View File

@ -1,6 +1,6 @@
import asyncio
from .raw_connection_interface import IRawConnection from .raw_connection_interface import IRawConnection
class RawConnection(IRawConnection): class RawConnection(IRawConnection):
def __init__(self, ip, port, reader, writer): def __init__(self, ip, port, reader, writer):
@ -12,15 +12,3 @@ class RawConnection(IRawConnection):
def close(self): def close(self):
self.writer.close() self.writer.close()
# def __init__(self, ip, port):
# self.conn_ip = ip
# self.conn_port = port
# self.reader, self.writer = self.open_connection()
# async def open_connection(self):
# """
# opens a connection on self.ip and self.port
# :return: a raw connection
# """
# return await asyncio.open_connection(self.conn_ip, self.conn_port)

View File

@ -1,15 +1,7 @@
from abc import ABC, abstractmethod from abc import ABC
class IRawConnection(ABC): class IRawConnection(ABC):
""" """
A Raw Connection provides a Reader and a Writer A Raw Connection provides a Reader and a Writer
open_connection should return such a connection
""" """
# @abstractmethod
# async def open_connection(self):
# """
# opens a connection on ip and port
# :return: a raw connection
# """
# pass

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
class INetwork(ABC): class INetwork(ABC):
@abstractmethod @abstractmethod

View File

@ -1,10 +1,11 @@
import asyncio
from .net_stream_interface import INetStream from .net_stream_interface import INetStream
class NetStream(INetStream): class NetStream(INetStream):
def __init__(self, muxed_stream): def __init__(self, muxed_stream):
self.muxed_stream = muxed_stream self.muxed_stream = muxed_stream
self.protocol_id = None
def get_protocol(self): def get_protocol(self):
""" """
@ -26,12 +27,12 @@ class NetStream(INetStream):
""" """
return await self.muxed_stream.read() return await self.muxed_stream.read()
async def write(self, bytes): async def write(self, data):
""" """
write to stream write to stream
:return: number of bytes written :return: number of bytes written
""" """
return await self.muxed_stream.write(bytes) return await self.muxed_stream.write(data)
def close(self): def close(self):
""" """

View File

@ -1,11 +1,7 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
class INetStream(ABC):
def __init__(self, peer_id, multi_addr, connection): class INetStream(ABC):
self.peer_id = peer_id
self.multi_addr = multi_addr
self.connection = connection
@abstractmethod @abstractmethod
def get_protocol(self): def get_protocol(self):

View File

@ -4,6 +4,7 @@ from .stream.net_stream import NetStream
from .multiaddr import MultiAddr from .multiaddr import MultiAddr
from .connection.raw_connection import RawConnection from .connection.raw_connection import RawConnection
class Swarm(INetwork): class Swarm(INetwork):
def __init__(self, my_peer_id, peerstore, upgrader): def __init__(self, my_peer_id, peerstore, upgrader):
@ -13,6 +14,7 @@ class Swarm(INetwork):
self.connections = dict() self.connections = dict()
self.listeners = dict() self.listeners = dict()
self.stream_handlers = dict() self.stream_handlers = dict()
self.transport = None
def set_stream_handler(self, protocol_id, stream_handler): def set_stream_handler(self, protocol_id, stream_handler):
""" """
@ -37,11 +39,8 @@ class Swarm(INetwork):
multiaddr = addrs[0] multiaddr = addrs[0]
if peer_id in self.connections: if peer_id in self.connections:
""" # If muxed connection already exists for peer_id,
If muxed connection already exists for peer_id, # set muxed connection equal to existing muxed connection
set muxed connection equal to
existing muxed connection
"""
muxed_conn = self.connections[peer_id] muxed_conn = self.connections[peer_id]
else: else:
# Transport dials peer (gets back a raw conn) # Transport dials peer (gets back a raw conn)
@ -68,8 +67,7 @@ class Swarm(INetwork):
""" """
:param *args: one or many multiaddrs to start listening on :param *args: one or many multiaddrs to start listening on
:return: true if at least one success :return: true if at least one success
"""
"""
For each multiaddr in args For each multiaddr in args
Check if a listener for multiaddr exists already Check if a listener for multiaddr exists already
If listener already exists, continue If listener already exists, continue
@ -87,8 +85,10 @@ class Swarm(INetwork):
multiaddr_dict = multiaddr.to_options() multiaddr_dict = multiaddr.to_options()
async def conn_handler(reader, writer): async def conn_handler(reader, writer):
# Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr) # Upgrade reader/write to a net_stream and pass \
raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer) # to appropriate stream handler (using multiaddr)
raw_conn = RawConnection(multiaddr_dict['host'], \
multiaddr_dict['port'], reader, writer)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False) muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream() muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream()

View File

@ -1,4 +0,0 @@
host.go --> config.go
config.go: newNode --> swarm.go: newSwarm
newSwarm | initializes data stores

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
class IListener(ABC): class IListener(ABC):
@abstractmethod @abstractmethod

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
class ITransport(ABC): class ITransport(ABC):
@abstractmethod @abstractmethod

View File

@ -1,6 +1,7 @@
from muxer.mplex.muxed_connection import MuxedConn from muxer.mplex.muxed_connection import MuxedConn
class TransportUpgrader(object):
class TransportUpgrader():
def __init__(self, secOpt, muxerOpt): def __init__(self, secOpt, muxerOpt):
self.sec = secOpt self.sec = secOpt
@ -20,8 +21,7 @@ class TransportUpgrader(object):
""" """
upgrade raw connection to muxed connection upgrade raw connection to muxed connection
""" """
# For PoC, no security
# Default to mplex
# For PoC, no security, default to mplex
# TODO do exchange to determine multiplexer # TODO do exchange to determine multiplexer
return MuxedConn(conn, initiator) return MuxedConn(conn, initiator)