Merge pull request #192 from NIC619/add_type_hint

Add type hints to host/, peer/, network/, pubsub/, routing/
This commit is contained in:
NIC Lin 2019-07-31 11:13:37 +08:00 committed by GitHub
commit 21e97407ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 760 additions and 343 deletions

View File

@ -128,6 +128,8 @@ disable=print-statement,
dict-keys-not-iterating, dict-keys-not-iterating,
dict-values-not-iterating, dict-values-not-iterating,
missing-docstring, missing-docstring,
cyclic-import,
duplicate-code,
# Enable the message, report, category or checker with the given id(s). You can # Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option # either give multiple identifier separated by comma (,) or put this option

View File

@ -1,5 +1,21 @@
from typing import (
Any,
Awaitable,
Callable,
List,
Sequence,
)
import multiaddr import multiaddr
from libp2p.network.network_interface import INetwork
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from .host_interface import IHost from .host_interface import IHost
# Upon host creation, host takes in options, # Upon host creation, host takes in options,
@ -8,50 +24,58 @@ from .host_interface import IHost
# telling it to listen on the given listen addresses. # telling it to listen on the given listen addresses.
StreamHandlerFn = Callable[[INetStream], Awaitable[None]]
class BasicHost(IHost): class BasicHost(IHost):
_network: INetwork
router: KadmeliaPeerRouter
peerstore: IPeerStore
# default options constructor # default options constructor
def __init__(self, network, router=None): def __init__(self, network: INetwork, router: KadmeliaPeerRouter = None) -> None:
self._network = network self._network = network
self._router = router self._router = router
self.peerstore = self._network.peerstore self.peerstore = self._network.peerstore
def get_id(self): def get_id(self) -> ID:
""" """
:return: peer_id of host :return: peer_id of host
""" """
return self._network.get_peer_id() return self._network.get_peer_id()
def get_network(self): def get_network(self) -> INetwork:
""" """
:return: network instance of host :return: network instance of host
""" """
return self._network return self._network
def get_peerstore(self): def get_peerstore(self) -> IPeerStore:
""" """
:return: peerstore of the host (same one as in its network instance) :return: peerstore of the host (same one as in its network instance)
""" """
return self.peerstore return self.peerstore
def get_mux(self): # FIXME: Replace with correct return type
def get_mux(self) -> Any:
""" """
:return: mux instance of host :return: mux instance of host
""" """
def get_addrs(self): def get_addrs(self) -> List[multiaddr.Multiaddr]:
""" """
:return: all the multiaddr addresses this host is listening too :return: all the multiaddr addresses this host is listening too
""" """
p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty())) p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty()))
addrs = [] addrs: List[multiaddr.Multiaddr] = []
for transport in self._network.listeners.values(): for transport in self._network.listeners.values():
for addr in transport.get_addrs(): for addr in transport.get_addrs():
addrs.append(addr.encapsulate(p2p_part)) addrs.append(addr.encapsulate(p2p_part))
return addrs return addrs
def set_stream_handler(self, protocol_id, stream_handler): def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
""" """
set stream handler for host set stream handler for host
:param protocol_id: protocol id used on stream :param protocol_id: protocol id used on stream
@ -62,16 +86,15 @@ class BasicHost(IHost):
# protocol_id can be a list of protocol_ids # protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on # stream will decide which protocol_id to run on
async def new_stream(self, peer_id, protocol_ids): async def new_stream(self, peer_id: ID, protocol_ids: Sequence[str]) -> INetStream:
""" """
:param peer_id: peer_id that host is connecting :param peer_id: peer_id that host is connecting
:param protocol_id: protocol id that stream runs on :param protocol_id: protocol id that stream runs on
:return: true if successful :return: stream: new stream created
""" """
stream = await self._network.new_stream(peer_id, protocol_ids) return await self._network.new_stream(peer_id, protocol_ids)
return stream
async def connect(self, peer_info): async def connect(self, peer_info: PeerInfo) -> None:
""" """
connect ensures there is a connection between this host and the peer with connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal

View File

@ -1,34 +1,53 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import (
Any,
Awaitable,
Callable,
List,
Sequence,
)
import multiaddr
from libp2p.network.network_interface import INetwork
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.network.stream.net_stream_interface import INetStream
StreamHandlerFn = Callable[[INetStream], Awaitable[None]]
class IHost(ABC): class IHost(ABC):
@abstractmethod @abstractmethod
def get_id(self): def get_id(self) -> ID:
""" """
:return: peer_id of host :return: peer_id of host
""" """
@abstractmethod @abstractmethod
def get_network(self): def get_network(self) -> INetwork:
""" """
:return: network instance of host :return: network instance of host
""" """
# FIXME: Replace with correct return type
@abstractmethod @abstractmethod
def get_mux(self): def get_mux(self) -> Any:
""" """
:return: mux instance of host :return: mux instance of host
""" """
@abstractmethod @abstractmethod
def get_addrs(self): def get_addrs(self) -> List[multiaddr.Multiaddr]:
""" """
:return: all the multiaddr addresses this host is listening too :return: all the multiaddr addresses this host is listening too
""" """
@abstractmethod @abstractmethod
def set_stream_handler(self, protocol_id, stream_handler): def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
""" """
set stream handler for host set stream handler for host
:param protocol_id: protocol id used on stream :param protocol_id: protocol id used on stream
@ -39,15 +58,17 @@ class IHost(ABC):
# protocol_id can be a list of protocol_ids # protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on # stream will decide which protocol_id to run on
@abstractmethod @abstractmethod
def new_stream(self, peer_id, protocol_ids): async def new_stream(self,
peer_id: ID,
protocol_ids: Sequence[str]) -> INetStream:
""" """
:param peer_id: peer_id that host is connecting :param peer_id: peer_id that host is connecting
:param protocol_ids: protocol ids that stream can run on :param protocol_ids: protocol ids that stream can run on
:return: true if successful :return: stream: new stream created
""" """
@abstractmethod @abstractmethod
def connect(self, peer_info): async def connect(self, peer_info: PeerInfo) -> None:
""" """
connect ensures there is a connection between this host and the peer with connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal

View File

@ -1,8 +1,22 @@
import asyncio
from .raw_connection_interface import IRawConnection from .raw_connection_interface import IRawConnection
class RawConnection(IRawConnection): class RawConnection(IRawConnection):
def __init__(self, ip, port, reader, writer, initiator): conn_ip: str
conn_port: str
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
_next_id: int
initiator: bool
def __init__(self,
ip: str,
port: str,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
initiator: bool) -> None:
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
self.conn_ip = ip self.conn_ip = ip
self.conn_port = port self.conn_port = port
@ -11,12 +25,12 @@ class RawConnection(IRawConnection):
self._next_id = 0 if initiator else 1 self._next_id = 0 if initiator else 1
self.initiator = initiator self.initiator = initiator
async def write(self, data): async def write(self, data: bytes) -> None:
self.writer.write(data) self.writer.write(data)
self.writer.write("\n".encode()) self.writer.write("\n".encode())
await self.writer.drain() await self.writer.drain()
async def read(self): async def read(self) -> bytes:
line = await self.reader.readline() line = await self.reader.readline()
adjusted_line = line.decode().rstrip('\n') adjusted_line = line.decode().rstrip('\n')
@ -24,10 +38,10 @@ class RawConnection(IRawConnection):
# encoding and decoding # encoding and decoding
return adjusted_line.encode() return adjusted_line.encode()
def close(self): def close(self) -> None:
self.writer.close() self.writer.close()
def next_stream_id(self): def next_stream_id(self) -> int:
""" """
Get next available stream id Get next available stream id
:return: next available stream id for the connection :return: next available stream id for the connection

View File

@ -1,16 +1,45 @@
from abc import ABC, abstractmethod from abc import (
ABC,
abstractmethod,
)
from typing import (
Awaitable,
Callable,
Dict,
Sequence,
TYPE_CHECKING,
)
from multiaddr import Multiaddr
from libp2p.peer.id import ID
from libp2p.peer.peerstore import PeerStore
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
from libp2p.transport.listener_interface import IListener
from .stream.net_stream_interface import INetStream
if TYPE_CHECKING:
from .notifee_interface import INotifee
StreamHandlerFn = Callable[[INetStream], Awaitable[None]]
class INetwork(ABC): class INetwork(ABC):
peerstore: PeerStore
connections: Dict[ID, IMuxedConn]
listeners: Dict[str, IListener]
@abstractmethod @abstractmethod
def get_peer_id(self): def get_peer_id(self) -> ID:
""" """
:return: the peer id :return: the peer id
""" """
@abstractmethod @abstractmethod
def dial_peer(self, peer_id): async def dial_peer(self, peer_id: ID) -> IMuxedConn:
""" """
dial_peer try to create a connection to peer_id dial_peer try to create a connection to peer_id
@ -20,7 +49,7 @@ class INetwork(ABC):
""" """
@abstractmethod @abstractmethod
def set_stream_handler(self, protocol_id, stream_handler): def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
""" """
:param protocol_id: protocol id used on stream :param protocol_id: protocol id used on stream
:param stream_handler: a stream handler instance :param stream_handler: a stream handler instance
@ -28,7 +57,9 @@ class INetwork(ABC):
""" """
@abstractmethod @abstractmethod
def new_stream(self, peer_id, protocol_ids): async def new_stream(self,
peer_id: ID,
protocol_ids: Sequence[str]) -> INetStream:
""" """
:param peer_id: peer_id of destination :param peer_id: peer_id of destination
:param protocol_ids: available protocol ids to use for stream :param protocol_ids: available protocol ids to use for stream
@ -36,14 +67,14 @@ class INetwork(ABC):
""" """
@abstractmethod @abstractmethod
def listen(self, *args): async def listen(self, *args: Sequence[Multiaddr]) -> bool:
""" """
:param *args: one or many multiaddrs to start listening on :param *args: one or many multiaddrs to start listening on
:return: True if at least one success :return: True if at least one success
""" """
@abstractmethod @abstractmethod
def notify(self, notifee): def notify(self, notifee: 'INotifee') -> bool:
""" """
:param notifee: object implementing Notifee interface :param notifee: object implementing Notifee interface
:return: true if notifee registered successfully, false otherwise :return: true if notifee registered successfully, false otherwise

View File

@ -1,44 +1,58 @@
from abc import ABC, abstractmethod from abc import (
ABC,
abstractmethod,
)
from typing import TYPE_CHECKING
from multiaddr import Multiaddr
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
from libp2p.network.stream.net_stream_interface import INetStream
if TYPE_CHECKING:
from .network_interface import INetwork
class INotifee(ABC): class INotifee(ABC):
@abstractmethod @abstractmethod
async def opened_stream(self, network, stream): async def opened_stream(self, network: 'INetwork', stream: INetStream) -> None:
""" """
:param network: network the stream was opened on :param network: network the stream was opened on
:param stream: stream that was opened :param stream: stream that was opened
""" """
@abstractmethod @abstractmethod
async def closed_stream(self, network, stream): async def closed_stream(self, network: 'INetwork', stream: INetStream) -> None:
""" """
:param network: network the stream was closed on :param network: network the stream was closed on
:param stream: stream that was closed :param stream: stream that was closed
""" """
@abstractmethod @abstractmethod
async def connected(self, network, conn): async def connected(self, network: 'INetwork', conn: IMuxedConn) -> None:
""" """
:param network: network the connection was opened on :param network: network the connection was opened on
:param conn: connection that was opened :param conn: connection that was opened
""" """
@abstractmethod @abstractmethod
async def disconnected(self, network, conn): async def disconnected(self, network: 'INetwork', conn: IMuxedConn) -> None:
""" """
:param network: network the connection was closed on :param network: network the connection was closed on
:param conn: connection that was closed :param conn: connection that was closed
""" """
@abstractmethod @abstractmethod
async def listen(self, network, multiaddr): async def listen(self, network: 'INetwork', multiaddr: Multiaddr) -> None:
""" """
:param network: network the listener is listening on :param network: network the listener is listening on
:param multiaddr: multiaddress listener is listening on :param multiaddr: multiaddress listener is listening on
""" """
@abstractmethod @abstractmethod
async def listen_close(self, network, multiaddr): async def listen_close(self, network: 'INetwork', multiaddr: Multiaddr) -> None:
""" """
:param network: network the connection was opened on :param network: network the connection was opened on
:param multiaddr: multiaddress listener is no longer listening on :param multiaddr: multiaddress listener is no longer listening on

