Remove kademlia module (#377)

* Remove kademlia and routing/kademlia

* cleanup

* Fix routed_host test

* lint mypy

* fix doc

* remove set_up_nodes_by_transport_and_disc_opt and fix typing
Chih Cheng Liang 2019-12-06 14:14:33 +08:00 committed by GitHub
parent dfdcf524b7
commit 82dcce214a
23 changed files with 44 additions and 1398 deletions


@@ -139,7 +139,7 @@ py-libp2p aims for conformity with [the standard libp2p modules](https://github.
| Peer Discovery | Status |
| -------------------------------------------- | :-----------: |
| **`bootstrap list`** | :tomato: |
| **`Kademlia DHT`** | :lemon: |
| **`Kademlia DHT`** | :chestnut: |
| **`mDNS`** | :chestnut: |
| **`PEX`** | :chestnut: |
| **`DNS`** | :chestnut: |
@@ -147,7 +147,7 @@ py-libp2p aims for conformity with [the standard libp2p modules](https://github.
| Content Routing | Status |
| -------------------------------------------- | :-----------: |
| **`Kademlia DHT`** | :lemon: |
| **`Kademlia DHT`** | :chestnut: |
| **`floodsub`** | :green_apple: |
| **`gossipsub`** | :green_apple: |
| **`PHT`** | :chestnut: |
@@ -155,7 +155,7 @@ py-libp2p aims for conformity with [the standard libp2p modules](https://github.
| Peer Routing | Status |
| -------------------------------------------- | :-----------: |
| **`Kademlia DHT`** | :green_apple: |
| **`Kademlia DHT`** | :chestnut: |
| **`floodsub`** | :green_apple: |
| **`gossipsub`** | :green_apple: |
| **`PHT`** | :chestnut: |


@@ -1,70 +0,0 @@
libp2p.kademlia package
=======================
Submodules
----------
libp2p.kademlia.crawling module
-------------------------------
.. automodule:: libp2p.kademlia.crawling
:members:
:undoc-members:
:show-inheritance:
libp2p.kademlia.kad\_peerinfo module
------------------------------------
.. automodule:: libp2p.kademlia.kad_peerinfo
:members:
:undoc-members:
:show-inheritance:
libp2p.kademlia.network module
------------------------------
.. automodule:: libp2p.kademlia.network
:members:
:undoc-members:
:show-inheritance:
libp2p.kademlia.protocol module
-------------------------------
.. automodule:: libp2p.kademlia.protocol
:members:
:undoc-members:
:show-inheritance:
libp2p.kademlia.routing module
------------------------------
.. automodule:: libp2p.kademlia.routing
:members:
:undoc-members:
:show-inheritance:
libp2p.kademlia.storage module
------------------------------
.. automodule:: libp2p.kademlia.storage
:members:
:undoc-members:
:show-inheritance:
libp2p.kademlia.utils module
----------------------------
.. automodule:: libp2p.kademlia.utils
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: libp2p.kademlia
:members:
:undoc-members:
:show-inheritance:


@@ -1,30 +0,0 @@
libp2p.routing.kademlia package
===============================
Submodules
----------
libp2p.routing.kademlia.kademlia\_content\_router module
--------------------------------------------------------
.. automodule:: libp2p.routing.kademlia.kademlia_content_router
:members:
:undoc-members:
:show-inheritance:
libp2p.routing.kademlia.kademlia\_peer\_router module
-----------------------------------------------------
.. automodule:: libp2p.routing.kademlia.kademlia_peer_router
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: libp2p.routing.kademlia
:members:
:undoc-members:
:show-inheritance:


@@ -1,12 +1,6 @@
libp2p.routing package
======================
Subpackages
-----------
.. toctree::
libp2p.routing.kademlia
Submodules
----------


@@ -10,7 +10,6 @@ Subpackages
libp2p.host
libp2p.identity
libp2p.io
libp2p.kademlia
libp2p.network
libp2p.peer
libp2p.protocol_muxer


@@ -6,15 +6,12 @@ from libp2p.crypto.rsa import create_new_key_pair
from libp2p.host.basic_host import BasicHost
from libp2p.host.host_interface import IHost
from libp2p.host.routed_host import RoutedHost
from libp2p.kademlia.network import KademliaServer
from libp2p.kademlia.storage import IStorage
from libp2p.network.network_interface import INetwork
from libp2p.network.swarm import Swarm
from libp2p.peer.id import ID
from libp2p.peer.peerstore import PeerStore
from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.routing.interfaces import IPeerRouting
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport
import libp2p.security.secio.transport as secio
from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex
@@ -45,31 +42,6 @@ def generate_peer_id_from(key_pair: KeyPair) -> ID:
return ID.from_pubkey(public_key)
def initialize_default_kademlia_router(
ksize: int = 20, alpha: int = 3, id_opt: ID = None, storage: IStorage = None
) -> KadmeliaPeerRouter:
"""
Initialize a kademlia router when no kademlia router is passed in.
:param ksize: The k parameter from the paper
:param alpha: The alpha parameter from the paper
:param id_opt: optional id for host
:param storage: An instance that implements
:class:`~kademlia.storage.IStorage`
:return: return a default kademlia instance
"""
if not id_opt:
key_pair = generate_new_rsa_identity()
id_opt = generate_peer_id_from(key_pair)
node_id = id_opt.to_bytes()
# ignore type for Kademlia module
server = KademliaServer( # type: ignore
ksize=ksize, alpha=alpha, node_id=node_id, storage=storage
)
return KadmeliaPeerRouter(server)
def initialize_default_swarm(
key_pair: KeyPair,
id_opt: ID = None,
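
With `initialize_default_kademlia_router` removed, callers that want peer routing pass their own `IPeerRouting` implementation to `new_node` via `disc_opt`, the same pattern the test utilities added later in this diff use. A minimal sketch; `StaticPeerRouter` and `make_routed_host` are illustrative names, not library API:

```python
import multiaddr

from libp2p import new_node
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IPeerRouting


class StaticPeerRouter(IPeerRouting):
    """Hypothetical router backed by a fixed in-memory table."""

    def __init__(self, table):
        self._table = table  # maps ID -> PeerInfo

    async def find_peer(self, peer_id: ID) -> PeerInfo:
        return self._table.get(peer_id, None)


async def make_routed_host(router: IPeerRouting):
    # disc_opt wires the router into the host, replacing the removed
    # initialize_default_kademlia_router() default.
    host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], disc_opt=router)
    await host.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
    return host
```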


@@ -1,3 +0,0 @@
"""Kademlia is a Python implementation of the Kademlia protocol which utilizes
the asyncio library."""
__version__ = "2.0"


@@ -1,173 +0,0 @@
from collections import Counter
import logging
from .kad_peerinfo import KadPeerHeap, create_kad_peerinfo
from .utils import gather_dict
log = logging.getLogger(__name__)
class SpiderCrawl:
"""Crawl the network and look for given 160-bit keys."""
def __init__(self, protocol, node, peers, ksize, alpha):
"""
Create a new C{SpiderCrawl}er.
Args:
protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance.
node: A :class:`~kademlia.node.Node` representing the key we're
looking for
peers: A list of :class:`~kademlia.node.Node` instances that
provide the entry point for the network
ksize: The value for k based on the paper
alpha: The value for alpha based on the paper
"""
self.protocol = protocol
self.ksize = ksize
self.alpha = alpha
self.node = node
self.nearest = KadPeerHeap(self.node, self.ksize)
self.last_ids_crawled = []
log.info("creating spider with peers: %s", peers)
self.nearest.push(peers)
async def _find(self, rpcmethod):
"""
Get either a value or list of nodes.
Args:
rpcmethod: The protocol's call_find_value or call_find_node.
The process:
1. calls find_* to current ALPHA nearest not already queried nodes,
adding results to current nearest list of k nodes.
2. current nearest list needs to keep track of who has been queried
already; sort by nearest, keep KSIZE
3. if list is same as last time, next call should be to everyone not
yet queried
4. repeat, unless the nearest list has all been queried, then we're done
"""
log.info("crawling network with nearest: %s", str(tuple(self.nearest)))
count = self.alpha
if self.nearest.get_ids() == self.last_ids_crawled:
count = len(self.nearest)
self.last_ids_crawled = self.nearest.get_ids()
dicts = {}
for peer in self.nearest.get_uncontacted()[:count]:
dicts[peer.peer_id_bytes] = rpcmethod(peer, self.node)
self.nearest.mark_contacted(peer)
found = await gather_dict(dicts)
return await self._nodes_found(found)
async def _nodes_found(self, responses):
raise NotImplementedError
class ValueSpiderCrawl(SpiderCrawl):
def __init__(self, protocol, node, peers, ksize, alpha):
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
# keep track of the single nearest node without value - per
# section 2.3 so we can set the key there if found
self.nearest_without_value = KadPeerHeap(self.node, 1)
async def find(self):
"""Find either the closest nodes or the value requested."""
return await self._find(self.protocol.call_find_value)
async def _nodes_found(self, responses):
"""Handle the result of an iteration in _find."""
toremove = []
found_values = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
elif response.has_value():
found_values.append(response.get_value())
else:
peer = self.nearest.get_node(peerid)
self.nearest_without_value.push(peer)
self.nearest.push(response.get_node_list())
self.nearest.remove(toremove)
if found_values:
return await self._handle_found_values(found_values)
if self.nearest.have_contacted_all():
# not found!
return None
return await self.find()
async def _handle_found_values(self, values):
"""
We got some values!
Exciting. But let's make sure they're all the same or freak out
a little bit. Also, make sure we tell the nearest node that
*didn't* have the value to store it.
"""
value_counts = Counter(values)
if len(value_counts) != 1:
log.warning(
"Got multiple values for key %i: %s", self.node.xor_id, str(values)
)
value = value_counts.most_common(1)[0][0]
peer = self.nearest_without_value.popleft()
if peer:
await self.protocol.call_store(peer, self.node.peer_id_bytes, value)
return value
class NodeSpiderCrawl(SpiderCrawl):
async def find(self):
"""Find the closest nodes."""
return await self._find(self.protocol.call_find_node)
async def _nodes_found(self, responses):
"""Handle the result of an iteration in _find."""
toremove = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
else:
self.nearest.push(response.get_node_list())
self.nearest.remove(toremove)
if self.nearest.have_contacted_all():
return list(self.nearest)
return await self.find()
class RPCFindResponse:
def __init__(self, response):
"""
A wrapper for the result of a RPC find.
Args:
response: This will be a tuple of (<response received>, <value>)
where <value> will be a list of tuples if not found or
a dictionary of {'value': v} where v is the value desired
"""
self.response = response
def happened(self):
"""Did the other host actually respond?"""
return self.response[0]
def has_value(self):
return isinstance(self.response[1], dict)
def get_value(self):
return self.response[1]["value"]
def get_node_list(self):
"""
Get the node list in the response.
If there's no value, this should be set.
"""
nodelist = self.response[1] or []
return [create_kad_peerinfo(*nodeple) for nodeple in nodelist]


@@ -1,153 +0,0 @@
import heapq
from operator import itemgetter
import random
from typing import List
from multiaddr import Multiaddr
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from .utils import digest
P_IP = "ip4"
P_UDP = "udp"
class KadPeerInfo(PeerInfo):
def __init__(self, peer_id, addrs):
super(KadPeerInfo, self).__init__(peer_id, addrs)
self.peer_id_bytes = peer_id.to_bytes()
self.xor_id = peer_id.xor_id
self.addrs = addrs
self.ip = self.addrs[0].value_for_protocol(P_IP) if addrs else None
self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if addrs else None
def same_home_as(self, node):
return sorted(self.addrs) == sorted(node.addrs)
def distance_to(self, node):
"""Get the distance between this node and another."""
return self.xor_id ^ node.xor_id
def __iter__(self):
"""
Enables use of Node as a tuple - i.e., tuple(node) works.
"""
return iter([self.peer_id_bytes, self.ip, self.port])
def __repr__(self):
return repr([self.xor_id, self.ip, self.port, self.peer_id_bytes])
def __str__(self):
return "%s:%s" % (self.ip, str(self.port))
def encode(self):
return (
str(self.peer_id_bytes)
+ "\n"
+ str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))
)
class KadPeerHeap:
"""A heap of peers ordered by distance to a given node."""
def __init__(self, node, maxsize):
"""
Constructor.
@param node: The node to measure all distances from.
@param maxsize: The maximum size that this heap can grow to.
"""
self.node = node
self.heap = []
self.contacted = set()
self.maxsize = maxsize
def remove(self, peers):
"""
Remove a list of peer ids from this heap.
Note that while this heap retains a constant visible size (based
on the iterator), its actual size may be quite a bit larger
than what's exposed. Therefore, removal of nodes may not change
the visible size as previously added nodes suddenly become
visible.
"""
peers = set(peers)
if not peers:
return
nheap = []
for distance, node in self.heap:
if node.peer_id_bytes not in peers:
heapq.heappush(nheap, (distance, node))
self.heap = nheap
def get_node(self, node_id):
for _, node in self.heap:
if node.peer_id_bytes == node_id:
return node
return None
def have_contacted_all(self):
return len(self.get_uncontacted()) == 0
def get_ids(self):
return [n.peer_id_bytes for n in self]
def mark_contacted(self, node):
self.contacted.add(node.peer_id_bytes)
def popleft(self):
return heapq.heappop(self.heap)[1] if self else None
def push(self, nodes):
"""
Push nodes onto heap.
@param nodes: This can be a single item or a C{list}.
"""
if not isinstance(nodes, list):
nodes = [nodes]
for node in nodes:
if node not in self:
distance = self.node.distance_to(node)
heapq.heappush(self.heap, (distance, node))
def __len__(self):
return min(len(self.heap), self.maxsize)
def __iter__(self):
nodes = heapq.nsmallest(self.maxsize, self.heap)
return iter(map(itemgetter(1), nodes))
def __contains__(self, node):
for _, other in self.heap:
if node.peer_id_bytes == other.peer_id_bytes:
return True
return False
def get_uncontacted(self):
return [n for n in self if n.peer_id_bytes not in self.contacted]
def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None):
node_id = (
ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255)))
)
addrs: List[Multiaddr]
if sender_ip and sender_port:
addrs = [
Multiaddr(
"/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port)
)
]
else:
addrs = []
return KadPeerInfo(node_id, addrs)
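
The removed `KadPeerInfo.distance_to` and the `KadPeerHeap` ordering both come down to XOR distance between the peers' integer `xor_id`s. A tiny worked example with made-up ids (real ones are large integers derived from the peer IDs):

```python
# Illustrative xor_ids only.
xor_id_a = 0b1010  # 10
xor_id_b = 0b0110  #  6
distance = xor_id_a ^ xor_id_b  # what distance_to(node) computed
assert distance == 0b1100  # 12: matching bits cancel, differing bits remain
```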


@@ -1,188 +0,0 @@
import asyncio
import logging
import random
from rpcudp.protocol import RPCProtocol
from .kad_peerinfo import create_kad_peerinfo
from .routing import RoutingTable
log = logging.getLogger(__name__)
class KademliaProtocol(RPCProtocol):
"""
There are four main RPCs in the Kademlia protocol PING, STORE, FIND_NODE,
FIND_VALUE.
- PING probes if a node is still online
- STORE instructs a node to store (key, value)
- FIND_NODE takes a 160-bit ID and gets back
(ip, udp_port, node_id) for k closest nodes to target
- FIND_VALUE behaves like FIND_NODE unless a value is stored.
"""
def __init__(self, source_node, storage, ksize):
RPCProtocol.__init__(self)
self.router = RoutingTable(self, ksize, source_node)
self.storage = storage
self.source_node = source_node
def get_refresh_ids(self):
"""Get ids to search for to keep old buckets up to date."""
ids = []
for bucket in self.router.lonely_buckets():
rid = random.randint(*bucket.range).to_bytes(20, byteorder="big")
ids.append(rid)
return ids
def rpc_stun(self, sender):
return sender
def rpc_ping(self, sender, nodeid):
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
return self.source_node.peer_id_bytes
def rpc_store(self, sender, nodeid, key, value):
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
log.debug(
"got a store request from %s, storing '%s'='%s'", sender, key.hex(), value
)
self.storage[key] = value
return True
def rpc_find_node(self, sender, nodeid, key):
log.info("finding neighbors of %i in local table", int(nodeid.hex(), 16))
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
node = create_kad_peerinfo(key)
neighbors = self.router.find_neighbors(node, exclude=source)
return list(map(tuple, neighbors))
def rpc_find_value(self, sender, nodeid, key):
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
value = self.storage.get(key, None)
if value is None:
return self.rpc_find_node(sender, nodeid, key)
return {"value": value}
def rpc_add_provider(self, sender, nodeid, key, provider_id):
"""rpc when receiving an add_provider call should validate received
PeerInfo matches sender nodeid if it does, receipient must store a
record in its datastore we store a map of content_id to peer_id (non
xor)"""
if nodeid == provider_id:
log.info(
"adding provider %s for key %s in local table", provider_id, str(key)
)
self.storage[key] = provider_id
return True
return False
def rpc_get_providers(self, sender, key):
"""rpc when receiving a get_providers call should look up key in data
store and respond with records plus a list of closer peers in its
routing table."""
providers = []
record = self.storage.get(key, None)
if record:
providers.append(record)
keynode = create_kad_peerinfo(key)
neighbors = self.router.find_neighbors(keynode)
for neighbor in neighbors:
if neighbor.peer_id_bytes != record:
providers.append(neighbor.peer_id_bytes)
return providers
async def call_find_node(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_node(
address, self.source_node.peer_id_bytes, node_to_find.peer_id_bytes
)
return self.handle_call_response(result, node_to_ask)
async def call_find_value(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_value(
address, self.source_node.peer_id_bytes, node_to_find.peer_id_bytes
)
return self.handle_call_response(result, node_to_ask)
async def call_ping(self, node_to_ask):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.ping(address, self.source_node.peer_id_bytes)
return self.handle_call_response(result, node_to_ask)
async def call_store(self, node_to_ask, key, value):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.store(address, self.source_node.peer_id_bytes, key, value)
return self.handle_call_response(result, node_to_ask)
async def call_add_provider(self, node_to_ask, key, provider_id):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.add_provider(
address, self.source_node.peer_id_bytes, key, provider_id
)
return self.handle_call_response(result, node_to_ask)
async def call_get_providers(self, node_to_ask, key):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.get_providers(address, key)
return self.handle_call_response(result, node_to_ask)
def welcome_if_new(self, node):
"""
Given a new node, send it all the keys/values it should be storing,
then add it to the routing table.
@param node: A new node that just joined (or that we just found out
about).
Process:
For each key in storage, get k closest nodes. If newnode is closer
than the furthest in that list, and the node for this server
is closer than the closest in that list, then store the key/value
on the new node (per section 2.5 of the paper)
"""
if not self.router.is_new_node(node):
return
log.info("never seen %s before, adding to router", node)
for key, value in self.storage:
keynode = create_kad_peerinfo(key)
neighbors = self.router.find_neighbors(keynode)
if neighbors:
last = neighbors[-1].distance_to(keynode)
new_node_close = node.distance_to(keynode) < last
first = neighbors[0].distance_to(keynode)
this_closest = self.source_node.distance_to(keynode) < first
if not neighbors or (new_node_close and this_closest):
asyncio.ensure_future(self.call_store(node, key, value))
self.router.add_contact(node)
def handle_call_response(self, result, node):
"""
If we get a response, add the node to the routing table.
If we get no response, make sure it's removed from the routing
table.
"""
if not result[0]:
log.warning("no response from %s, removing from router", node)
self.router.remove_contact(node)
return result
log.info("got successful response from %s", node)
self.welcome_if_new(node)
return result


@@ -1,184 +0,0 @@
import asyncio
from collections import OrderedDict
import heapq
import operator
import time
from .utils import OrderedSet, bytes_to_bit_string, shared_prefix
class KBucket:
"""each node keeps a list of (ip, udp_port, node_id) for nodes of distance
between 2^i and 2^(i+1) this list that every node keeps is a k-bucket each
k-bucket implements a last seen eviction policy except that live nodes are
never removed."""
def __init__(self, rangeLower, rangeUpper, ksize):
self.range = (rangeLower, rangeUpper)
self.nodes = OrderedDict()
self.replacement_nodes = OrderedSet()
self.touch_last_updated()
self.ksize = ksize
def touch_last_updated(self):
self.last_updated = time.monotonic()
def get_nodes(self):
return list(self.nodes.values())
def split(self):
midpoint = (self.range[0] + self.range[1]) / 2
one = KBucket(self.range[0], midpoint, self.ksize)
two = KBucket(midpoint + 1, self.range[1], self.ksize)
for node in self.nodes.values():
bucket = one if node.xor_id <= midpoint else two
bucket.nodes[node.peer_id_bytes] = node
return (one, two)
def remove_node(self, node):
if node.peer_id_bytes not in self.nodes:
return
# delete node, and see if we can add a replacement
del self.nodes[node.peer_id_bytes]
if self.replacement_nodes:
newnode = self.replacement_nodes.pop()
self.nodes[newnode.peer_id_bytes] = newnode
def has_in_range(self, node):
return self.range[0] <= node.xor_id <= self.range[1]
def is_new_node(self, node):
return node.peer_id_bytes not in self.nodes
def add_node(self, node):
"""
Add a C{Node} to the C{KBucket}. Return True if successful, False if
the bucket is full.
If the bucket is full, keep track of node in a replacement list,
per section 4.1 of the paper.
"""
if node.peer_id_bytes in self.nodes:
del self.nodes[node.peer_id_bytes]
self.nodes[node.peer_id_bytes] = node
elif len(self) < self.ksize:
self.nodes[node.peer_id_bytes] = node
else:
self.replacement_nodes.push(node)
return False
return True
def depth(self):
vals = self.nodes.values()
sprefix = shared_prefix([bytes_to_bit_string(n.peer_id_bytes) for n in vals])
return len(sprefix)
def head(self):
return list(self.nodes.values())[0]
def __getitem__(self, node_id):
return self.nodes.get(node_id, None)
def __len__(self):
return len(self.nodes)
class TableTraverser:
def __init__(self, table, startNode):
index = table.get_bucket_for(startNode)
table.buckets[index].touch_last_updated()
self.current_nodes = table.buckets[index].get_nodes()
self.left_buckets = table.buckets[:index]
self.right_buckets = table.buckets[(index + 1) :]
self.left = True
def __iter__(self):
return self
def __next__(self):
"""Pop an item from the left subtree, then right, then left, etc."""
if self.current_nodes:
return self.current_nodes.pop()
if self.left and self.left_buckets:
self.current_nodes = self.left_buckets.pop().get_nodes()
self.left = False
return next(self)
if self.right_buckets:
self.current_nodes = self.right_buckets.pop(0).get_nodes()
self.left = True
return next(self)
raise StopIteration
class RoutingTable:
def __init__(self, protocol, ksize, node):
"""
@param node: The node that represents this server. It won't
be added to the routing table, but will be needed later to
determine which buckets to split or not.
"""
self.node = node
self.protocol = protocol
self.ksize = ksize
self.flush()
def flush(self):
self.buckets = [KBucket(0, 2 ** 160, self.ksize)]
def split_bucket(self, index):
one, two = self.buckets[index].split()
self.buckets[index] = one
self.buckets.insert(index + 1, two)
def lonely_buckets(self):
"""Get all of the buckets that haven't been updated in over an hour."""
hrago = time.monotonic() - 3600
return [b for b in self.buckets if b.last_updated < hrago]
def remove_contact(self, node):
index = self.get_bucket_for(node)
self.buckets[index].remove_node(node)
def is_new_node(self, node):
index = self.get_bucket_for(node)
return self.buckets[index].is_new_node(node)
def add_contact(self, node):
index = self.get_bucket_for(node)
bucket = self.buckets[index]
# this will succeed unless the bucket is full
if bucket.add_node(node):
return
# Per section 4.2 of paper, split if the bucket has the node
# in its range or if the depth is not congruent to 0 mod 5
if bucket.has_in_range(self.node) or bucket.depth() % 5 != 0:
self.split_bucket(index)
self.add_contact(node)
else:
asyncio.ensure_future(self.protocol.call_ping(bucket.head()))
def get_bucket_for(self, node):
"""Get the index of the bucket that the given node would fall into."""
for index, bucket in enumerate(self.buckets):
if node.xor_id < bucket.range[1]:
return index
# we should never be here, but make linter happy
return None
def find_neighbors(self, node, k=None, exclude=None):
k = k or self.ksize
nodes = []
for neighbor in TableTraverser(self, node):
notexcluded = exclude is None or not neighbor.same_home_as(exclude)
if neighbor.peer_id_bytes != node.peer_id_bytes and notexcluded:
heapq.heappush(nodes, (node.distance_to(neighbor), neighbor))
if len(nodes) == k:
break
return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes)))


@@ -1,78 +0,0 @@
// Record represents a dht record that contains a value
// for a key value pair
message Record {
// The key that references this record
bytes key = 1;
// The actual value this record is storing
bytes value = 2;
// Note: These fields were removed from the Record message
// hash of the authors public key
//optional string author = 3;
// A PKI signature for the key+value+author
//optional bytes signature = 4;
// Time the record was received, set by receiver
string timeReceived = 5;
};
message Message {
enum MessageType {
PUT_VALUE = 0;
GET_VALUE = 1;
ADD_PROVIDER = 2;
GET_PROVIDERS = 3;
FIND_NODE = 4;
PING = 5;
}
enum ConnectionType {
// sender does not have a connection to peer, and no extra information (default)
NOT_CONNECTED = 0;
// sender has a live connection to peer
CONNECTED = 1;
// sender recently connected to peer
CAN_CONNECT = 2;
// sender recently tried to connect to peer repeatedly but failed to connect
// ("try" here is loose, but this should signal "made strong effort, failed")
CANNOT_CONNECT = 3;
}
message Peer {
// ID of a given peer.
bytes id = 1;
// multiaddrs for a given peer
repeated bytes addrs = 2;
// used to signal the sender's connection capabilities to the peer
ConnectionType connection = 3;
}
// defines what type of message it is.
MessageType type = 1;
// defines what coral cluster level this query/response belongs to.
// in case we want to implement coral's cluster rings in the future.
int32 clusterLevelRaw = 10; // NOT USED
// Used to specify the key associated with this message.
// PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS
bytes key = 2;
// Used to return a value
// PUT_VALUE, GET_VALUE
Record record = 3;
// Used to return peers closer to a key in a query
// GET_VALUE, GET_PROVIDERS, FIND_NODE
repeated Peer closerPeers = 8;
// Used to return Providers
// GET_VALUE, ADD_PROVIDER, GET_PROVIDERS
repeated Peer providerPeers = 9;
}
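
For reference, a FIND_NODE query against the message layout above might be built like this, assuming the file were compiled with protoc into a hypothetical Python module named `dht_pb2` (the repo never shipped generated bindings for it):

```python
import dht_pb2  # hypothetical module generated from the removed dht.proto

msg = dht_pb2.Message()
msg.type = dht_pb2.Message.FIND_NODE
msg.key = b"target-peer-id-bytes"  # the peer/key being looked up
wire = msg.SerializeToString()

reply = dht_pb2.Message()
reply.ParseFromString(wire)
assert reply.type == dht_pb2.Message.FIND_NODE
```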


@@ -1,93 +0,0 @@
from abc import ABC, abstractmethod
from collections import OrderedDict
from itertools import takewhile
import operator
import time
class IStorage(ABC):
"""
Local storage for this node.
IStorage implementations of get must return the same type as was put in
by set.
"""
@abstractmethod
def __setitem__(self, key, value):
"""Set a key to the given value."""
@abstractmethod
def __getitem__(self, key):
"""
Get the given key.
If item doesn't exist, raises C{KeyError}
"""
@abstractmethod
def get(self, key, default=None):
"""
Get given key.
If not found, return default.
"""
@abstractmethod
def iter_older_than(self, seconds_old):
"""Return the an iterator over (key, value) tuples for items older than
the given seconds_old."""
@abstractmethod
def __iter__(self):
"""Get the iterator for this storage, should yield tuple of (key,
value)"""
class ForgetfulStorage(IStorage):
def __init__(self, ttl=604800):
"""By default, max age is a week."""
self.data = OrderedDict()
self.ttl = ttl
def __setitem__(self, key, value):
if key in self.data:
del self.data[key]
self.data[key] = (time.monotonic(), value)
self.cull()
def cull(self):
for _, _ in self.iter_older_than(self.ttl):
self.data.popitem(last=False)
def get(self, key, default=None):
self.cull()
if key in self.data:
return self[key]
return default
def __getitem__(self, key):
self.cull()
return self.data[key][1]
def __repr__(self):
self.cull()
return repr(self.data)
def iter_older_than(self, seconds_old):
min_birthday = time.monotonic() - seconds_old
zipped = self._triple_iter()
matches = takewhile(lambda r: min_birthday >= r[1], zipped)
return list(map(operator.itemgetter(0, 2), matches))
def _triple_iter(self):
ikeys = self.data.keys()
ibirthday = map(operator.itemgetter(0), self.data.values())
ivalues = map(operator.itemgetter(1), self.data.values())
return zip(ikeys, ibirthday, ivalues)
def __iter__(self):
self.cull()
ikeys = self.data.keys()
ivalues = map(operator.itemgetter(1), self.data.values())
return zip(ikeys, ivalues)
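
A short behaviour sketch for the `ForgetfulStorage` removed above (entries live for `ttl` seconds and are culled lazily on access); not importable after this commit:

```python
storage = ForgetfulStorage(ttl=3600)    # entries older than an hour get culled
storage[b"key"] = b"value"              # __setitem__ records (timestamp, value)
assert storage.get(b"key") == b"value"  # get() culls expired entries, then reads
assert storage.get(b"missing", default=None) is None
assert dict(iter(storage)) == {b"key": b"value"}  # __iter__ yields (key, value)
```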


@@ -1,56 +0,0 @@
"""General catchall for functions that don't make sense as methods."""
import asyncio
import hashlib
import operator
async def gather_dict(dic):
cors = list(dic.values())
results = await asyncio.gather(*cors)
return dict(zip(dic.keys(), results))
def digest(string):
if not isinstance(string, bytes):
string = str(string).encode("utf8")
return hashlib.sha1(string).digest()
class OrderedSet(list):
"""
Acts like a list in all ways, except in the behavior of the
:meth:`push` method.
"""
def push(self, thing):
"""
1. If the item exists in the list, it's removed
2. The item is pushed to the end of the list
"""
if thing in self:
self.remove(thing)
self.append(thing)
def shared_prefix(args):
"""
Find the shared prefix between the strings.
For instance:
shared_prefix(['blahblah', 'blahwhat'])
returns 'blah'.
"""
i = 0
while i < min(map(len, args)):
if len(set(map(operator.itemgetter(i), args))) != 1:
break
i += 1
return args[0][:i]
def bytes_to_bit_string(bites):
bits = [bin(bite)[2:].rjust(8, "0") for bite in bites]
return "".join(bits)


@@ -1,21 +0,0 @@
from typing import Iterable
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IContentRouting
class KadmeliaContentRouter(IContentRouting):
def provide(self, cid: bytes, announce: bool = True) -> None:
"""
Provide adds the given cid to the content routing system.
If announce is True, it also announces it, otherwise it is just
kept in the local accounting of which objects are being
provided.
"""
# the DHT finds the closest peers to `key` using the `FIND_NODE` RPC
# then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers.
def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]:
"""Search for peers who are able to provide a given key returns an
iterator of peer.PeerInfo."""


@@ -1,43 +0,0 @@
import json
import multiaddr
from libp2p.kademlia.network import KademliaServer
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IPeerRouting
class KadmeliaPeerRouter(IPeerRouting):
server: KademliaServer
def __init__(self, dht_server: KademliaServer) -> None:
self.server = dht_server
async def find_peer(self, peer_id: ID) -> PeerInfo:
"""
Find a specific peer.
:param peer_id: peer to search for
:return: PeerInfo of specified peer
"""
# switching peer_id to xor_id used by kademlia as node_id
xor_id = peer_id.xor_id
# ignore type for kad
value = await self.server.get(xor_id) # type: ignore
return (
peer_info_from_str(value) if value else None
) # TODO: should raise error if None?
def peer_info_to_str(peer_info: PeerInfo) -> str:
return json.dumps(
[peer_info.peer_id.to_string(), list(map(lambda a: str(a), peer_info.addrs))]
)
def peer_info_from_str(string: str) -> PeerInfo:
peer_id, raw_addrs = json.loads(string)
return PeerInfo(
ID.from_base58(peer_id), list(map(lambda a: multiaddr.Multiaddr(a), raw_addrs))
)
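
A round-trip sketch for the two JSON helpers above; the base58 peer ID and the address are made-up examples:

```python
import multiaddr

from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo

info = PeerInfo(
    ID.from_base58("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"),  # example ID
    [multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/8000")],
)
encoded = peer_info_to_str(info)    # JSON: [peer id string, [addr strings]]
decoded = peer_info_from_str(encoded)
assert decoded.peer_id == info.peer_id
assert [str(a) for a in decoded.addrs] == [str(a) for a in info.addrs]
```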


@@ -1,16 +1,16 @@
from typing import List, Sequence, Tuple
from typing import Dict, Sequence, Tuple, cast
import multiaddr
from libp2p import new_node
from libp2p.host.basic_host import BasicHost
from libp2p.host.host_interface import IHost
from libp2p.kademlia.network import KademliaServer
from libp2p.host.routed_host import RoutedHost
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.network.swarm import Swarm
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
from libp2p.routing.interfaces import IPeerRouting
from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter
from libp2p.typing import StreamHandlerFn, TProtocol
from .constants import MAX_READ_LEN
@@ -47,35 +47,6 @@ async def set_up_nodes_by_transport_opt(
return tuple(nodes_list)
async def set_up_nodes_by_transport_and_disc_opt(
transport_disc_opt_list: Sequence[Tuple[Sequence[str], IPeerRouting]]
) -> Tuple[BasicHost, ...]:
nodes_list = []
for transport_opt, disc_opt in transport_disc_opt_list:
node = await new_node(transport_opt=transport_opt, disc_opt=disc_opt)
await node.get_network().listen(multiaddr.Multiaddr(transport_opt[0]))
nodes_list.append(node)
return tuple(nodes_list)
async def set_up_routers(
router_confs: Tuple[int, int] = (0, 0)
) -> List[KadmeliaPeerRouter]:
"""The default ``router_confs`` selects two free ports local to this
machine."""
bootstrap_node = KademliaServer() # type: ignore
await bootstrap_node.listen(router_confs[0])
routers = [KadmeliaPeerRouter(bootstrap_node)]
for port in router_confs[1:]:
node = KademliaServer() # type: ignore
await node.listen(port)
await node.bootstrap_node(bootstrap_node.address)
routers.append(KadmeliaPeerRouter(node))
return routers
async def echo_stream_handler(stream: INetStream) -> None:
while True:
read_string = (await stream.read(MAX_READ_LEN)).decode()
@@ -95,3 +66,33 @@ async def perform_two_host_set_up(
# Associate the peer with local ip address (see default parameters of Libp2p())
node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10)
return node_a, node_b
class DummyRouter(IPeerRouting):
_routing_table: Dict[ID, PeerInfo]
def __init__(self) -> None:
self._routing_table = dict()
async def find_peer(self, peer_id: ID) -> PeerInfo:
return self._routing_table.get(peer_id, None)
async def set_up_routed_hosts() -> Tuple[RoutedHost, RoutedHost]:
router_a, router_b = DummyRouter(), DummyRouter()
transport = "/ip4/127.0.0.1/tcp/0"
host_a = await new_node(transport_opt=[transport], disc_opt=router_a)
host_b = await new_node(transport_opt=[transport], disc_opt=router_b)
address = multiaddr.Multiaddr(transport)
await host_a.get_network().listen(address)
await host_b.get_network().listen(address)
mock_routing_table = {
host_a.get_id(): PeerInfo(host_a.get_id(), host_a.get_addrs()),
host_b.get_id(): PeerInfo(host_b.get_id(), host_b.get_addrs()),
}
router_a._routing_table = router_b._routing_table = mock_routing_table
return cast(RoutedHost, host_a), cast(RoutedHost, host_b)


@@ -15,6 +15,3 @@ warn_redundant_casts = True
warn_return_any = False
warn_unused_configs = True
warn_unreachable = True
[mypy-libp2p.kademlia.*]
ignore_errors = True


@@ -4,71 +4,30 @@ import pytest
from libp2p.host.exceptions import ConnectionFailure
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.kademlia.kademlia_peer_router import peer_info_to_str
from libp2p.tools.utils import (
set_up_nodes_by_transport_and_disc_opt,
set_up_nodes_by_transport_opt,
set_up_routers,
)
from libp2p.tools.utils import set_up_nodes_by_transport_opt, set_up_routed_hosts
@pytest.mark.asyncio
async def test_host_routing_success():
routers = await set_up_routers()
transports = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]]
transport_disc_opt_list = zip(transports, routers)
(host_a, host_b) = await set_up_nodes_by_transport_and_disc_opt(
transport_disc_opt_list
)
# Set routing info
await routers[0].server.set(
host_a.get_id().xor_id,
peer_info_to_str(PeerInfo(host_a.get_id(), host_a.get_addrs())),
)
await routers[1].server.set(
host_b.get_id().xor_id,
peer_info_to_str(PeerInfo(host_b.get_id(), host_b.get_addrs())),
)
host_a, host_b = await set_up_routed_hosts()
# forces to use routing as no addrs are provided
await host_a.connect(PeerInfo(host_b.get_id(), []))
await host_b.connect(PeerInfo(host_a.get_id(), []))
# Clean up
await asyncio.gather(*[host_a.close(), host_b.close()])
routers[0].server.stop()
routers[1].server.stop()
@pytest.mark.asyncio
async def test_host_routing_fail():
routers = await set_up_routers()
transports = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]]
transport_disc_opt_list = zip(transports, routers)
(host_a, host_b) = await set_up_nodes_by_transport_and_disc_opt(
transport_disc_opt_list
)
host_c = (await set_up_nodes_by_transport_opt([["/ip4/127.0.0.1/tcp/0"]]))[0]
# Set routing info
await routers[0].server.set(
host_a.get_id().xor_id,
peer_info_to_str(PeerInfo(host_a.get_id(), host_a.get_addrs())),
)
await routers[1].server.set(
host_b.get_id().xor_id,
peer_info_to_str(PeerInfo(host_b.get_id(), host_b.get_addrs())),
)
host_a, host_b = await set_up_routed_hosts()
basic_host_c = (await set_up_nodes_by_transport_opt([["/ip4/127.0.0.1/tcp/0"]]))[0]
# routing fails because host_c does not use routing
with pytest.raises(ConnectionFailure):
await host_a.connect(PeerInfo(host_c.get_id(), []))
await host_a.connect(PeerInfo(basic_host_c.get_id(), []))
with pytest.raises(ConnectionFailure):
await host_b.connect(PeerInfo(host_c.get_id(), []))
await host_b.connect(PeerInfo(basic_host_c.get_id(), []))
# Clean up
await asyncio.gather(*[host_a.close(), host_b.close(), host_c.close()])
routers[0].server.stop()
routers[1].server.stop()
await asyncio.gather(*[host_a.close(), host_b.close(), basic_host_c.close()])


@@ -1,79 +0,0 @@
import pytest
from libp2p.kademlia.network import KademliaServer
@pytest.mark.asyncio
async def test_example():
node_a = KademliaServer()
await node_a.listen()
node_b = KademliaServer()
await node_b.listen()
# Bootstrap the node by connecting to other known nodes, in this case
# replace 123.123.123.123 with the IP of another node and optionally
# give as many ip/port combos as you can for other nodes.
await node_b.bootstrap([node_a.address])
# set a value for the key "my-key" on the network
value = "my-value"
key = "my-key"
await node_b.set(key, value)
# get the value associated with "my-key" from the network
assert await node_b.get(key) == value
assert await node_a.get(key) == value
@pytest.mark.parametrize("nodes_nr", [(2 ** i) for i in range(2, 5)])
@pytest.mark.asyncio
async def test_multiple_nodes_bootstrap_set_get(nodes_nr):
node_bootstrap = KademliaServer()
await node_bootstrap.listen(3000 + nodes_nr * 2)
nodes = []
for i in range(nodes_nr):
node = KademliaServer()
addrs = [("127.0.0.1", 3000 + nodes_nr * 2)]
await node.listen(3001 + i + nodes_nr * 2)
await node.bootstrap(addrs)
nodes.append(node)
for i, node in enumerate(nodes):
# set a value for the key "my-key" on the network
value = "my awesome value %d" % i
key = "set from %d" % i
await node.set(key, value)
for i in range(nodes_nr):
for node in nodes:
value = "my awesome value %d" % i
key = "set from %d" % i
assert await node.get(key) == value
@pytest.mark.parametrize("nodes_nr", [(2 ** i) for i in range(2, 5)])
@pytest.mark.asyncio
async def test_multiple_nodes_set_bootstrap_get(nodes_nr):
node_bootstrap = KademliaServer()
await node_bootstrap.listen(2000 + nodes_nr * 2)
nodes = []
for i in range(nodes_nr):
node = KademliaServer()
addrs = [("127.0.0.1", 2000 + nodes_nr * 2)]
await node.listen(2001 + i + nodes_nr * 2)
await node.bootstrap(addrs)
value = "my awesome value %d" % i
key = "set from %d" % i
await node.set(key, value)
nodes.append(node)
for i in range(nodes_nr):
for node in nodes:
value = "my awesome value %d" % i
key = "set from %d" % i
assert await node.get(key) == value


@@ -1,30 +0,0 @@
import pytest
from libp2p.kademlia.network import KademliaServer
@pytest.mark.asyncio
async def test_example():
node_a = KademliaServer()
await node_a.listen()
node_b = KademliaServer()
await node_b.listen()
await node_b.bootstrap([node_a.address])
key = "hello"
value = "world"
await node_b.set(key, value)
await node_b.provide("hello")
providers = await node_b.get_providers("hello")
# bmuller's handle_call_response wraps
# every rpc call result in a list of tuples
# [(True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F'])]
first_tuple = providers[0]
# (True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F'])
first_providers = first_tuple[1]
# [b'\xf9\xa1\xf5\x10a\xe5\xe0F']
first_provider = first_providers[0]
assert node_b.node.peer_id_bytes == first_provider


@@ -1,75 +0,0 @@
import pytest
from libp2p.kademlia.network import KademliaServer
from libp2p.peer.id import ID
from libp2p.routing.kademlia.kademlia_peer_router import (
KadmeliaPeerRouter,
peer_info_to_str,
)
@pytest.mark.asyncio
async def test_simple_two_nodes():
node_a = KademliaServer()
await node_a.listen(5678)
node_b = KademliaServer()
await node_b.listen(5679)
node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)])
node_a_kad_peerinfo = node_a_value[0]
await node_a.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_b)
returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes))
assert returned_info == node_a_kad_peerinfo
@pytest.mark.asyncio
async def test_simple_three_nodes():
node_a = KademliaServer()
await node_a.listen(5701)
node_b = KademliaServer()
await node_b.listen(5702)
node_c = KademliaServer()
await node_c.listen(5703)
node_a_value = await node_b.bootstrap([("127.0.0.1", 5701)])
node_a_kad_peerinfo = node_a_value[0]
await node_c.bootstrap([("127.0.0.1", 5702)])
await node_a.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_c)
returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes))
assert returned_info == node_a_kad_peerinfo
@pytest.mark.asyncio
async def test_simple_four_nodes():
node_a = KademliaServer()
await node_a.listen(5801)
node_b = KademliaServer()
await node_b.listen(5802)
node_c = KademliaServer()
await node_c.listen(5803)
node_d = KademliaServer()
await node_d.listen(5804)
node_a_value = await node_b.bootstrap([("127.0.0.1", 5801)])
node_a_kad_peerinfo = node_a_value[0]
await node_c.bootstrap([("127.0.0.1", 5802)])
await node_d.bootstrap([("127.0.0.1", 5803)])
await node_b.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo))
router = KadmeliaPeerRouter(node_d)
returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes))
assert returned_info == node_a_kad_peerinfo