commit
c106d2291a
|
@ -1,5 +1,6 @@
|
|||
from .host_interface import IHost
|
||||
|
||||
|
||||
# Upon host creation, host takes in options,
|
||||
# including the list of addresses on which to listen.
|
||||
# Host then parses these options and delegates to its Network instance,
|
||||
|
@ -51,7 +52,7 @@ class BasicHost(IHost):
|
|||
async def new_stream(self, peer_id, protocol_id):
|
||||
"""
|
||||
:param peer_id: peer_id that host is connecting
|
||||
:param proto_id: protocol id that stream runs on
|
||||
:param protocol_id: protocol id that stream runs on
|
||||
:return: true if successful
|
||||
"""
|
||||
# TODO: host should return a mux stream not a raw stream
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class IHost(ABC):
|
||||
|
||||
@abstractmethod
|
||||
|
@ -36,9 +37,8 @@ class IHost(ABC):
|
|||
# protocol_id can be a list of protocol_ids
|
||||
# stream will decide which protocol_id to run on
|
||||
@abstractmethod
|
||||
def new_stream(self, context, peer_id, protocol_id):
|
||||
def new_stream(self, peer_id, protocol_id):
|
||||
"""
|
||||
:param context: a context instance
|
||||
:param peer_id: peer_id that host is connecting
|
||||
:param proto_id: protocol id that stream runs on
|
||||
:return: true if successful
|
||||
|
|
|
@ -1,37 +1,35 @@
|
|||
from Crypto.PublicKey import RSA
|
||||
from peer.peerstore import PeerStore
|
||||
from network.swarm import Swarm
|
||||
from host.basic_host import BasicHost
|
||||
from transport.upgrader import TransportUpgrader
|
||||
from transport.tcp.tcp import TCP
|
||||
from Crypto.PublicKey import RSA
|
||||
|
||||
class Libp2p(object):
|
||||
|
||||
def __init__(self, idOpt = None, \
|
||||
transportOpt = ["/ip4/127.0.0.1/tcp/8001"], \
|
||||
muxerOpt = ["mplex/6.7.0"], \
|
||||
secOpt = ["secio"], \
|
||||
peerstore = PeerStore()):
|
||||
class Libp2p():
|
||||
|
||||
if idOpt:
|
||||
self.idOpt = idOpt
|
||||
def __init__(self, id_opt=None, transport_opt=["/ip4/127.0.0.1/tcp/8001"], \
|
||||
muxer_opt=["mplex/6.7.0"], sec_opt=["secio"], peerstore=PeerStore()):
|
||||
|
||||
if id_opt:
|
||||
self.id_opt = id_opt
|
||||
else:
|
||||
new_key = RSA.generate(2048, e=65537)
|
||||
self.idOpt = new_key.publickey().exportKey("PEM")
|
||||
self.id_opt = new_key.publickey().exportKey("PEM")
|
||||
self.private_key = new_key.exportKey("PEM")
|
||||
|
||||
self.transportOpt = transportOpt
|
||||
self.muxerOpt = muxerOpt
|
||||
self.secOpt = secOpt
|
||||
self.transport_opt = transport_opt
|
||||
self.muxer_opt = muxer_opt
|
||||
self.sec_opt = sec_opt
|
||||
self.peerstore = peerstore
|
||||
|
||||
async def new_node(self):
|
||||
|
||||
upgrader = TransportUpgrader(self.secOpt, self.transportOpt)
|
||||
swarm = Swarm(self.idOpt, self.peerstore, upgrader)
|
||||
upgrader = TransportUpgrader(self.sec_opt, self.transport_opt)
|
||||
swarm = Swarm(self.id_opt, self.peerstore, upgrader)
|
||||
tcp = TCP()
|
||||
swarm.add_transport(tcp)
|
||||
await swarm.listen(self.transportOpt[0])
|
||||
await swarm.listen(self.transport_opt[0])
|
||||
host = BasicHost(swarm)
|
||||
|
||||
# TODO MuxedConnection currently contains all muxing logic (move to a Muxer)
|
||||
|
|
|
@ -3,6 +3,7 @@ from .utils import encode_uvarint, decode_uvarint
|
|||
from .muxed_connection_interface import IMuxedConn
|
||||
from .muxed_stream import MuxedStream
|
||||
|
||||
|
||||
class MuxedConn(IMuxedConn):
|
||||
"""
|
||||
reference: https://github.com/libp2p/go-mplex/blob/master/multiplex.go
|
||||
|
@ -47,6 +48,10 @@ class MuxedConn(IMuxedConn):
|
|||
def open_stream(self, protocol_id, stream_id, peer_id, multi_addr):
|
||||
"""
|
||||
creates a new muxed_stream
|
||||
:param protocol_id: protocol_id of stream
|
||||
:param stream_id: stream_id of stream
|
||||
:param peer_id: peer_id that stream connects to
|
||||
:param multi_addr: multi_addr that stream connects to
|
||||
:return: a new stream
|
||||
"""
|
||||
stream = MuxedStream(stream_id, multi_addr, self)
|
||||
|
|
|
@ -23,11 +23,13 @@ class IMuxedConn(ABC):
|
|||
pass
|
||||
|
||||
@abstractmethod
|
||||
def open_stream(self, protocol_id, stream_name):
|
||||
def open_stream(self, protocol_id, stream_id, peer_id, multi_addr):
|
||||
"""
|
||||
creates a new muxed_stream
|
||||
:param protocol_id: id to be associated with stream
|
||||
:param stream_name: name as part of identifier
|
||||
:param protocol_id: protocol_id of stream
|
||||
:param stream_id: stream_id of stream
|
||||
:param peer_id: peer_id that stream connects to
|
||||
:param multi_addr: multi_addr that stream connects to
|
||||
:return: a new stream
|
||||
"""
|
||||
pass
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import asyncio
|
||||
from .muxed_stream_interface import IMuxedStream
|
||||
from .constants import HEADER_TAGS
|
||||
|
||||
|
@ -15,12 +14,12 @@ class MuxedStream(IMuxedStream):
|
|||
:param initiator: boolean if this is an initiator
|
||||
:param muxed_conn: muxed connection of this muxed_stream
|
||||
"""
|
||||
self.id = stream_id
|
||||
self.stream_id = stream_id
|
||||
self.initiator = initiator
|
||||
self.muxed_conn = muxed_conn
|
||||
|
||||
# self.read_deadline = None
|
||||
# self.write_deadline = None
|
||||
self.read_deadline = None
|
||||
self.write_deadline = None
|
||||
|
||||
self.local_closed = False
|
||||
self.remote_closed = False
|
||||
|
@ -33,22 +32,22 @@ class MuxedStream(IMuxedStream):
|
|||
"""
|
||||
if self.initiator:
|
||||
return HEADER_TAGS[action]
|
||||
else:
|
||||
return HEADER_TAGS[action] - 1
|
||||
|
||||
return HEADER_TAGS[action] - 1
|
||||
|
||||
async def read(self):
|
||||
"""
|
||||
read messages associated with stream from buffer til end of file
|
||||
:return: bytes of input
|
||||
"""
|
||||
return await self.muxed_conn.read_buffer(self.id)
|
||||
return await self.muxed_conn.read_buffer(self.stream_id)
|
||||
|
||||
async def write(self, data):
|
||||
"""
|
||||
write to stream
|
||||
:return: number of bytes written
|
||||
"""
|
||||
return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.id)
|
||||
return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id)
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
|
@ -59,8 +58,8 @@ class MuxedStream(IMuxedStream):
|
|||
if self.local_closed and self.remote_closed:
|
||||
return True
|
||||
|
||||
self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.id)
|
||||
self.muxed_conn.streams.pop(self.id)
|
||||
self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id)
|
||||
self.muxed_conn.streams.pop(self.stream_id)
|
||||
|
||||
self.local_closed = True
|
||||
self.remote_closed = True
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
from .muxed_stream import MuxedStream
|
||||
from .muxed_connection import MuxedConn
|
||||
|
||||
|
||||
class Multiplex(object):
|
||||
"""
|
||||
muxing logic currently lives in MuxedConn
|
||||
reference: https://github.com/whyrusleeping/go-smux-multiplex/blob/master/multiplex.go
|
||||
"""
|
||||
def __init__(self, conn, initiator):
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import asyncio
|
||||
from .raw_connection_interface import IRawConnection
|
||||
|
||||
|
||||
class RawConnection(IRawConnection):
|
||||
|
||||
def __init__(self, ip, port, reader, writer):
|
||||
|
@ -12,15 +12,3 @@ class RawConnection(IRawConnection):
|
|||
|
||||
def close(self):
|
||||
self.writer.close()
|
||||
|
||||
# def __init__(self, ip, port):
|
||||
# self.conn_ip = ip
|
||||
# self.conn_port = port
|
||||
# self.reader, self.writer = self.open_connection()
|
||||
|
||||
# async def open_connection(self):
|
||||
# """
|
||||
# opens a connection on self.ip and self.port
|
||||
# :return: a raw connection
|
||||
# """
|
||||
# return await asyncio.open_connection(self.conn_ip, self.conn_port)
|
||||
|
|
|
@ -1,15 +1,7 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from abc import ABC
|
||||
|
||||
|
||||
class IRawConnection(ABC):
|
||||
"""
|
||||
A Raw Connection provides a Reader and a Writer
|
||||
open_connection should return such a connection
|
||||
"""
|
||||
|
||||
# @abstractmethod
|
||||
# async def open_connection(self):
|
||||
# """
|
||||
# opens a connection on ip and port
|
||||
# :return: a raw connection
|
||||
# """
|
||||
# pass
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class INetwork(ABC):
|
||||
|
||||
@abstractmethod
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
import asyncio
|
||||
from .net_stream_interface import INetStream
|
||||
|
||||
|
||||
class NetStream(INetStream):
|
||||
|
||||
def __init__(self, muxed_stream):
|
||||
self.muxed_stream = muxed_stream
|
||||
self.protocol_id = None
|
||||
|
||||
def get_protocol(self):
|
||||
"""
|
||||
|
@ -26,12 +27,12 @@ class NetStream(INetStream):
|
|||
"""
|
||||
return await self.muxed_stream.read()
|
||||
|
||||
async def write(self, bytes):
|
||||
async def write(self, data):
|
||||
"""
|
||||
write to stream
|
||||
:return: number of bytes written
|
||||
"""
|
||||
return await self.muxed_stream.write(bytes)
|
||||
return await self.muxed_stream.write(data)
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
class INetStream(ABC):
|
||||
|
||||
def __init__(self, peer_id, multi_addr, connection):
|
||||
self.peer_id = peer_id
|
||||
self.multi_addr = multi_addr
|
||||
self.connection = connection
|
||||
class INetStream(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def get_protocol(self):
|
||||
|
|
|
@ -4,6 +4,7 @@ from .stream.net_stream import NetStream
|
|||
from .multiaddr import MultiAddr
|
||||
from .connection.raw_connection import RawConnection
|
||||
|
||||
|
||||
class Swarm(INetwork):
|
||||
|
||||
def __init__(self, my_peer_id, peerstore, upgrader):
|
||||
|
@ -13,6 +14,7 @@ class Swarm(INetwork):
|
|||
self.connections = dict()
|
||||
self.listeners = dict()
|
||||
self.stream_handlers = dict()
|
||||
self.transport = None
|
||||
|
||||
def set_stream_handler(self, protocol_id, stream_handler):
|
||||
"""
|
||||
|
@ -37,11 +39,8 @@ class Swarm(INetwork):
|
|||
multiaddr = addrs[0]
|
||||
|
||||
if peer_id in self.connections:
|
||||
"""
|
||||
If muxed connection already exists for peer_id,
|
||||
set muxed connection equal to
|
||||
existing muxed connection
|
||||
"""
|
||||
# If muxed connection already exists for peer_id,
|
||||
# set muxed connection equal to existing muxed connection
|
||||
muxed_conn = self.connections[peer_id]
|
||||
else:
|
||||
# Transport dials peer (gets back a raw conn)
|
||||
|
@ -68,8 +67,7 @@ class Swarm(INetwork):
|
|||
"""
|
||||
:param *args: one or many multiaddrs to start listening on
|
||||
:return: true if at least one success
|
||||
"""
|
||||
"""
|
||||
|
||||
For each multiaddr in args
|
||||
Check if a listener for multiaddr exists already
|
||||
If listener already exists, continue
|
||||
|
@ -87,8 +85,10 @@ class Swarm(INetwork):
|
|||
multiaddr_dict = multiaddr.to_options()
|
||||
|
||||
async def conn_handler(reader, writer):
|
||||
# Upgrade reader/write to a net_stream and pass to appropriate stream handler (using multiaddr)
|
||||
raw_conn = RawConnection(multiaddr_dict['host'], multiaddr_dict['port'], reader, writer)
|
||||
# Upgrade reader/write to a net_stream and pass \
|
||||
# to appropriate stream handler (using multiaddr)
|
||||
raw_conn = RawConnection(multiaddr_dict['host'], \
|
||||
multiaddr_dict['port'], reader, writer)
|
||||
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)
|
||||
|
||||
muxed_stream, stream_id, protocol_id = await muxed_conn.accept_stream()
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
host.go --> config.go
|
||||
config.go: newNode --> swarm.go: newSwarm
|
||||
newSwarm | initializes data stores
|
||||
|
|
@ -4,8 +4,8 @@ from libp2p.libp2p import Libp2p
|
|||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simple_messages():
|
||||
libA = Libp2p(transportOpt=["/ip4/127.0.0.1/tcp/8001/ipfs/hostA"])
|
||||
libB = Libp2p(transportOpt=["/ip4/127.0.0.1/tcp/8000/ipfs/hostB"])
|
||||
libA = Libp2p(transport_opt=["/ip4/127.0.0.1/tcp/8001/ipfs/hostA"])
|
||||
libB = Libp2p(transport_opt=["/ip4/127.0.0.1/tcp/8000/ipfs/hostB"])
|
||||
|
||||
hostA = await libA.new_node()
|
||||
hostB = await libB.new_node()
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class IListener(ABC):
|
||||
|
||||
@abstractmethod
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ITransport(ABC):
|
||||
|
||||
@abstractmethod
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from muxer.mplex.muxed_connection import MuxedConn
|
||||
|
||||
class TransportUpgrader(object):
|
||||
|
||||
class TransportUpgrader():
|
||||
|
||||
def __init__(self, secOpt, muxerOpt):
|
||||
self.sec = secOpt
|
||||
|
@ -20,8 +21,7 @@ class TransportUpgrader(object):
|
|||
"""
|
||||
upgrade raw connection to muxed connection
|
||||
"""
|
||||
# For PoC, no security
|
||||
# Default to mplex
|
||||
|
||||
# For PoC, no security, default to mplex
|
||||
# TODO do exchange to determine multiplexer
|
||||
return MuxedConn(conn, initiator)
|
||||
|
|
Loading…
Reference in New Issue
Block a user