View File

@ -1,41 +1,48 @@
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream
from .net_stream_interface import INetStream from .net_stream_interface import INetStream
class NetStream(INetStream): class NetStream(INetStream):
def __init__(self, muxed_stream): muxed_stream: IMuxedStream
mplex_conn: IMuxedConn
protocol_id: str
def __init__(self, muxed_stream: IMuxedStream) -> None:
self.muxed_stream = muxed_stream self.muxed_stream = muxed_stream
self.mplex_conn = muxed_stream.mplex_conn self.mplex_conn = muxed_stream.mplex_conn
self.protocol_id = None self.protocol_id = None
def get_protocol(self): def get_protocol(self) -> str:
""" """
:return: protocol id that stream runs on :return: protocol id that stream runs on
""" """
return self.protocol_id return self.protocol_id
def set_protocol(self, protocol_id): def set_protocol(self, protocol_id: str) -> None:
""" """
:param protocol_id: protocol id that stream runs on :param protocol_id: protocol id that stream runs on
:return: true if successful :return: true if successful
""" """
self.protocol_id = protocol_id self.protocol_id = protocol_id
async def read(self): async def read(self) -> bytes:
""" """
read from stream read from stream
:return: bytes of input until EOF :return: bytes of input until EOF
""" """
return await self.muxed_stream.read() return await self.muxed_stream.read()
async def write(self, data): async def write(self, data: bytes) -> int:
""" """
write to stream write to stream
:return: number of bytes written :return: number of bytes written
""" """
return await self.muxed_stream.write(data) return await self.muxed_stream.write(data)
async def close(self): async def close(self) -> bool:
""" """
close stream close stream
:return: true if successful :return: true if successful

View File

@ -1,37 +1,41 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
class INetStream(ABC): class INetStream(ABC):
mplex_conn: IMuxedConn
@abstractmethod @abstractmethod
def get_protocol(self): def get_protocol(self) -> str:
""" """
:return: protocol id that stream runs on :return: protocol id that stream runs on
""" """
@abstractmethod @abstractmethod
def set_protocol(self, protocol_id): def set_protocol(self, protocol_id: str) -> bool:
""" """
:param protocol_id: protocol id that stream runs on :param protocol_id: protocol id that stream runs on
:return: true if successful :return: true if successful
""" """
@abstractmethod @abstractmethod
def read(self): async def read(self) -> bytes:
""" """
reads from the underlying muxed_stream reads from the underlying muxed_stream
:return: bytes of input :return: bytes of input
""" """
@abstractmethod @abstractmethod
def write(self, _bytes): async def write(self, data: bytes) -> int:
""" """
write to the underlying muxed_stream write to the underlying muxed_stream
:return: number of bytes written :return: number of bytes written
""" """
@abstractmethod @abstractmethod
def close(self): async def close(self) -> bool:
""" """
close the underlying muxed stream close the underlying muxed stream
:return: true if successful :return: true if successful

View File

@ -1,18 +1,62 @@
import asyncio import asyncio
from typing import (
Awaitable,
Callable,
Dict,
List,
Sequence,
)
from libp2p.protocol_muxer.multiselect_client import MultiselectClient from multiaddr import Multiaddr
from libp2p.peer.id import (
ID,
id_b58_decode,
)
from libp2p.peer.peerstore import PeerStore
from libp2p.protocol_muxer.multiselect import Multiselect from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.peer.id import id_b58_decode from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.routing.interfaces import IPeerRouting
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream
from libp2p.transport.upgrader import TransportUpgrader
from libp2p.transport.transport_interface import ITransport
from libp2p.transport.listener_interface import IListener
from .network_interface import INetwork from .network_interface import INetwork
from .notifee_interface import INotifee from .notifee_interface import INotifee
from .stream.net_stream import NetStream
from .connection.raw_connection import RawConnection from .connection.raw_connection import RawConnection
from .stream.net_stream import NetStream
from .stream.net_stream_interface import INetStream
StreamHandlerFn = Callable[[INetStream], Awaitable[None]]
class Swarm(INetwork): class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes,cell-var-from-loop,too-many-arguments # pylint: disable=too-many-instance-attributes,cell-var-from-loop,too-many-arguments
def __init__(self, peer_id, peerstore, upgrader, transport, router): self_id: ID
peerstore: PeerStore
upgrader: TransportUpgrader
transport: ITransport
router: IPeerRouting
connections: Dict[ID, IMuxedConn]
listeners: Dict[str, IListener]
stream_handlers: Dict[INetStream, Callable[[INetStream], None]]
multiselect: Multiselect
multiselect_client: MultiselectClient
notifees: List[INotifee]
def __init__(self,
peer_id: ID,
peerstore: PeerStore,
upgrader: TransportUpgrader,
transport: ITransport,
router: IPeerRouting):
self.self_id = peer_id self.self_id = peer_id
self.peerstore = peerstore self.peerstore = peerstore
self.upgrader = upgrader self.upgrader = upgrader
@ -32,10 +76,10 @@ class Swarm(INetwork):
# Create generic protocol handler # Create generic protocol handler
self.generic_protocol_handler = create_generic_protocol_handler(self) self.generic_protocol_handler = create_generic_protocol_handler(self)
def get_peer_id(self): def get_peer_id(self) -> ID:
return self.self_id return self.self_id
def set_stream_handler(self, protocol_id, stream_handler): def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
""" """
:param protocol_id: protocol id used on stream :param protocol_id: protocol id used on stream
:param stream_handler: a stream handler instance :param stream_handler: a stream handler instance
@ -44,7 +88,7 @@ class Swarm(INetwork):
self.multiselect.add_handler(protocol_id, stream_handler) self.multiselect.add_handler(protocol_id, stream_handler)
return True return True
async def dial_peer(self, peer_id): async def dial_peer(self, peer_id: ID) -> IMuxedConn:
""" """
dial_peer try to create a connection to peer_id dial_peer try to create a connection to peer_id
:param peer_id: peer if we want to dial :param peer_id: peer if we want to dial
@ -87,7 +131,7 @@ class Swarm(INetwork):
return muxed_conn return muxed_conn
async def new_stream(self, peer_id, protocol_ids): async def new_stream(self, peer_id: ID, protocol_ids: Sequence[str]) -> NetStream:
""" """
:param peer_id: peer_id of destination :param peer_id: peer_id of destination
:param protocol_id: protocol id :param protocol_id: protocol id
@ -109,7 +153,10 @@ class Swarm(INetwork):
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr)
# Perform protocol muxing to determine protocol to use # Perform protocol muxing to determine protocol to use
selected_protocol = await self.multiselect_client.select_one_of(protocol_ids, muxed_stream) selected_protocol = await self.multiselect_client.select_one_of(
list(protocol_ids),
muxed_stream,
)
# Create a net stream with the selected protocol # Create a net stream with the selected protocol
net_stream = NetStream(muxed_stream) net_stream = NetStream(muxed_stream)
@ -121,7 +168,7 @@ class Swarm(INetwork):
return net_stream return net_stream
async def listen(self, *args): async def listen(self, *args: Sequence[Multiaddr]) -> bool:
""" """
:param *args: one or many multiaddrs to start listening on :param *args: one or many multiaddrs to start listening on
:return: true if at least one success :return: true if at least one success
@ -139,7 +186,8 @@ class Swarm(INetwork):
if str(multiaddr) in self.listeners: if str(multiaddr) in self.listeners:
return True return True
async def conn_handler(reader, writer): async def conn_handler(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
# Read in first message (should be peer_id of initiator) and ack # Read in first message (should be peer_id of initiator) and ack
peer_id = id_b58_decode((await reader.read(1024)).decode()) peer_id = id_b58_decode((await reader.read(1024)).decode())
@ -182,7 +230,7 @@ class Swarm(INetwork):
# No multiaddr succeeded # No multiaddr succeeded
return False return False
def notify(self, notifee): def notify(self, notifee: INotifee) -> bool:
""" """
:param notifee: object implementing Notifee interface :param notifee: object implementing Notifee interface
:return: true if notifee registered successfully, false otherwise :return: true if notifee registered successfully, false otherwise
@ -192,7 +240,7 @@ class Swarm(INetwork):
return True return True
return False return False
def add_router(self, router): def add_router(self, router: IPeerRouting) -> None:
self.router = router self.router = router
# TODO: `tear_down` # TODO: `tear_down`
@ -204,7 +252,10 @@ class Swarm(INetwork):
# TODO: `disconnect`? # TODO: `disconnect`?
def create_generic_protocol_handler(swarm): GenericProtocolHandlerFn = Callable[[IMuxedStream], Awaitable[None]]
def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn:
""" """
Create a generic protocol handler from the given swarm. We use swarm Create a generic protocol handler from the given swarm. We use swarm
to extract the multiselect module so that generic_protocol_handler to extract the multiselect module so that generic_protocol_handler
@ -213,7 +264,7 @@ def create_generic_protocol_handler(swarm):
""" """
multiselect = swarm.multiselect multiselect = swarm.multiselect
async def generic_protocol_handler(muxed_stream): async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None:
# Perform protocol muxing to determine protocol to use # Perform protocol muxing to determine protocol to use
protocol, handler = await multiselect.negotiate(muxed_stream) protocol, handler = await multiselect.negotiate(muxed_stream)
@ -229,5 +280,6 @@ def create_generic_protocol_handler(swarm):
return generic_protocol_handler return generic_protocol_handler
class SwarmException(Exception): class SwarmException(Exception):
pass pass

View File

@ -1,13 +1,22 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import (
List,
Sequence,
)
from multiaddr import Multiaddr
from .id import ID
class IAddrBook(ABC): class IAddrBook(ABC):
def __init__(self): def __init__(self) -> None:
pass pass
@abstractmethod @abstractmethod
def add_addr(self, peer_id, addr, ttl): def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None:
""" """
Calls add_addrs(peer_id, [addr], ttl) Calls add_addrs(peer_id, [addr], ttl)
:param peer_id: the peer to add address for :param peer_id: the peer to add address for
@ -16,7 +25,7 @@ class IAddrBook(ABC):
""" """
@abstractmethod @abstractmethod
def add_addrs(self, peer_id, addrs, ttl): def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None:
""" """
Adds addresses for a given peer all with the same time-to-live. If one of the Adds addresses for a given peer all with the same time-to-live. If one of the
addresses already exists for the peer and has a longer TTL, no operation should take place. addresses already exists for the peer and has a longer TTL, no operation should take place.
@ -27,21 +36,21 @@ class IAddrBook(ABC):
""" """
@abstractmethod @abstractmethod
def addrs(self, peer_id): def addrs(self, peer_id: ID) -> List[Multiaddr]:
""" """
:param peer_id: peer to get addresses of :param peer_id: peer to get addresses of
:return: all known (and valid) addresses for the given peer :return: all known (and valid) addresses for the given peer
""" """
@abstractmethod @abstractmethod
def clear_addrs(self, peer_id): def clear_addrs(self, peer_id: ID) -> None:
""" """
Removes all previously stored addresses Removes all previously stored addresses
:param peer_id: peer to remove addresses of :param peer_id: peer to remove addresses of
""" """
@abstractmethod @abstractmethod
def peers_with_addrs(self): def peers_with_addrs(self) -> List[ID]:
""" """
:return: all of the peer IDs stored with addresses :return: all of the peer IDs stored with addresses
""" """

View File

@ -1,7 +1,14 @@
import hashlib import hashlib
from typing import (
Union,
)
import base58 import base58
import multihash import multihash
from Crypto.PublicKey.RSA import RsaKey
# MaxInlineKeyLength is the maximum length a key can be for it to be inlined in # MaxInlineKeyLength is the maximum length a key can be for it to be inlined in
# the peer ID. # the peer ID.
# * When `len(pubKey.Bytes()) <= MaxInlineKeyLength`, the peer ID is the # * When `len(pubKey.Bytes()) <= MaxInlineKeyLength`, the peer ID is the
@ -13,67 +20,71 @@ MAX_INLINE_KEY_LENGTH = 42
class ID: class ID:
def __init__(self, id_str): _id_str: bytes
def __init__(self, id_str: bytes) -> None:
self._id_str = id_str self._id_str = id_str
def to_bytes(self) -> bytes: def to_bytes(self) -> bytes:
return self._id_str return self._id_str
def get_raw_id(self): def get_raw_id(self) -> bytes:
return self._id_str return self._id_str
def pretty(self): def pretty(self) -> str:
return base58.b58encode(self._id_str).decode() return base58.b58encode(self._id_str).decode()
def get_xor_id(self): def get_xor_id(self) -> int:
return int(digest(self.get_raw_id()).hex(), 16) return int(digest(self.get_raw_id()).hex(), 16)
def __str__(self): def __str__(self) -> str:
pid = self.pretty() pid = self.pretty()
return pid return pid
__repr__ = __str__ __repr__ = __str__
def __eq__(self, other): def __eq__(self, other: object) -> bool:
#pylint: disable=protected-access #pylint: disable=protected-access
if not isinstance(other, ID):
return NotImplemented
return self._id_str == other._id_str return self._id_str == other._id_str
def __hash__(self): def __hash__(self) -> int:
return hash(self._id_str) return hash(self._id_str)
def id_b58_encode(peer_id): def id_b58_encode(peer_id: ID) -> str:
""" """
return a b58-encoded string return a b58-encoded string
""" """
#pylint: disable=protected-access #pylint: disable=protected-access
return base58.b58encode(peer_id._id_str).decode() return base58.b58encode(peer_id.get_raw_id()).decode()
def id_b58_decode(peer_id_str): def id_b58_decode(peer_id_str: str) -> ID:
""" """
return a base58-decoded peer ID return a base58-decoded peer ID
""" """
return ID(base58.b58decode(peer_id_str)) return ID(base58.b58decode(peer_id_str))
def id_from_public_key(key): def id_from_public_key(key: RsaKey) -> ID:
# export into binary format # export into binary format
key_bin = key.exportKey("DER") key_bin = key.exportKey("DER")
algo = multihash.Func.sha2_256 algo: int = multihash.Func.sha2_256
# TODO: seems identity is not yet supported in pymultihash # TODO: seems identity is not yet supported in pymultihash
# if len(b) <= MAX_INLINE_KEY_LENGTH: # if len(b) <= MAX_INLINE_KEY_LENGTH:
# algo multihash.func.identity # algo multihash.func.identity
mh_digest = multihash.digest(key_bin, algo) mh_digest: multihash.Multihash = multihash.digest(key_bin, algo)
return ID(mh_digest.encode()) return ID(mh_digest.encode())
def id_from_private_key(key): def id_from_private_key(key: RsaKey) -> ID:
return id_from_public_key(key.publickey()) return id_from_public_key(key.publickey())
def digest(string): def digest(data: Union[str, bytes]) -> bytes:
if not isinstance(string, bytes): if isinstance(data, str):
string = str(string).encode('utf8') data = data.encode('utf8')
return hashlib.sha1(string).digest() return hashlib.sha1(data).digest()

View File

@ -1,35 +1,48 @@
from typing import (
Any,
Dict,
List,
Sequence,
)
from multiaddr import Multiaddr
from .peerdata_interface import IPeerData from .peerdata_interface import IPeerData
class PeerData(IPeerData): class PeerData(IPeerData):
def __init__(self): metadata: Dict[Any, Any]
protocols: List[str]
addrs: List[Multiaddr]
def __init__(self) -> None:
self.metadata = {} self.metadata = {}
self.protocols = [] self.protocols = []
self.addrs = [] self.addrs = []
def get_protocols(self): def get_protocols(self) -> List[str]:
return self.protocols return self.protocols
def add_protocols(self, protocols): def add_protocols(self, protocols: Sequence[str]) -> None:
self.protocols.extend(protocols) self.protocols.extend(list(protocols))
def set_protocols(self, protocols): def set_protocols(self, protocols: Sequence[str]) -> None:
self.protocols = protocols self.protocols = list(protocols)
def add_addrs(self, addrs): def add_addrs(self, addrs: Sequence[Multiaddr]) -> None:
self.addrs.extend(addrs) self.addrs.extend(addrs)
def get_addrs(self): def get_addrs(self) -> List[Multiaddr]:
return self.addrs return self.addrs
def clear_addrs(self): def clear_addrs(self) -> None:
self.addrs = [] self.addrs = []
def put_metadata(self, key, val): def put_metadata(self, key: str, val: Any) -> None:
self.metadata[key] = val self.metadata[key] = val
def get_metadata(self, key): def get_metadata(self, key: str) -> Any:
if key in self.metadata: if key in self.metadata:
return self.metadata[key] return self.metadata[key]
raise PeerDataError("key not found") raise PeerDataError("key not found")

View File

@ -1,46 +1,55 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import (
Any,
List,
Sequence,
)
from multiaddr import Multiaddr
from .peermetadata_interface import IPeerMetadata
class IPeerData(ABC): class IPeerData(ABC):
@abstractmethod @abstractmethod
def get_protocols(self): def get_protocols(self) -> List[str]:
""" """
:return: all protocols associated with given peer :return: all protocols associated with given peer
""" """
@abstractmethod @abstractmethod
def add_protocols(self, protocols): def add_protocols(self, protocols: Sequence[str]) -> None:
""" """
:param protocols: protocols to add :param protocols: protocols to add
""" """
@abstractmethod @abstractmethod
def set_protocols(self, protocols): def set_protocols(self, protocols: Sequence[str]) -> None:
""" """
:param protocols: protocols to add :param protocols: protocols to add
""" """
@abstractmethod @abstractmethod
def add_addrs(self, addrs): def add_addrs(self, addrs: Sequence[Multiaddr]) -> None:
""" """
:param addrs: multiaddresses to add :param addrs: multiaddresses to add
""" """
@abstractmethod @abstractmethod
def get_addrs(self): def get_addrs(self) -> List[Multiaddr]:
""" """
:return: all multiaddresses :return: all multiaddresses
""" """
@abstractmethod @abstractmethod
def clear_addrs(self): def clear_addrs(self) -> None:
""" """
Clear all addresses Clear all addresses
""" """
@abstractmethod @abstractmethod
def put_metadata(self, key, val): def put_metadata(self, key: str, val: Any) -> None:
""" """
:param key: key in KV pair :param key: key in KV pair
:param val: val to associate with key :param val: val to associate with key
@ -48,7 +57,7 @@ class IPeerData(ABC):
""" """
@abstractmethod @abstractmethod
def get_metadata(self, key): def get_metadata(self, key: str) -> IPeerMetadata:
""" """
:param key: key in KV pair :param key: key in KV pair
:return: val for key :return: val for key

View File

@ -1,12 +1,23 @@
from typing import (
List,
)
import multiaddr import multiaddr
from .id import id_b58_decode from .id import (
ID,
id_b58_decode,
)
from .peerdata import PeerData from .peerdata import PeerData
class PeerInfo: class PeerInfo:
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, peer_id, peer_data=None):
peer_id: ID
addrs: List[multiaddr.Multiaddr]
def __init__(self, peer_id: ID, peer_data: PeerData = None) -> None:
self.peer_id = peer_id self.peer_id = peer_id
self.addrs = peer_data.get_addrs() if peer_data else None self.addrs = peer_data.get_addrs() if peer_data else None
@ -30,16 +41,16 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
) )
# make sure the /p2p value parses as a peer.ID # make sure the /p2p value parses as a peer.ID
peer_id_str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P) peer_id_str: str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P)
peer_id = id_b58_decode(peer_id_str) peer_id: ID = id_b58_decode(peer_id_str)
# we might have received just an / p2p part, which means there's no addr. # we might have received just an / p2p part, which means there's no addr.
if len(parts) > 1: if len(parts) > 1:
addr = multiaddr.Multiaddr.join(*parts[:-1]) addr = multiaddr.Multiaddr.join(*parts[:-1])
peer_data = PeerData() peer_data = PeerData()
peer_data.addrs = [addr] peer_data.add_addrs([addr])
peer_data.protocols = [p.code for p in addr.protocols()] peer_data.set_protocols([p.code for p in addr.protocols()])
return PeerInfo(peer_id, peer_data) return PeerInfo(peer_id, peer_data)

View File

@ -1,13 +1,20 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import (
Any,
)
from .id import (
ID,
)
class IPeerMetadata(ABC): class IPeerMetadata(ABC):
def __init__(self): def __init__(self) -> None:
pass pass
@abstractmethod @abstractmethod
def get(self, peer_id, key): def get(self, peer_id: ID, key: str) -> Any:
""" """
:param peer_id: peer ID to lookup key for :param peer_id: peer ID to lookup key for
:param key: key to look up :param key: key to look up
@ -16,7 +23,7 @@ class IPeerMetadata(ABC):
""" """
@abstractmethod @abstractmethod
def put(self, peer_id, key, val): def put(self, peer_id: ID, key: str, val: Any) -> None:
""" """
:param peer_id: peer ID to lookup key for :param peer_id: peer ID to lookup key for
:param key: key to associate with peer :param key: key to associate with peer

View File

@ -1,15 +1,28 @@
from .peerstore_interface import IPeerStore from typing import (
Any,
Dict,
List,
Optional,
Sequence,
)
from multiaddr import Multiaddr
from .id import ID
from .peerdata import PeerData from .peerdata import PeerData
from .peerinfo import PeerInfo from .peerinfo import PeerInfo
from .peerstore_interface import IPeerStore
class PeerStore(IPeerStore): class PeerStore(IPeerStore):
def __init__(self): peer_map: Dict[ID, PeerData]
def __init__(self) -> None:
IPeerStore.__init__(self) IPeerStore.__init__(self)
self.peer_map = {} self.peer_map = {}
def __create_or_get_peer(self, peer_id): def __create_or_get_peer(self, peer_id: ID) -> PeerData:
""" """
Returns the peer data for peer_id or creates a new Returns the peer data for peer_id or creates a new
peer data (and stores it in peer_map) if peer peer data (and stores it in peer_map) if peer
@ -23,65 +36,65 @@ class PeerStore(IPeerStore):
self.peer_map[peer_id] = data self.peer_map[peer_id] = data
return self.peer_map[peer_id] return self.peer_map[peer_id]
def peer_info(self, peer_id): def peer_info(self, peer_id: ID) -> Optional[PeerInfo]:
if peer_id in self.peer_map: if peer_id in self.peer_map:
peer = self.peer_map[peer_id] peer_data = self.peer_map[peer_id]
return PeerInfo(peer_id, peer) return PeerInfo(peer_id, peer_data)
return None return None
def get_protocols(self, peer_id): def get_protocols(self, peer_id: ID) -> List[str]:
if peer_id in self.peer_map: if peer_id in self.peer_map:
return self.peer_map[peer_id].get_protocols() return self.peer_map[peer_id].get_protocols()
raise PeerStoreError("peer ID not found") raise PeerStoreError("peer ID not found")
def add_protocols(self, peer_id, protocols): def add_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None:
peer = self.__create_or_get_peer(peer_id) peer = self.__create_or_get_peer(peer_id)
peer.add_protocols(protocols) peer.add_protocols(list(protocols))
def set_protocols(self, peer_id, protocols): def set_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None:
peer = self.__create_or_get_peer(peer_id) peer = self.__create_or_get_peer(peer_id)
peer.set_protocols(protocols) peer.set_protocols(list(protocols))
def peers(self): def peer_ids(self) -> List[ID]:
return list(self.peer_map.keys()) return list(self.peer_map.keys())
def get(self, peer_id, key): def get(self, peer_id: ID, key: str) -> Any:
if peer_id in self.peer_map: if peer_id in self.peer_map:
val = self.peer_map[peer_id].get_metadata(key) val = self.peer_map[peer_id].get_metadata(key)
return val return val
raise PeerStoreError("peer ID not found") raise PeerStoreError("peer ID not found")
def put(self, peer_id, key, val): def put(self, peer_id: ID, key: str, val: Any) -> None:
# <<?>> # <<?>>
# This can output an error, not sure what the possible errors are # This can output an error, not sure what the possible errors are
peer = self.__create_or_get_peer(peer_id) peer = self.__create_or_get_peer(peer_id)
peer.put_metadata(key, val) peer.put_metadata(key, val)
def add_addr(self, peer_id, addr, ttl): def add_addr(self, peer_id: ID, addr: Multiaddr, ttl: int) -> None:
self.add_addrs(peer_id, [addr], ttl) self.add_addrs(peer_id, [addr], ttl)
def add_addrs(self, peer_id, addrs, ttl): def add_addrs(self, peer_id: ID, addrs: Sequence[Multiaddr], ttl: int) -> None:
# Ignore ttl for now # Ignore ttl for now
peer = self.__create_or_get_peer(peer_id) peer = self.__create_or_get_peer(peer_id)
peer.add_addrs(addrs) peer.add_addrs(list(addrs))
def addrs(self, peer_id): def addrs(self, peer_id: ID) -> List[Multiaddr]:
if peer_id in self.peer_map: if peer_id in self.peer_map:
return self.peer_map[peer_id].get_addrs() return self.peer_map[peer_id].get_addrs()
raise PeerStoreError("peer ID not found") raise PeerStoreError("peer ID not found")
def clear_addrs(self, peer_id): def clear_addrs(self, peer_id: ID) -> None:
# Only clear addresses if the peer is in peer map # Only clear addresses if the peer is in peer map
if peer_id in self.peer_map: if peer_id in self.peer_map:
self.peer_map[peer_id].clear_addrs() self.peer_map[peer_id].clear_addrs()
def peers_with_addrs(self): def peers_with_addrs(self) -> List[ID]:
# Add all peers with addrs at least 1 to output # Add all peers with addrs at least 1 to output
output = [] output: List[ID] = []
for key in self.peer_map: for peer_id in self.peer_map:
if len(self.peer_map[key].get_addrs()) >= 1: if len(self.peer_map[peer_id].get_addrs()) >= 1:
output.append(key) output.append(peer_id)
return output return output

View File

@ -1,24 +1,31 @@
from abc import abstractmethod from abc import abstractmethod
from typing import (
List,
Sequence,
)
from .addrbook_interface import IAddrBook from .addrbook_interface import IAddrBook
from .id import ID
from .peerinfo import PeerInfo
from .peermetadata_interface import IPeerMetadata from .peermetadata_interface import IPeerMetadata
class IPeerStore(IAddrBook, IPeerMetadata): class IPeerStore(IAddrBook, IPeerMetadata):
def __init__(self): def __init__(self) -> None:
IPeerMetadata.__init__(self) IPeerMetadata.__init__(self)
IAddrBook.__init__(self) IAddrBook.__init__(self)
@abstractmethod @abstractmethod
def peer_info(self, peer_id): def peer_info(self, peer_id: ID) -> PeerInfo:
""" """
:param peer_id: peer ID to get info for :param peer_id: peer ID to get info for
:return: peer info object :return: peer info object
""" """
@abstractmethod @abstractmethod
def get_protocols(self, peer_id): def get_protocols(self, peer_id: ID) -> List[str]:
""" """
:param peer_id: peer ID to get protocols for :param peer_id: peer ID to get protocols for
:return: protocols (as strings) :return: protocols (as strings)
@ -26,7 +33,7 @@ class IPeerStore(IAddrBook, IPeerMetadata):
""" """
@abstractmethod @abstractmethod
def add_protocols(self, peer_id, protocols): def add_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None:
""" """
:param peer_id: peer ID to add protocols for :param peer_id: peer ID to add protocols for
:param protocols: protocols to add :param protocols: protocols to add
@ -34,7 +41,7 @@ class IPeerStore(IAddrBook, IPeerMetadata):
""" """
@abstractmethod @abstractmethod
def set_protocols(self, peer_id, protocols): def set_protocols(self, peer_id: ID, protocols: Sequence[str]) -> None:
""" """
:param peer_id: peer ID to set protocols for :param peer_id: peer ID to set protocols for
:param protocols: protocols to set :param protocols: protocols to set
@ -42,7 +49,7 @@ class IPeerStore(IAddrBook, IPeerMetadata):
""" """
@abstractmethod @abstractmethod
def peers(self): def peer_ids(self) -> List[ID]:
""" """
:return: all of the peer IDs stored in peer store :return: all of the peer IDs stored in peer store
""" """

View File

@ -1,5 +1,7 @@
from typing import ( from typing import (
Iterable, Iterable,
List,
Sequence,
) )
from libp2p.peer.id import ( from libp2p.peer.id import (
@ -8,23 +10,28 @@ from libp2p.peer.id import (
) )
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub import Pubsub
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
class FloodSub(IPubsubRouter): class FloodSub(IPubsubRouter):
# pylint: disable=no-member # pylint: disable=no-member
def __init__(self, protocols): protocols: List[str]
self.protocols = protocols
pubsub: Pubsub
def __init__(self, protocols: Sequence[str]) -> None:
self.protocols = list(protocols)
self.pubsub = None self.pubsub = None
def get_protocols(self): def get_protocols(self) -> List[str]:
""" """
:return: the list of protocols supported by the router :return: the list of protocols supported by the router
""" """
return self.protocols return self.protocols
def attach(self, pubsub): def attach(self, pubsub: Pubsub) -> None:
""" """
Attach is invoked by the PubSub constructor to attach the router to a Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance. freshly initialized PubSub instance.
@ -32,19 +39,19 @@ class FloodSub(IPubsubRouter):
""" """
self.pubsub = pubsub self.pubsub = pubsub
def add_peer(self, peer_id, protocol_id): def add_peer(self, peer_id: ID, protocol_id: str) -> None:
""" """
Notifies the router that a new peer has been connected Notifies the router that a new peer has been connected
:param peer_id: id of peer to add :param peer_id: id of peer to add
""" """
def remove_peer(self, peer_id): def remove_peer(self, peer_id: ID) -> None:
""" """
Notifies the router that a peer has been disconnected Notifies the router that a peer has been disconnected
:param peer_id: id of peer to remove :param peer_id: id of peer to remove
""" """
async def handle_rpc(self, rpc, sender_peer_id): async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None:
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed
@ -80,7 +87,7 @@ class FloodSub(IPubsubRouter):
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
await stream.write(rpc_msg.SerializeToString()) await stream.write(rpc_msg.SerializeToString())
async def join(self, topic): async def join(self, topic: str) -> None:
""" """
Join notifies the router that we want to receive and Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the forward messages in a topic. It is invoked after the
@ -88,7 +95,7 @@ class FloodSub(IPubsubRouter):
:param topic: topic to join :param topic: topic to join
""" """
async def leave(self, topic): async def leave(self, topic: str) -> None:
""" """
Leave notifies the router that we are no longer interested in a topic. Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement. It is invoked after the unsubscription announcement.

View File

@ -1,14 +1,15 @@
from ast import literal_eval
import asyncio import asyncio
import random import random
from typing import ( from typing import (
Any,
Dict,
Iterable, Iterable,
List, List,
MutableSet, Set,
Sequence, Sequence,
) )
from ast import literal_eval
from libp2p.peer.id import ( from libp2p.peer.id import (
ID, ID,
id_b58_decode, id_b58_decode,
@ -16,6 +17,7 @@ from libp2p.peer.id import (
from .mcache import MessageCache from .mcache import MessageCache
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub import Pubsub
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
@ -24,10 +26,45 @@ class GossipSub(IPubsubRouter):
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods # pylint: disable=too-many-public-methods
def __init__(self, protocols, degree, degree_low, degree_high, time_to_live, gossip_window=3, protocols: List[str]
gossip_history=5, heartbeat_interval=120): pubsub: Pubsub
degree: int
degree_high: int
degree_low: int
time_to_live: int
# FIXME: Should be changed to `Dict[str, List[ID]]`
mesh: Dict[str, List[str]]
# FIXME: Should be changed to `Dict[str, List[ID]]`
fanout: Dict[str, List[str]]
# FIXME: Should be changed to `Dict[ID, str]`
peers_to_protocol: Dict[str, str]
time_since_last_publish: Dict[str, int]
#FIXME: Should be changed to List[ID]
peers_gossipsub: List[str]
#FIXME: Should be changed to List[ID]
peers_floodsub: List[str]
mcache: MessageCache
heartbeat_interval: int
def __init__(self,
protocols: Sequence[str],
degree: int,
degree_low: int,
degree_high: int,
time_to_live: int,
gossip_window: int = 3,
gossip_history: int = 5,
heartbeat_interval: int = 120) -> None:
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
self.protocols = protocols self.protocols = list(protocols)
self.pubsub = None self.pubsub = None
# Store target degree, upper degree bound, and lower degree bound # Store target degree, upper degree bound, and lower degree bound
@ -42,6 +79,9 @@ class GossipSub(IPubsubRouter):
self.mesh = {} self.mesh = {}
self.fanout = {} self.fanout = {}
# Create peer --> protocol mapping
self.peers_to_protocol = {}
# Create topic --> time since last publish map # Create topic --> time since last publish map
self.time_since_last_publish = {} self.time_since_last_publish = {}
@ -56,13 +96,13 @@ class GossipSub(IPubsubRouter):
# Interface functions # Interface functions
def get_protocols(self): def get_protocols(self) -> List[str]:
""" """
:return: the list of protocols supported by the router :return: the list of protocols supported by the router
""" """
return self.protocols return self.protocols
def attach(self, pubsub): def attach(self, pubsub: Pubsub) -> None:
""" """
Attach is invoked by the PubSub constructor to attach the router to a Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance. freshly initialized PubSub instance.
@ -74,50 +114,60 @@ class GossipSub(IPubsubRouter):
# TODO: Start after delay # TODO: Start after delay
asyncio.ensure_future(self.heartbeat()) asyncio.ensure_future(self.heartbeat())
def add_peer(self, peer_id, protocol_id): def add_peer(self, peer_id: ID, protocol_id: str) -> None:
""" """
Notifies the router that a new peer has been connected Notifies the router that a new peer has been connected
:param peer_id: id of peer to add :param peer_id: id of peer to add
:param protocol_id: router protocol the peer speaks, e.g., floodsub, gossipsub
""" """
# Add peer to the correct peer list # Add peer to the correct peer list
peer_type = GossipSub.get_peer_type(protocol_id) peer_type = GossipSub.get_peer_type(protocol_id)
peer_id_str = str(peer_id) peer_id_str = str(peer_id)
self.peers_to_protocol[peer_id_str] = protocol_id
if peer_type == "gossip": if peer_type == "gossip":
self.peers_gossipsub.append(peer_id_str) self.peers_gossipsub.append(peer_id_str)
elif peer_type == "flood": elif peer_type == "flood":
self.peers_floodsub.append(peer_id_str) self.peers_floodsub.append(peer_id_str)
def remove_peer(self, peer_id): def remove_peer(self, peer_id: ID) -> None:
""" """
Notifies the router that a peer has been disconnected Notifies the router that a peer has been disconnected
:param peer_id: id of peer to remove :param peer_id: id of peer to remove
""" """
peer_id_str = str(peer_id) peer_id_str = str(peer_id)
self.peers_to_protocol.remove(peer_id_str) del self.peers_to_protocol[peer_id_str]
async def handle_rpc(self, rpc, sender_peer_id): if peer_id_str in self.peers_gossipsub:
self.peers_gossipsub.remove(peer_id_str)
if peer_id_str in self.peers_gossipsub:
self.peers_floodsub.remove(peer_id_str)
async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: ID) -> None:
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed
:param rpc: rpc message :param rpc: RPC message
:param sender_peer_id: id of the peer who sent the message
""" """
control_message = rpc.control control_message = rpc.control
sender_peer_id = str(sender_peer_id) sender_peer_id_str = str(sender_peer_id)
# Relay each rpc control to the appropriate handler # Relay each rpc control message to the appropriate handler
if control_message.ihave: if control_message.ihave:
for ihave in control_message.ihave: for ihave in control_message.ihave:
await self.handle_ihave(ihave, sender_peer_id) await self.handle_ihave(ihave, sender_peer_id_str)
if control_message.iwant: if control_message.iwant:
for iwant in control_message.iwant: for iwant in control_message.iwant:
await self.handle_iwant(iwant, sender_peer_id) await self.handle_iwant(iwant, sender_peer_id_str)
if control_message.graft: if control_message.graft:
for graft in control_message.graft: for graft in control_message.graft:
await self.handle_graft(graft, sender_peer_id) await self.handle_graft(graft, sender_peer_id_str)
if control_message.prune: if control_message.prune:
for prune in control_message.prune: for prune in control_message.prune:
await self.handle_prune(prune, sender_peer_id) await self.handle_prune(prune, sender_peer_id_str)
async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
@ -152,7 +202,8 @@ class GossipSub(IPubsubRouter):
:param origin: peer id of the peer the message originate from. :param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to. :return: a generator of the peer ids who we send data to.
""" """
send_to: MutableSet[ID] = set() # pylint: disable=len-as-condition
send_to: Set[ID] = set()
for topic in topic_ids: for topic in topic_ids:
if topic not in self.pubsub.peer_topics: if topic not in self.pubsub.peer_topics:
continue continue
@ -177,7 +228,6 @@ class GossipSub(IPubsubRouter):
# I assume there could be short periods between heartbeats where topic may not # I assume there could be short periods between heartbeats where topic may not
# be but we should check that this path gets hit appropriately # be but we should check that this path gets hit appropriately
# pylint: disable=len-as-condition
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
# If no peers in fanout, choose some peers from gossipsub peers in topic. # If no peers in fanout, choose some peers from gossipsub peers in topic.
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
@ -191,7 +241,7 @@ class GossipSub(IPubsubRouter):
# Excludes `msg_forwarder` and `origin` # Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin]) yield from send_to.difference([msg_forwarder, origin])
async def join(self, topic): async def join(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec # Note: the comments here are the near-exact algorithm description from the spec
""" """
Join notifies the router that we want to receive and Join notifies the router that we want to receive and
@ -204,8 +254,9 @@ class GossipSub(IPubsubRouter):
# Create mesh[topic] if it does not yet exist # Create mesh[topic] if it does not yet exist
self.mesh[topic] = [] self.mesh[topic] = []
topic_in_fanout = topic in self.fanout topic_in_fanout: bool = topic in self.fanout
fanout_peers = self.fanout[topic] if topic_in_fanout else [] # FIXME: Should be changed to `List[ID]`
fanout_peers: List[str] = self.fanout[topic] if topic_in_fanout else []
fanout_size = len(fanout_peers) fanout_size = len(fanout_peers)
if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree): if not topic_in_fanout or (topic_in_fanout and fanout_size < self.degree):
# There are less than D peers (let this number be x) # There are less than D peers (let this number be x)
@ -229,7 +280,7 @@ class GossipSub(IPubsubRouter):
if topic_in_fanout: if topic_in_fanout:
del self.fanout[topic] del self.fanout[topic]
async def leave(self, topic): async def leave(self, topic: str) -> None:
# Note: the comments here are the near-exact algorithm description from the spec # Note: the comments here are the near-exact algorithm description from the spec
""" """
Leave notifies the router that we are no longer interested in a topic. Leave notifies the router that we are no longer interested in a topic.
@ -247,7 +298,7 @@ class GossipSub(IPubsubRouter):
# Interface Helper Functions # Interface Helper Functions
@staticmethod @staticmethod
def get_peer_type(protocol_id): def get_peer_type(protocol_id: str) -> str:
# TODO: Do this in a better, more efficient way # TODO: Do this in a better, more efficient way
if "gossipsub" in protocol_id: if "gossipsub" in protocol_id:
return "gossip" return "gossip"
@ -255,7 +306,13 @@ class GossipSub(IPubsubRouter):
return "flood" return "flood"
return "unknown" return "unknown"
async def deliver_messages_to_peers(self, peers, msg_sender, origin_id, serialized_packet): # FIXME: type of `peers` should be changed to `List[ID]`
# FIXME: type of `msg_sender` and `origin_id` should be changed to `ID`
async def deliver_messages_to_peers(self,
peers: List[str],
msg_sender: str,
origin_id: str,
serialized_packet: bytes) -> None:
for peer_id_in_topic in peers: for peer_id_in_topic in peers:
# Forward to all peers that are not the # Forward to all peers that are not the
# message sender and are not the message origin # message sender and are not the message origin
@ -267,7 +324,7 @@ class GossipSub(IPubsubRouter):
await stream.write(serialized_packet) await stream.write(serialized_packet)
# Heartbeat # Heartbeat
async def heartbeat(self): async def heartbeat(self) -> None:
""" """
Call individual heartbeats. Call individual heartbeats.
Note: the heartbeats are called with awaits because each heartbeat depends on the Note: the heartbeats are called with awaits because each heartbeat depends on the
@ -281,7 +338,7 @@ class GossipSub(IPubsubRouter):
await asyncio.sleep(self.heartbeat_interval) await asyncio.sleep(self.heartbeat_interval)
async def mesh_heartbeat(self): async def mesh_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec # Note: the comments here are the exact pseudocode from the spec
for topic in self.mesh: for topic in self.mesh:
# Skip if no peers have subscribed to the topic # Skip if no peers have subscribed to the topic
@ -297,7 +354,8 @@ class GossipSub(IPubsubRouter):
self.mesh[topic], self.mesh[topic],
) )
fanout_peers_not_in_mesh = [ # FIXME: Should be changed to `List[ID]`
fanout_peers_not_in_mesh: List[str] = [
peer peer
for peer in selected_peers for peer in selected_peers
if peer not in self.mesh[topic] if peer not in self.mesh[topic]
@ -311,8 +369,12 @@ class GossipSub(IPubsubRouter):
if num_mesh_peers_in_topic > self.degree_high: if num_mesh_peers_in_topic > self.degree_high:
# Select |mesh[topic]| - D peers from mesh[topic] # Select |mesh[topic]| - D peers from mesh[topic]
selected_peers = GossipSub.select_from_minus(num_mesh_peers_in_topic - self.degree, # FIXME: Should be changed to `List[ID]`
self.mesh[topic], []) selected_peers = GossipSub.select_from_minus(
num_mesh_peers_in_topic - self.degree,
self.mesh[topic],
[],
)
for peer in selected_peers: for peer in selected_peers:
# Remove peer from mesh[topic] # Remove peer from mesh[topic]
self.mesh[topic].remove(peer) self.mesh[topic].remove(peer)
@ -320,7 +382,7 @@ class GossipSub(IPubsubRouter):
# Emit PRUNE(topic) control message to peer # Emit PRUNE(topic) control message to peer
await self.emit_prune(topic, peer) await self.emit_prune(topic, peer)
async def fanout_heartbeat(self): async def fanout_heartbeat(self) -> None:
# Note: the comments here are the exact pseudocode from the spec # Note: the comments here are the exact pseudocode from the spec
for topic in self.fanout: for topic in self.fanout:
# If time since last published > ttl # If time since last published > ttl
@ -328,7 +390,7 @@ class GossipSub(IPubsubRouter):
if self.time_since_last_publish[topic] > self.time_to_live: if self.time_since_last_publish[topic] > self.time_to_live:
# Remove topic from fanout # Remove topic from fanout
del self.fanout[topic] del self.fanout[topic]
self.time_since_last_publish.remove(topic) del self.time_since_last_publish[topic]
else: else:
num_fanout_peers_in_topic = len(self.fanout[topic]) num_fanout_peers_in_topic = len(self.fanout[topic])
@ -343,7 +405,7 @@ class GossipSub(IPubsubRouter):
# Add the peers to fanout[topic] # Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers) self.fanout[topic].extend(selected_peers)
async def gossip_heartbeat(self): async def gossip_heartbeat(self) -> None:
# pylint: disable=too-many-nested-blocks # pylint: disable=too-many-nested-blocks
for topic in self.mesh: for topic in self.mesh:
msg_ids = self.mcache.window(topic) msg_ids = self.mcache.window(topic)
@ -362,8 +424,8 @@ class GossipSub(IPubsubRouter):
# TODO: this line is a monster, can hopefully be simplified # TODO: this line is a monster, can hopefully be simplified
if (topic not in self.mesh or (peer not in self.mesh[topic]))\ if (topic not in self.mesh or (peer not in self.mesh[topic]))\
and (topic not in self.fanout or (peer not in self.fanout[topic])): and (topic not in self.fanout or (peer not in self.fanout[topic])):
msg_ids = [str(msg) for msg in msg_ids] msg_id_strs = [str(msg_id) for msg_id in msg_ids]
await self.emit_ihave(topic, msg_ids, peer) await self.emit_ihave(topic, msg_id_strs, peer)
# TODO: Refactor and Dedup. This section is the roughly the same as the above. # TODO: Refactor and Dedup. This section is the roughly the same as the above.
# Do the same for fanout, for all topics not already hit in mesh # Do the same for fanout, for all topics not already hit in mesh
@ -383,13 +445,15 @@ class GossipSub(IPubsubRouter):
for peer in peers_to_emit_ihave_to: for peer in peers_to_emit_ihave_to:
if peer not in self.mesh[topic] and peer not in self.fanout[topic]: if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
msg_ids = [str(msg) for msg in msg_ids] msg_id_strs = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_ids, peer) await self.emit_ihave(topic, msg_id_strs, peer)
self.mcache.shift() self.mcache.shift()
@staticmethod @staticmethod
def select_from_minus(num_to_select, pool, minus): def select_from_minus(num_to_select: int,
pool: Sequence[Any],
minus: Sequence[Any]) -> List[Any]:
""" """
Select at most num_to_select subset of elements from the set (pool - minus) randomly. Select at most num_to_select subset of elements from the set (pool - minus) randomly.
:param num_to_select: number of elements to randomly select :param num_to_select: number of elements to randomly select
@ -400,10 +464,10 @@ class GossipSub(IPubsubRouter):
# Create selection pool, which is selection_pool = pool - minus # Create selection pool, which is selection_pool = pool - minus
if minus: if minus:
# Create a new selection pool by removing elements of minus # Create a new selection pool by removing elements of minus
selection_pool = [x for x in pool if x not in minus] selection_pool: List[Any] = [x for x in pool if x not in minus]
else: else:
# Don't create a new selection_pool if we are not subbing anything # Don't create a new selection_pool if we are not subbing anything
selection_pool = pool selection_pool = list(pool)
# If num_to_select > size(selection_pool), then return selection_pool (which has the most # If num_to_select > size(selection_pool), then return selection_pool (which has the most
# possible elements s.t. the number of elements is less than num_to_select) # possible elements s.t. the number of elements is less than num_to_select)
@ -411,15 +475,17 @@ class GossipSub(IPubsubRouter):
return selection_pool return selection_pool
# Random selection # Random selection
selection = random.sample(selection_pool, num_to_select) selection: List[Any] = random.sample(selection_pool, num_to_select)
return selection return selection
# FIXME: type of `minus` should be changed to type `Sequence[ID]`
# FIXME: return type should be changed to type `List[ID]`
def _get_in_topic_gossipsub_peers_from_minus( def _get_in_topic_gossipsub_peers_from_minus(
self, self,
topic: str, topic: str,
num_to_select: int, num_to_select: int,
minus: Sequence[ID]) -> List[ID]: minus: Sequence[str]) -> List[str]:
gossipsub_peers_in_topic = [ gossipsub_peers_in_topic = [
peer_str peer_str
for peer_str in self.pubsub.peer_topics[topic] for peer_str in self.pubsub.peer_topics[topic]
@ -433,7 +499,7 @@ class GossipSub(IPubsubRouter):
# RPC handlers # RPC handlers
async def handle_ihave(self, ihave_msg, sender_peer_id): async def handle_ihave(self, ihave_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
""" """
Checks the seen set and requests unknown messages with an IWANT message. Checks the seen set and requests unknown messages with an IWANT message.
""" """
@ -442,29 +508,37 @@ class GossipSub(IPubsubRouter):
from_id_str = sender_peer_id from_id_str = sender_peer_id
# Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache # Get list of all seen (seqnos, from) from the (seqno, from) tuples in seen_messages cache
seen_seqnos_and_peers = [seqno_and_from seen_seqnos_and_peers = [
for seqno_and_from in self.pubsub.seen_messages.keys()] seqno_and_from
for seqno_and_from in self.pubsub.seen_messages.keys()
]
# Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list # Add all unknown message ids (ids that appear in ihave_msg but not in seen_seqnos) to list
# of messages we want to request # of messages we want to request
msg_ids_wanted = [msg_id for msg_id in ihave_msg.messageIDs # FIXME: Update type of message ID
if literal_eval(msg_id) not in seen_seqnos_and_peers] msg_ids_wanted: List[Any] = [
msg_id
for msg_id in ihave_msg.messageIDs
if literal_eval(msg_id) not in seen_seqnos_and_peers
]
# Request messages with IWANT message # Request messages with IWANT message
if msg_ids_wanted: if msg_ids_wanted:
await self.emit_iwant(msg_ids_wanted, from_id_str) await self.emit_iwant(msg_ids_wanted, from_id_str)
async def handle_iwant(self, iwant_msg, sender_peer_id): async def handle_iwant(self, iwant_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
""" """
Forwards all request messages that are present in mcache to the requesting peer. Forwards all request messages that are present in mcache to the requesting peer.
""" """
from_id_str = sender_peer_id from_id_str = sender_peer_id
msg_ids = [literal_eval(msg) for msg in iwant_msg.messageIDs] # FIXME: Update type of message ID
msgs_to_forward = [] # FIXME: Find a better way to parse the msg ids
msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]
msgs_to_forward: List[rpc_pb2.Message] = []
for msg_id_iwant in msg_ids: for msg_id_iwant in msg_ids:
# Check if the wanted message ID is present in mcache # Check if the wanted message ID is present in mcache
msg = self.mcache.get(msg_id_iwant) msg: rpc_pb2.Message = self.mcache.get(msg_id_iwant)
# Cache hit # Cache hit
if msg: if msg:
@ -476,12 +550,12 @@ class GossipSub(IPubsubRouter):
# because then the message will forwarded to peers in the topics contained in the messages. # because then the message will forwarded to peers in the topics contained in the messages.
# We should # We should
# 1) Package these messages into a single packet # 1) Package these messages into a single packet
packet = rpc_pb2.RPC() packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.publish.extend(msgs_to_forward) packet.publish.extend(msgs_to_forward)
# 2) Serialize that packet # 2) Serialize that packet
rpc_msg = packet.SerializeToString() rpc_msg: bytes = packet.SerializeToString()
# 3) Get the stream to this peer # 3) Get the stream to this peer
# TODO: Should we pass in from_id or from_id_str here? # TODO: Should we pass in from_id or from_id_str here?
@ -490,8 +564,8 @@ class GossipSub(IPubsubRouter):
# 4) And write the packet to the stream # 4) And write the packet to the stream
await peer_stream.write(rpc_msg) await peer_stream.write(rpc_msg)
async def handle_graft(self, graft_msg, sender_peer_id): async def handle_graft(self, graft_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
topic = graft_msg.topicID topic: str = graft_msg.topicID
from_id_str = sender_peer_id from_id_str = sender_peer_id
@ -503,8 +577,8 @@ class GossipSub(IPubsubRouter):
# Respond with PRUNE if not subscribed to the topic # Respond with PRUNE if not subscribed to the topic
await self.emit_prune(topic, sender_peer_id) await self.emit_prune(topic, sender_peer_id)
async def handle_prune(self, prune_msg, sender_peer_id): async def handle_prune(self, prune_msg: rpc_pb2.Message, sender_peer_id: str) -> None:
topic = prune_msg.topicID topic: str = prune_msg.topicID
from_id_str = sender_peer_id from_id_str = sender_peer_id
@ -514,65 +588,65 @@ class GossipSub(IPubsubRouter):
# RPC emitters # RPC emitters
async def emit_ihave(self, topic, msg_ids, to_peer): async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: str) -> None:
""" """
Emit ihave message, sent to to_peer, for topic and msg_ids Emit ihave message, sent to to_peer, for topic and msg_ids
""" """
ihave_msg = rpc_pb2.ControlIHave() ihave_msg: rpc_pb2.ControlIHave = rpc_pb2.ControlIHave()
ihave_msg.messageIDs.extend(msg_ids) ihave_msg.messageIDs.extend(msg_ids)
ihave_msg.topicID = topic ihave_msg.topicID = topic
control_msg = rpc_pb2.ControlMessage() control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.ihave.extend([ihave_msg]) control_msg.ihave.extend([ihave_msg])
await self.emit_control_message(control_msg, to_peer) await self.emit_control_message(control_msg, to_peer)
async def emit_iwant(self, msg_ids, to_peer): async def emit_iwant(self, msg_ids: Any, to_peer: str) -> None:
""" """
Emit iwant message, sent to to_peer, for msg_ids Emit iwant message, sent to to_peer, for msg_ids
""" """
iwant_msg = rpc_pb2.ControlIWant() iwant_msg: rpc_pb2.ControlIWant = rpc_pb2.ControlIWant()
iwant_msg.messageIDs.extend(msg_ids) iwant_msg.messageIDs.extend(msg_ids)
control_msg = rpc_pb2.ControlMessage() control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.iwant.extend([iwant_msg]) control_msg.iwant.extend([iwant_msg])
await self.emit_control_message(control_msg, to_peer) await self.emit_control_message(control_msg, to_peer)
async def emit_graft(self, topic, to_peer): async def emit_graft(self, topic: str, to_peer: str) -> None:
""" """
Emit graft message, sent to to_peer, for topic Emit graft message, sent to to_peer, for topic
""" """
graft_msg = rpc_pb2.ControlGraft() graft_msg: rpc_pb2.ControlGraft = rpc_pb2.ControlGraft()
graft_msg.topicID = topic graft_msg.topicID = topic
control_msg = rpc_pb2.ControlMessage() control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.graft.extend([graft_msg]) control_msg.graft.extend([graft_msg])
await self.emit_control_message(control_msg, to_peer) await self.emit_control_message(control_msg, to_peer)
async def emit_prune(self, topic, to_peer): async def emit_prune(self, topic: str, to_peer: str) -> None:
""" """
Emit graft message, sent to to_peer, for topic Emit graft message, sent to to_peer, for topic
""" """
prune_msg = rpc_pb2.ControlPrune() prune_msg: rpc_pb2.ControlPrune = rpc_pb2.ControlPrune()
prune_msg.topicID = topic prune_msg.topicID = topic
control_msg = rpc_pb2.ControlMessage() control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.prune.extend([prune_msg]) control_msg.prune.extend([prune_msg])
await self.emit_control_message(control_msg, to_peer) await self.emit_control_message(control_msg, to_peer)
async def emit_control_message(self, control_msg, to_peer): async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: str) -> None:
# Add control message to packet # Add control message to packet
packet = rpc_pb2.RPC() packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.control.CopyFrom(control_msg) packet.control.CopyFrom(control_msg)
rpc_msg = packet.SerializeToString() rpc_msg: bytes = packet.SerializeToString()
# Get stream for peer from pubsub # Get stream for peer from pubsub
peer_stream = self.pubsub.peers[to_peer] peer_stream = self.pubsub.peers[to_peer]

View File

@ -1,20 +1,44 @@
class MessageCache: from typing import (
Dict,
List,
Optional,
Sequence,
Tuple,
)
from .pb import rpc_pb2
class CacheEntry: class CacheEntry:
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
mid: Tuple[bytes, bytes]
topics: List[str]
""" """
A logical representation of an entry in the mcache's _history_. A logical representation of an entry in the mcache's _history_.
""" """
def __init__(self, mid, topics): def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None:
""" """
Constructor. Constructor.
:param mid: (seqno, from_id) of the msg :param mid: (seqno, from_id) of the msg
:param topics: list of topics this message was sent on :param topics: list of topics this message was sent on
""" """
self.mid = mid self.mid = mid
self.topics = topics self.topics = list(topics)
def __init__(self, window_size, history_size):
class MessageCache:
window_size: int
history_size: int
msgs: Dict[Tuple[bytes, bytes], rpc_pb2.Message]
history: List[List[CacheEntry]]
def __init__(self, window_size: int, history_size: int) -> None:
""" """
Constructor. Constructor.
:param window_size: Size of the window desired. :param window_size: Size of the window desired.
@ -29,25 +53,22 @@ class MessageCache:
# max length of history_size. each item is a list of CacheEntry. # max length of history_size. each item is a list of CacheEntry.
# messages lost upon shift(). # messages lost upon shift().
self.history = [] self.history = [
[]
for _ in range(history_size)
]
for _ in range(history_size): def put(self, msg: rpc_pb2.Message) -> None:
self.history.append([])
def put(self, msg):
""" """
Put a message into the mcache. Put a message into the mcache.
:param msg: The rpc message to put in. Should contain seqno and from_id :param msg: The rpc message to put in. Should contain seqno and from_id
""" """
mid = (msg.seqno, msg.from_id) mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id)
self.msgs[mid] = msg self.msgs[mid] = msg
if not self.history[0]: self.history[0].append(CacheEntry(mid, msg.topicIDs))
self.history[0] = []
self.history[0].append(self.CacheEntry(mid, msg.topicIDs)) def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]:
def get(self, mid):
""" """
Get a message from the mcache. Get a message from the mcache.
:param mid: (seqno, from_id) of the message to get. :param mid: (seqno, from_id) of the message to get.
@ -58,13 +79,13 @@ class MessageCache:
return None return None
def window(self, topic): def window(self, topic: str) -> List[Tuple[bytes, bytes]]:
""" """
Get the window for this topic. Get the window for this topic.
:param topic: Topic whose message ids we desire. :param topic: Topic whose message ids we desire.
:return: List of mids in the current window. :return: List of mids in the current window.
""" """
mids = [] mids: List[Tuple[bytes, bytes]] = []
for entries_list in self.history[: self.window_size]: for entries_list in self.history[: self.window_size]:
for entry in entries_list: for entry in entries_list:
@ -74,16 +95,16 @@ class MessageCache:
return mids return mids
def shift(self): def shift(self) -> None:
""" """
Shift the window over by 1 position, dropping the last element of the history. Shift the window over by 1 position, dropping the last element of the history.
""" """
last_entries = self.history[len(self.history) - 1] last_entries: List[CacheEntry] = self.history[len(self.history) - 1]
for entry in last_entries: for entry in last_entries:
del self.msgs[entry.mid] del self.msgs[entry.mid]
i = len(self.history) - 2 i: int = len(self.history) - 2
while i >= 0: while i >= 0:
self.history[i + 1] = self.history[i] self.history[i + 1] = self.history[i]

View File

@ -5,27 +5,22 @@ from typing import (
Any, Any,
Dict, Dict,
List, List,
Sequence,
Tuple, Tuple,
TYPE_CHECKING,
) )
from lru import LRU from lru import LRU
from libp2p.host.host_interface import ( from libp2p.host.host_interface import IHost
IHost, from libp2p.peer.id import ID
)
from libp2p.peer.id import ( from libp2p.network.stream.net_stream_interface import INetStream
ID,
)
from libp2p.network.stream.net_stream_interface import (
INetStream,
)
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .pubsub_router_interface import (
IPubsubRouter, if TYPE_CHECKING:
) from .pubsub_router_interface import IPubsubRouter
def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]:
@ -34,28 +29,35 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]:
class Pubsub: class Pubsub:
# pylint: disable=too-many-instance-attributes, no-member # pylint: disable=too-many-instance-attributes, no-member, unsubscriptable-object
host: IHost host: IHost
my_id: ID my_id: ID
router: IPubsubRouter
peer_queue: asyncio.Queue router: 'IPubsubRouter'
protocols: Sequence[str]
incoming_msgs_from_peers: asyncio.Queue() peer_queue: 'asyncio.Queue[ID]'
outgoing_messages: asyncio.Queue()
protocols: List[str]
incoming_msgs_from_peers: 'asyncio.Queue[rpc_pb2.Message]'
outgoing_messages: 'asyncio.Queue[rpc_pb2.Message]'
seen_messages: LRU seen_messages: LRU
my_topics: Dict[str, asyncio.Queue]
my_topics: Dict[str, 'asyncio.Queue[rpc_pb2.Message]']
# FIXME: Should be changed to `Dict[str, List[ID]]` # FIXME: Should be changed to `Dict[str, List[ID]]`
peer_topics: Dict[str, List[str]] peer_topics: Dict[str, List[str]]
# FIXME: Should be changed to `Dict[ID, INetStream]` # FIXME: Should be changed to `Dict[ID, INetStream]`
peers: Dict[str, INetStream] peers: Dict[str, INetStream]
# NOTE: Be sure it is increased atomically everytime. # NOTE: Be sure it is increased atomically everytime.
counter: int # uint64 counter: int # uint64
def __init__( def __init__(self,
self,
host: IHost, host: IHost,
router: IPubsubRouter, router: 'IPubsubRouter',
my_id: ID, my_id: ID,
cache_size: int = None) -> None: cache_size: int = None) -> None:
""" """
@ -99,9 +101,11 @@ class Pubsub:
self.my_topics = {} self.my_topics = {}
# Map of topic to peers to keep track of what peers are subscribed to # Map of topic to peers to keep track of what peers are subscribed to
# FIXME: Should be changed to `Dict[str, ID]`
self.peer_topics = {} self.peer_topics = {}
# Create peers map, which maps peer_id (as string) to stream (to a given peer) # Create peers map, which maps peer_id (as string) to stream (to a given peer)
# FIXME: Should be changed to `Dict[ID, INetStream]`
self.peers = {} self.peers = {}
self.counter = time.time_ns() self.counter = time.time_ns()
@ -114,7 +118,7 @@ class Pubsub:
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics only send hello packet if we have subscribed topics
""" """
packet = rpc_pb2.RPC() packet: rpc_pb2.RPC = rpc_pb2.RPC()
if self.my_topics: if self.my_topics:
for topic_id in self.my_topics: for topic_id in self.my_topics:
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
@ -131,8 +135,8 @@ class Pubsub:
peer_id = stream.mplex_conn.peer_id peer_id = stream.mplex_conn.peer_id
while True: while True:
incoming = (await stream.read()) incoming: bytes = (await stream.read())
rpc_incoming = rpc_pb2.RPC() rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming) rpc_incoming.ParseFromString(incoming)
if rpc_incoming.publish: if rpc_incoming.publish:
@ -168,12 +172,12 @@ class Pubsub:
""" """
# Add peer # Add peer
# Map peer to stream # Map peer to stream
peer_id = stream.mplex_conn.peer_id peer_id: ID = stream.mplex_conn.peer_id
self.peers[str(peer_id)] = stream self.peers[str(peer_id)] = stream
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello: bytes = self.get_hello_packet()
await stream.write(hello) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
@ -188,12 +192,12 @@ class Pubsub:
""" """
while True: while True:
peer_id = await self.peer_queue.get() peer_id: ID = await self.peer_queue.get()
# Open a stream to peer on existing connection # Open a stream to peer on existing connection
# (we know connection exists since that's the only way # (we know connection exists since that's the only way
# an element gets added to peer_queue) # an element gets added to peer_queue)
stream = await self.host.new_stream(peer_id, self.protocols) stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
# Add Peer # Add Peer
# Map peer to stream # Map peer to stream
@ -201,7 +205,7 @@ class Pubsub:
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello: bytes = self.get_hello_packet()
await stream.write(hello) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
@ -219,24 +223,24 @@ class Pubsub:
:param origin_id: id of the peer who subscribe to the message :param origin_id: id of the peer who subscribe to the message
:param sub_message: RPC.SubOpts :param sub_message: RPC.SubOpts
""" """
origin_id = str(origin_id) origin_id_str = str(origin_id)
if sub_message.subscribe: if sub_message.subscribe:
if sub_message.topicid not in self.peer_topics: if sub_message.topicid not in self.peer_topics:
self.peer_topics[sub_message.topicid] = [origin_id] self.peer_topics[sub_message.topicid] = [origin_id_str]
elif origin_id not in self.peer_topics[sub_message.topicid]: elif origin_id_str not in self.peer_topics[sub_message.topicid]:
# Add peer to topic # Add peer to topic
self.peer_topics[sub_message.topicid].append(origin_id) self.peer_topics[sub_message.topicid].append(origin_id_str)
else: else:
if sub_message.topicid in self.peer_topics: if sub_message.topicid in self.peer_topics:
if origin_id in self.peer_topics[sub_message.topicid]: if origin_id_str in self.peer_topics[sub_message.topicid]:
self.peer_topics[sub_message.topicid].remove(origin_id) self.peer_topics[sub_message.topicid].remove(origin_id_str)
# FIXME(mhchia): Change the function name? # FIXME(mhchia): Change the function name?
# FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf # FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf
async def handle_talk(self, publish_message: Any) -> None: async def handle_talk(self, publish_message: Any) -> None:
""" """
Put incoming message from a peer onto my blocking queue Put incoming message from a peer onto my blocking queue
:param talk: RPC.Message format :param publish_message: RPC.Message format
""" """
# Check if this message has any topics that we are subscribed to # Check if this message has any topics that we are subscribed to
@ -247,7 +251,7 @@ class Pubsub:
# for each topic # for each topic
await self.my_topics[topic].put(publish_message) await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id: str) -> asyncio.Queue: async def subscribe(self, topic_id: str) -> 'asyncio.Queue[rpc_pb2.Message]':
""" """
Subscribe ourself to a topic Subscribe ourself to a topic
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
@ -261,7 +265,7 @@ class Pubsub:
self.my_topics[topic_id] = asyncio.Queue() self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message # Create subscribe message
packet = rpc_pb2.RPC() packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe=True, subscribe=True,
topicid=topic_id.encode('utf-8') topicid=topic_id.encode('utf-8')
@ -289,7 +293,7 @@ class Pubsub:
del self.my_topics[topic_id] del self.my_topics[topic_id]
# Create unsubscribe message # Create unsubscribe message
packet = rpc_pb2.RPC() packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe=False, subscribe=False,
topicid=topic_id.encode('utf-8') topicid=topic_id.encode('utf-8')
@ -301,8 +305,7 @@ class Pubsub:
# Tell router we are leaving this topic # Tell router we are leaving this topic
await self.router.leave(topic_id) await self.router.leave(topic_id)
# FIXME: `rpc_msg` can be further type hinted with mypy_protobuf async def message_all_peers(self, raw_msg: bytes) -> None:
async def message_all_peers(self, rpc_msg: Any) -> None:
""" """
Broadcast a message to peers Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast :param raw_msg: raw contents of the message to broadcast
@ -311,7 +314,7 @@ class Pubsub:
# Broadcast message # Broadcast message
for _, stream in self.peers.items(): for _, stream in self.peers.items():
# Write message to stream # Write message to stream
await stream.write(rpc_msg) await stream.write(raw_msg)
async def publish(self, topic_id: str, data: bytes) -> None: async def publish(self, topic_id: str, data: bytes) -> None:
""" """
@ -370,6 +373,6 @@ class Pubsub:
self.seen_messages[msg_id] = 1 self.seen_messages[msg_id] = 1
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
if len(self.my_topics) == 0: if not self.my_topics:
return False return False
return all([topic in self.my_topics for topic in msg.topicIDs]) return all([topic in self.my_topics for topic in msg.topicIDs])

View File

@ -1,23 +1,39 @@
from typing import (
TYPE_CHECKING,
)
from multiaddr import Multiaddr
from libp2p.network.network_interface import INetwork
from libp2p.network.notifee_interface import INotifee from libp2p.network.notifee_interface import INotifee
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
from libp2p.network.stream.net_stream_interface import INetStream
if TYPE_CHECKING:
import asyncio
from libp2p.peer.id import ID
class PubsubNotifee(INotifee): class PubsubNotifee(INotifee):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop # pylint: disable=too-many-instance-attributes, cell-var-from-loop, unsubscriptable-object
def __init__(self, initiator_peers_queue): initiator_peers_queue: 'asyncio.Queue[ID]'
def __init__(self, initiator_peers_queue: 'asyncio.Queue[ID]') -> None:
""" """
:param initiator_peers_queue: queue to add new peers to so that pubsub :param initiator_peers_queue: queue to add new peers to so that pubsub
can process new peers after we connect to them can process new peers after we connect to them
""" """
self.initiator_peers_queue = initiator_peers_queue self.initiator_peers_queue = initiator_peers_queue
async def opened_stream(self, network, stream): async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
pass pass
async def closed_stream(self, network, stream): async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
pass pass
async def connected(self, network, conn): async def connected(self, network: INetwork, conn: IMuxedConn) -> None:
""" """
Add peer_id to initiator_peers_queue, so that this peer_id can be used to Add peer_id to initiator_peers_queue, so that this peer_id can be used to
create a stream and we only want to have one pubsub stream with each peer. create a stream and we only want to have one pubsub stream with each peer.
@ -30,11 +46,11 @@ class PubsubNotifee(INotifee):
if conn.initiator: if conn.initiator:
await self.initiator_peers_queue.put(conn.peer_id) await self.initiator_peers_queue.put(conn.peer_id)
async def disconnected(self, network, conn): async def disconnected(self, network: INetwork, conn: IMuxedConn) -> None:
pass pass
async def listen(self, network, multiaddr): async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
pass pass
async def listen_close(self, network, multiaddr): async def listen_close(self, network: INetwork, multiaddr: Multiaddr) -> None:
pass pass

View File

@ -1,15 +1,26 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import (
List,
TYPE_CHECKING,
)
from libp2p.peer.id import ID
from .pb import rpc_pb2
if TYPE_CHECKING:
from .pubsub import Pubsub
class IPubsubRouter(ABC): class IPubsubRouter(ABC):
@abstractmethod @abstractmethod
def get_protocols(self): def get_protocols(self) -> List[str]:
""" """
:return: the list of protocols supported by the router :return: the list of protocols supported by the router
""" """
@abstractmethod @abstractmethod
def attach(self, pubsub): def attach(self, pubsub: 'Pubsub') -> None:
""" """
Attach is invoked by the PubSub constructor to attach the router to a Attach is invoked by the PubSub constructor to attach the router to a
freshly initialized PubSub instance. freshly initialized PubSub instance.
@ -17,21 +28,21 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
def add_peer(self, peer_id, protocol_id): def add_peer(self, peer_id: ID, protocol_id: str) -> None:
""" """
Notifies the router that a new peer has been connected Notifies the router that a new peer has been connected
:param peer_id: id of peer to add :param peer_id: id of peer to add
""" """
@abstractmethod @abstractmethod
def remove_peer(self, peer_id): def remove_peer(self, peer_id: ID) -> None:
""" """
Notifies the router that a peer has been disconnected Notifies the router that a peer has been disconnected
:param peer_id: id of peer to remove :param peer_id: id of peer to remove
""" """
@abstractmethod @abstractmethod
def handle_rpc(self, rpc, sender_peer_id): async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None:
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed
@ -41,8 +52,9 @@ class IPubsubRouter(ABC):
:param rpc: rpc message :param rpc: rpc message
""" """
# FIXME: Should be changed to type 'peer.ID'
@abstractmethod @abstractmethod
async def publish(self, msg_forwarder, pubsub_msg): async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
""" """
Invoked to forward a new message that has been validated Invoked to forward a new message that has been validated
:param msg_forwarder: peer_id of message sender :param msg_forwarder: peer_id of message sender
@ -50,7 +62,7 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
def join(self, topic): async def join(self, topic: str) -> None:
""" """
Join notifies the router that we want to receive and Join notifies the router that we want to receive and
forward messages in a topic. It is invoked after the forward messages in a topic. It is invoked after the
@ -59,7 +71,7 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
def leave(self, topic): async def leave(self, topic: str) -> None:
""" """
Leave notifies the router that we are no longer interested in a topic. Leave notifies the router that we are no longer interested in a topic.
It is invoked after the unsubscription announcement. It is invoked after the unsubscription announcement.

View File

@ -1,11 +1,18 @@
from abc import ABC, abstractmethod from abc import (
ABC,
abstractmethod,
)
from typing import Iterable
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
class IContentRouting(ABC): class IContentRouting(ABC):
@abstractmethod @abstractmethod
def provide(self, cid, announce=True): def provide(self, cid: bytes, announce: bool = True) -> None:
""" """
Provide adds the given cid to the content routing system. If announce is True, Provide adds the given cid to the content routing system. If announce is True,
it also announces it, otherwise it is just kept in the local it also announces it, otherwise it is just kept in the local
@ -13,7 +20,7 @@ class IContentRouting(ABC):
""" """
@abstractmethod @abstractmethod
def find_provider_iter(self, cid, count): def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]:
""" """
Search for peers who are able to provide a given key Search for peers who are able to provide a given key
returns an iterator of peer.PeerInfo returns an iterator of peer.PeerInfo
@ -23,7 +30,7 @@ class IContentRouting(ABC):
class IPeerRouting(ABC): class IPeerRouting(ABC):
@abstractmethod @abstractmethod
def find_peer(self, peer_id): async def find_peer(self, peer_id: ID) -> PeerInfo:
""" """
Find specific Peer Find specific Peer
FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo

View File

@ -1,9 +1,14 @@
from typing import (
Iterable,
)
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IContentRouting from libp2p.routing.interfaces import IContentRouting
class KadmeliaContentRouter(IContentRouting): class KadmeliaContentRouter(IContentRouting):
def provide(self, cid, announce=True): def provide(self, cid: bytes, announce: bool = True) -> None:
""" """
Provide adds the given cid to the content routing system. If announce is True, Provide adds the given cid to the content routing system. If announce is True,
it also announces it, otherwise it is just kept in the local it also announces it, otherwise it is just kept in the local
@ -12,7 +17,7 @@ class KadmeliaContentRouter(IContentRouting):
# the DHT finds the closest peers to `key` using the `FIND_NODE` RPC # the DHT finds the closest peers to `key` using the `FIND_NODE` RPC
# then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers. # then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers.
def find_provider_iter(self, cid, count): def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]:
""" """
Search for peers who are able to provide a given key Search for peers who are able to provide a given key
returns an iterator of peer.PeerInfo returns an iterator of peer.PeerInfo

View File

@ -1,16 +1,26 @@
import ast import ast
from typing import (
Union,
)
from libp2p.kademlia.kad_peerinfo import (
KadPeerInfo,
create_kad_peerinfo,
)
from libp2p.kademlia.network import KademliaServer
from libp2p.peer.id import ID
from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.interfaces import IPeerRouting
from libp2p.kademlia.kad_peerinfo import create_kad_peerinfo
class KadmeliaPeerRouter(IPeerRouting): class KadmeliaPeerRouter(IPeerRouting):
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, dht_server): server: KademliaServer
def __init__(self, dht_server: KademliaServer) -> None:
self.server = dht_server self.server = dht_server
async def find_peer(self, peer_id): async def find_peer(self, peer_id: ID) -> KadPeerInfo:
""" """
Find a specific peer Find a specific peer
:param peer_id: peer to search for :param peer_id: peer to search for
@ -21,7 +31,7 @@ class KadmeliaPeerRouter(IPeerRouting):
value = await self.server.get(xor_id) value = await self.server.get(xor_id)
return decode_peerinfo(value) return decode_peerinfo(value)
def decode_peerinfo(encoded): def decode_peerinfo(encoded: Union[bytes, str]) -> KadPeerInfo:
if isinstance(encoded, bytes): if isinstance(encoded, bytes):
encoded = encoded.decode() encoded = encoded.decode()
try: try:

View File

@ -1,7 +1,8 @@
import asyncio import asyncio
from libp2p.stream_muxer.muxed_stream_interface import IMuxedStream
from .utils import get_flag from .utils import get_flag
from ..muxed_stream_interface import IMuxedStream
class MplexStream(IMuxedStream): class MplexStream(IMuxedStream):

View File

@ -1,11 +1,16 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from libp2p.peer.id import ID
class IMuxedConn(ABC): class IMuxedConn(ABC):
""" """
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
""" """
initiator: bool
peer_id: ID
@abstractmethod @abstractmethod
def __init__(self, conn, generic_protocol_handler, peer_id): def __init__(self, conn, generic_protocol_handler, peer_id):
""" """

View File

@ -1,8 +1,15 @@
from abc import ABC, abstractmethod from abc import (
ABC,
abstractmethod,
)
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
class IMuxedStream(ABC): class IMuxedStream(ABC):
mplex_conn: IMuxedConn
@abstractmethod @abstractmethod
def read(self): def read(self):
""" """

View File

@ -316,20 +316,20 @@ async def test_host_connect():
transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] transport_opt_list = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]]
(node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list) (node_a, node_b) = await set_up_nodes_by_transport_opt(transport_opt_list)
assert not node_a.get_peerstore().peers() assert not node_a.get_peerstore().peer_ids()
addr = node_b.get_addrs()[0] addr = node_b.get_addrs()[0]
info = info_from_p2p_addr(addr) info = info_from_p2p_addr(addr)
await node_a.connect(info) await node_a.connect(info)
assert len(node_a.get_peerstore().peers()) == 1 assert len(node_a.get_peerstore().peer_ids()) == 1
await node_a.connect(info) await node_a.connect(info)
# make sure we don't do double connection # make sure we don't do double connection
assert len(node_a.get_peerstore().peers()) == 1 assert len(node_a.get_peerstore().peer_ids()) == 1
assert node_b.get_id() in node_a.get_peerstore().peers() assert node_b.get_id() in node_a.get_peerstore().peer_ids()
ma_node_b = multiaddr.Multiaddr('/p2p/%s' % node_b.get_id().pretty()) ma_node_b = multiaddr.Multiaddr('/p2p/%s' % node_b.get_id().pretty())
for addr in node_a.get_peerstore().addrs(node_b.get_id()): for addr in node_a.get_peerstore().addrs(node_b.get_id()):
assert addr.encapsulate(ma_node_b) in node_b.get_addrs() assert addr.encapsulate(ma_node_b) in node_b.get_addrs()

View File

@ -55,4 +55,4 @@ def test_peers():
store.put("peer2", "key", "val") store.put("peer2", "key", "val")
store.add_addr("peer3", "/foo", 10) store.add_addr("peer3", "/foo", 10)
assert set(store.peers()) == set(["peer1", "peer2", "peer3"]) assert set(store.peer_ids()) == set(["peer1", "peer2", "peer3"])

View File

@ -19,5 +19,6 @@ basepython =
basepython = python3 basepython = python3
extras = dev extras = dev
commands = commands =
pylint --rcfile={toxinidir}/.pylintrc libp2p examples tests # TODO: Add the tests/ folder back to pylint
pylint --rcfile={toxinidir}/.pylintrc libp2p examples
mypy -p libp2p -p examples --config-file {toxinidir}/mypy.ini mypy -p libp2p -p examples --config-file {toxinidir}/mypy.ini