diff --git a/README.md b/README.md index 9b79954..417c27c 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ py-libp2p aims for conformity with [the standard libp2p modules](https://github. | Peer Discovery | Status | | -------------------------------------------- | :-----------: | | **`bootstrap list`** | :tomato: | -| **`Kademlia DHT`** | :lemon: | +| **`Kademlia DHT`** | :chestnut: | | **`mDNS`** | :chestnut: | | **`PEX`** | :chestnut: | | **`DNS`** | :chestnut: | @@ -147,7 +147,7 @@ py-libp2p aims for conformity with [the standard libp2p modules](https://github. | Content Routing | Status | | -------------------------------------------- | :-----------: | -| **`Kademlia DHT`** | :lemon: | +| **`Kademlia DHT`** | :chestnut: | | **`floodsub`** | :green_apple: | | **`gossipsub`** | :green_apple: | | **`PHT`** | :chestnut: | @@ -155,7 +155,7 @@ py-libp2p aims for conformity with [the standard libp2p modules](https://github. | Peer Routing | Status | | -------------------------------------------- | :-----------: | -| **`Kademlia DHT`** | :green_apple: | +| **`Kademlia DHT`** | :chestnut: | | **`floodsub`** | :green_apple: | | **`gossipsub`** | :green_apple: | | **`PHT`** | :chestnut: | diff --git a/docs/libp2p.kademlia.rst b/docs/libp2p.kademlia.rst deleted file mode 100644 index aa0417f..0000000 --- a/docs/libp2p.kademlia.rst +++ /dev/null @@ -1,70 +0,0 @@ -libp2p.kademlia package -======================= - -Submodules ----------- - -libp2p.kademlia.crawling module -------------------------------- - -.. automodule:: libp2p.kademlia.crawling - :members: - :undoc-members: - :show-inheritance: - -libp2p.kademlia.kad\_peerinfo module ------------------------------------- - -.. automodule:: libp2p.kademlia.kad_peerinfo - :members: - :undoc-members: - :show-inheritance: - -libp2p.kademlia.network module ------------------------------- - -.. automodule:: libp2p.kademlia.network - :members: - :undoc-members: - :show-inheritance: - -libp2p.kademlia.protocol module -------------------------------- - -.. automodule:: libp2p.kademlia.protocol - :members: - :undoc-members: - :show-inheritance: - -libp2p.kademlia.routing module ------------------------------- - -.. automodule:: libp2p.kademlia.routing - :members: - :undoc-members: - :show-inheritance: - -libp2p.kademlia.storage module ------------------------------- - -.. automodule:: libp2p.kademlia.storage - :members: - :undoc-members: - :show-inheritance: - -libp2p.kademlia.utils module ----------------------------- - -.. automodule:: libp2p.kademlia.utils - :members: - :undoc-members: - :show-inheritance: - - -Module contents ---------------- - -.. automodule:: libp2p.kademlia - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/libp2p.routing.kademlia.rst b/docs/libp2p.routing.kademlia.rst deleted file mode 100644 index c52dcdc..0000000 --- a/docs/libp2p.routing.kademlia.rst +++ /dev/null @@ -1,30 +0,0 @@ -libp2p.routing.kademlia package -=============================== - -Submodules ----------- - -libp2p.routing.kademlia.kademlia\_content\_router module --------------------------------------------------------- - -.. automodule:: libp2p.routing.kademlia.kademlia_content_router - :members: - :undoc-members: - :show-inheritance: - -libp2p.routing.kademlia.kademlia\_peer\_router module ------------------------------------------------------ - -.. automodule:: libp2p.routing.kademlia.kademlia_peer_router - :members: - :undoc-members: - :show-inheritance: - - -Module contents ---------------- - -.. automodule:: libp2p.routing.kademlia - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/libp2p.routing.rst b/docs/libp2p.routing.rst index 1595e4e..88b3c5b 100644 --- a/docs/libp2p.routing.rst +++ b/docs/libp2p.routing.rst @@ -1,12 +1,6 @@ libp2p.routing package ====================== -Subpackages ------------ - -.. toctree:: - - libp2p.routing.kademlia Submodules ---------- diff --git a/docs/libp2p.rst b/docs/libp2p.rst index 8aa6bc5..a50db7e 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -10,7 +10,6 @@ Subpackages libp2p.host libp2p.identity libp2p.io - libp2p.kademlia libp2p.network libp2p.peer libp2p.protocol_muxer diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 27c8239..8813fd3 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -6,15 +6,12 @@ from libp2p.crypto.rsa import create_new_key_pair from libp2p.host.basic_host import BasicHost from libp2p.host.host_interface import IHost from libp2p.host.routed_host import RoutedHost -from libp2p.kademlia.network import KademliaServer -from libp2p.kademlia.storage import IStorage from libp2p.network.network_interface import INetwork from libp2p.network.swarm import Swarm from libp2p.peer.id import ID from libp2p.peer.peerstore import PeerStore from libp2p.peer.peerstore_interface import IPeerStore from libp2p.routing.interfaces import IPeerRouting -from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport import libp2p.security.secio.transport as secio from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex @@ -45,31 +42,6 @@ def generate_peer_id_from(key_pair: KeyPair) -> ID: return ID.from_pubkey(public_key) -def initialize_default_kademlia_router( - ksize: int = 20, alpha: int = 3, id_opt: ID = None, storage: IStorage = None -) -> KadmeliaPeerRouter: - """ - initialize kadmelia router when no kademlia router is passed in. - - :param ksize: The k parameter from the paper - :param alpha: The alpha parameter from the paper - :param id_opt: optional id for host - :param storage: An instance that implements - :class:`~kademlia.storage.IStorage` - :return: return a default kademlia instance - """ - if not id_opt: - key_pair = generate_new_rsa_identity() - id_opt = generate_peer_id_from(key_pair) - - node_id = id_opt.to_bytes() - # ignore type for Kademlia module - server = KademliaServer( # type: ignore - ksize=ksize, alpha=alpha, node_id=node_id, storage=storage - ) - return KadmeliaPeerRouter(server) - - def initialize_default_swarm( key_pair: KeyPair, id_opt: ID = None, diff --git a/libp2p/kademlia/__init__.py b/libp2p/kademlia/__init__.py deleted file mode 100644 index 1456859..0000000 --- a/libp2p/kademlia/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Kademlia is a Python implementation of the Kademlia protocol which utilizes -the asyncio library.""" -__version__ = "2.0" diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py deleted file mode 100644 index 1a5566f..0000000 --- a/libp2p/kademlia/crawling.py +++ /dev/null @@ -1,173 +0,0 @@ -from collections import Counter -import logging - -from .kad_peerinfo import KadPeerHeap, create_kad_peerinfo -from .utils import gather_dict - -log = logging.getLogger(__name__) - - -class SpiderCrawl: - """Crawl the network and look for given 160-bit keys.""" - - def __init__(self, protocol, node, peers, ksize, alpha): - """ - Create a new C{SpiderCrawl}er. - - Args: - protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance. - node: A :class:`~kademlia.node.Node` representing the key we're - looking for - peers: A list of :class:`~kademlia.node.Node` instances that - provide the entry point for the network - ksize: The value for k based on the paper - alpha: The value for alpha based on the paper - """ - self.protocol = protocol - self.ksize = ksize - self.alpha = alpha - self.node = node - self.nearest = KadPeerHeap(self.node, self.ksize) - self.last_ids_crawled = [] - log.info("creating spider with peers: %s", peers) - self.nearest.push(peers) - - async def _find(self, rpcmethod): - """ - Get either a value or list of nodes. - - Args: - rpcmethod: The protocol's callfindValue or call_find_node. - - The process: - 1. calls find_* to current ALPHA nearest not already queried nodes, - adding results to current nearest list of k nodes. - 2. current nearest list needs to keep track of who has been queried - already sort by nearest, keep KSIZE - 3. if list is same as last time, next call should be to everyone not - yet queried - 4. repeat, unless nearest list has all been queried, then ur done - """ - log.info("crawling network with nearest: %s", str(tuple(self.nearest))) - count = self.alpha - if self.nearest.get_ids() == self.last_ids_crawled: - count = len(self.nearest) - self.last_ids_crawled = self.nearest.get_ids() - - dicts = {} - for peer in self.nearest.get_uncontacted()[:count]: - dicts[peer.peer_id_bytes] = rpcmethod(peer, self.node) - self.nearest.mark_contacted(peer) - found = await gather_dict(dicts) - return await self._nodes_found(found) - - async def _nodes_found(self, responses): - raise NotImplementedError - - -class ValueSpiderCrawl(SpiderCrawl): - def __init__(self, protocol, node, peers, ksize, alpha): - SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) - # keep track of the single nearest node without value - per - # section 2.3 so we can set the key there if found - self.nearest_without_value = KadPeerHeap(self.node, 1) - - async def find(self): - """Find either the closest nodes or the value requested.""" - return await self._find(self.protocol.call_find_value) - - async def _nodes_found(self, responses): - """Handle the result of an iteration in _find.""" - toremove = [] - found_values = [] - for peerid, response in responses.items(): - response = RPCFindResponse(response) - if not response.happened(): - toremove.append(peerid) - elif response.has_value(): - found_values.append(response.get_value()) - else: - peer = self.nearest.get_node(peerid) - self.nearest_without_value.push(peer) - self.nearest.push(response.get_node_list()) - self.nearest.remove(toremove) - - if found_values: - return await self._handle_found_values(found_values) - if self.nearest.have_contacted_all(): - # not found! - return None - return await self.find() - - async def _handle_found_values(self, values): - """ - We got some values! - - Exciting. But let's make sure they're all the same or freak out - a little bit. Also, make sure we tell the nearest node that - *didn't* have the value to store it. - """ - value_counts = Counter(values) - if len(value_counts) != 1: - log.warning( - "Got multiple values for key %i: %s", self.node.xor_id, str(values) - ) - value = value_counts.most_common(1)[0][0] - - peer = self.nearest_without_value.popleft() - if peer: - await self.protocol.call_store(peer, self.node.peer_id_bytes, value) - return value - - -class NodeSpiderCrawl(SpiderCrawl): - async def find(self): - """Find the closest nodes.""" - return await self._find(self.protocol.call_find_node) - - async def _nodes_found(self, responses): - """Handle the result of an iteration in _find.""" - toremove = [] - for peerid, response in responses.items(): - response = RPCFindResponse(response) - if not response.happened(): - toremove.append(peerid) - else: - self.nearest.push(response.get_node_list()) - self.nearest.remove(toremove) - - if self.nearest.have_contacted_all(): - return list(self.nearest) - return await self.find() - - -class RPCFindResponse: - def __init__(self, response): - """ - A wrapper for the result of a RPC find. - - Args: - response: This will be a tuple of (, ) - where will be a list of tuples if not found or - a dictionary of {'value': v} where v is the value desired - """ - self.response = response - - def happened(self): - """Did the other host actually respond?""" - return self.response[0] - - def has_value(self): - return isinstance(self.response[1], dict) - - def get_value(self): - return self.response[1]["value"] - - def get_node_list(self): - """ - Get the node list in the response. - - If there's no value, this should be set. - """ - nodelist = self.response[1] or [] - return [create_kad_peerinfo(*nodeple) for nodeple in nodelist] diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py deleted file mode 100644 index efb20f1..0000000 --- a/libp2p/kademlia/kad_peerinfo.py +++ /dev/null @@ -1,153 +0,0 @@ -import heapq -from operator import itemgetter -import random -from typing import List - -from multiaddr import Multiaddr - -from libp2p.peer.id import ID -from libp2p.peer.peerinfo import PeerInfo - -from .utils import digest - -P_IP = "ip4" -P_UDP = "udp" - - -class KadPeerInfo(PeerInfo): - def __init__(self, peer_id, addrs): - super(KadPeerInfo, self).__init__(peer_id, addrs) - - self.peer_id_bytes = peer_id.to_bytes() - self.xor_id = peer_id.xor_id - - self.addrs = addrs - - self.ip = self.addrs[0].value_for_protocol(P_IP) if addrs else None - self.port = int(self.addrs[0].value_for_protocol(P_UDP)) if addrs else None - - def same_home_as(self, node): - return sorted(self.addrs) == sorted(node.addrs) - - def distance_to(self, node): - """Get the distance between this node and another.""" - return self.xor_id ^ node.xor_id - - def __iter__(self): - """ - Enables use of Node as a tuple - i.e., tuple(node) works. - """ - return iter([self.peer_id_bytes, self.ip, self.port]) - - def __repr__(self): - return repr([self.xor_id, self.ip, self.port, self.peer_id_bytes]) - - def __str__(self): - return "%s:%s" % (self.ip, str(self.port)) - - def encode(self): - return ( - str(self.peer_id_bytes) - + "\n" - + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port)) - ) - - -class KadPeerHeap: - """A heap of peers ordered by distance to a given node.""" - - def __init__(self, node, maxsize): - """ - Constructor. - - @param node: The node to measure all distnaces from. - @param maxsize: The maximum size that this heap can grow to. - """ - self.node = node - self.heap = [] - self.contacted = set() - self.maxsize = maxsize - - def remove(self, peers): - """ - Remove a list of peer ids from this heap. - - Note that while this heap retains a constant visible size (based - on the iterator), it's actual size may be quite a bit larger - than what's exposed. Therefore, removal of nodes may not change - the visible size as previously added nodes suddenly become - visible. - """ - peers = set(peers) - if not peers: - return - nheap = [] - for distance, node in self.heap: - if node.peer_id_bytes not in peers: - heapq.heappush(nheap, (distance, node)) - self.heap = nheap - - def get_node(self, node_id): - for _, node in self.heap: - if node.peer_id_bytes == node_id: - return node - return None - - def have_contacted_all(self): - return len(self.get_uncontacted()) == 0 - - def get_ids(self): - return [n.peer_id_bytes for n in self] - - def mark_contacted(self, node): - self.contacted.add(node.peer_id_bytes) - - def popleft(self): - return heapq.heappop(self.heap)[1] if self else None - - def push(self, nodes): - """ - Push nodes onto heap. - - @param nodes: This can be a single item or a C{list}. - """ - if not isinstance(nodes, list): - nodes = [nodes] - - for node in nodes: - if node not in self: - distance = self.node.distance_to(node) - heapq.heappush(self.heap, (distance, node)) - - def __len__(self): - return min(len(self.heap), self.maxsize) - - def __iter__(self): - nodes = heapq.nsmallest(self.maxsize, self.heap) - return iter(map(itemgetter(1), nodes)) - - def __contains__(self, node): - for _, other in self.heap: - if node.peer_id_bytes == other.peer_id_bytes: - return True - return False - - def get_uncontacted(self): - return [n for n in self if n.peer_id_bytes not in self.contacted] - - -def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None): - node_id = ( - ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255))) - ) - addrs: List[Multiaddr] - if sender_ip and sender_port: - addrs = [ - Multiaddr( - "/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port) - ) - ] - else: - addrs = [] - - return KadPeerInfo(node_id, addrs) diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py deleted file mode 100644 index 59f4206..0000000 --- a/libp2p/kademlia/protocol.py +++ /dev/null @@ -1,188 +0,0 @@ -import asyncio -import logging -import random - -from rpcudp.protocol import RPCProtocol - -from .kad_peerinfo import create_kad_peerinfo -from .routing import RoutingTable - -log = logging.getLogger(__name__) - - -class KademliaProtocol(RPCProtocol): - """ - There are four main RPCs in the Kademlia protocol PING, STORE, FIND_NODE, - FIND_VALUE. - - - PING probes if a node is still online - - STORE instructs a node to store (key, value) - - FIND_NODE takes a 160-bit ID and gets back - (ip, udp_port, node_id) for k closest nodes to target - - FIND_VALUE behaves like FIND_NODE unless a value is stored. - """ - - def __init__(self, source_node, storage, ksize): - RPCProtocol.__init__(self) - self.router = RoutingTable(self, ksize, source_node) - self.storage = storage - self.source_node = source_node - - def get_refresh_ids(self): - """Get ids to search for to keep old buckets up to date.""" - ids = [] - for bucket in self.router.lonely_buckets(): - rid = random.randint(*bucket.range).to_bytes(20, byteorder="big") - ids.append(rid) - return ids - - def rpc_stun(self, sender): - return sender - - def rpc_ping(self, sender, nodeid): - source = create_kad_peerinfo(nodeid, sender[0], sender[1]) - - self.welcome_if_new(source) - return self.source_node.peer_id_bytes - - def rpc_store(self, sender, nodeid, key, value): - source = create_kad_peerinfo(nodeid, sender[0], sender[1]) - - self.welcome_if_new(source) - log.debug( - "got a store request from %s, storing '%s'='%s'", sender, key.hex(), value - ) - self.storage[key] = value - return True - - def rpc_find_node(self, sender, nodeid, key): - log.info("finding neighbors of %i in local table", int(nodeid.hex(), 16)) - source = create_kad_peerinfo(nodeid, sender[0], sender[1]) - - self.welcome_if_new(source) - node = create_kad_peerinfo(key) - neighbors = self.router.find_neighbors(node, exclude=source) - return list(map(tuple, neighbors)) - - def rpc_find_value(self, sender, nodeid, key): - source = create_kad_peerinfo(nodeid, sender[0], sender[1]) - - self.welcome_if_new(source) - value = self.storage.get(key, None) - if value is None: - return self.rpc_find_node(sender, nodeid, key) - return {"value": value} - - def rpc_add_provider(self, sender, nodeid, key, provider_id): - """rpc when receiving an add_provider call should validate received - PeerInfo matches sender nodeid if it does, receipient must store a - record in its datastore we store a map of content_id to peer_id (non - xor)""" - if nodeid == provider_id: - log.info( - "adding provider %s for key %s in local table", provider_id, str(key) - ) - self.storage[key] = provider_id - return True - return False - - def rpc_get_providers(self, sender, key): - """rpc when receiving a get_providers call should look up key in data - store and respond with records plus a list of closer peers in its - routing table.""" - providers = [] - record = self.storage.get(key, None) - - if record: - providers.append(record) - - keynode = create_kad_peerinfo(key) - neighbors = self.router.find_neighbors(keynode) - for neighbor in neighbors: - if neighbor.peer_id_bytes != record: - providers.append(neighbor.peer_id_bytes) - - return providers - - async def call_find_node(self, node_to_ask, node_to_find): - address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_node( - address, self.source_node.peer_id_bytes, node_to_find.peer_id_bytes - ) - return self.handle_call_response(result, node_to_ask) - - async def call_find_value(self, node_to_ask, node_to_find): - address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_value( - address, self.source_node.peer_id_bytes, node_to_find.peer_id_bytes - ) - return self.handle_call_response(result, node_to_ask) - - async def call_ping(self, node_to_ask): - address = (node_to_ask.ip, node_to_ask.port) - result = await self.ping(address, self.source_node.peer_id_bytes) - return self.handle_call_response(result, node_to_ask) - - async def call_store(self, node_to_ask, key, value): - address = (node_to_ask.ip, node_to_ask.port) - result = await self.store(address, self.source_node.peer_id_bytes, key, value) - return self.handle_call_response(result, node_to_ask) - - async def call_add_provider(self, node_to_ask, key, provider_id): - address = (node_to_ask.ip, node_to_ask.port) - result = await self.add_provider( - address, self.source_node.peer_id_bytes, key, provider_id - ) - - return self.handle_call_response(result, node_to_ask) - - async def call_get_providers(self, node_to_ask, key): - address = (node_to_ask.ip, node_to_ask.port) - result = await self.get_providers(address, key) - return self.handle_call_response(result, node_to_ask) - - def welcome_if_new(self, node): - """ - Given a new node, send it all the keys/values it should be storing, - then add it to the routing table. - - @param node: A new node that just joined (or that we just found out - about). - - Process: - For each key in storage, get k closest nodes. If newnode is closer - than the furtherst in that list, and the node for this server - is closer than the closest in that list, then store the key/value - on the new node (per section 2.5 of the paper) - """ - if not self.router.is_new_node(node): - return - - log.info("never seen %s before, adding to router", node) - for key, value in self.storage: - keynode = create_kad_peerinfo(key) - neighbors = self.router.find_neighbors(keynode) - if neighbors: - last = neighbors[-1].distance_to(keynode) - new_node_close = node.distance_to(keynode) < last - first = neighbors[0].distance_to(keynode) - this_closest = self.source_node.distance_to(keynode) < first - if not neighbors or (new_node_close and this_closest): - asyncio.ensure_future(self.call_store(node, key, value)) - self.router.add_contact(node) - - def handle_call_response(self, result, node): - """ - If we get a response, add the node to the routing table. - - If we get no response, make sure it's removed from the routing - table. - """ - if not result[0]: - log.warning("no response from %s, removing from router", node) - self.router.remove_contact(node) - return result - - log.info("got successful response from %s", node) - self.welcome_if_new(node) - return result diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py deleted file mode 100644 index f65c90a..0000000 --- a/libp2p/kademlia/routing.py +++ /dev/null @@ -1,184 +0,0 @@ -import asyncio -from collections import OrderedDict -import heapq -import operator -import time - -from .utils import OrderedSet, bytes_to_bit_string, shared_prefix - - -class KBucket: - """each node keeps a list of (ip, udp_port, node_id) for nodes of distance - between 2^i and 2^(i+1) this list that every node keeps is a k-bucket each - k-bucket implements a last seen eviction policy except that live nodes are - never removed.""" - - def __init__(self, rangeLower, rangeUpper, ksize): - self.range = (rangeLower, rangeUpper) - self.nodes = OrderedDict() - self.replacement_nodes = OrderedSet() - self.touch_last_updated() - self.ksize = ksize - - def touch_last_updated(self): - self.last_updated = time.monotonic() - - def get_nodes(self): - return list(self.nodes.values()) - - def split(self): - midpoint = (self.range[0] + self.range[1]) / 2 - one = KBucket(self.range[0], midpoint, self.ksize) - two = KBucket(midpoint + 1, self.range[1], self.ksize) - for node in self.nodes.values(): - bucket = one if node.xor_id <= midpoint else two - bucket.nodes[node.peer_id_bytes] = node - return (one, two) - - def remove_node(self, node): - if node.peer_id_bytes not in self.nodes: - return - - # delete node, and see if we can add a replacement - del self.nodes[node.peer_id_bytes] - if self.replacement_nodes: - newnode = self.replacement_nodes.pop() - self.nodes[newnode.peer_id_bytes] = newnode - - def has_in_range(self, node): - return self.range[0] <= node.xor_id <= self.range[1] - - def is_new_node(self, node): - return node.peer_id_bytes not in self.nodes - - def add_node(self, node): - """ - Add a C{Node} to the C{KBucket}. Return True if successful, False if - the bucket is full. - - If the bucket is full, keep track of node in a replacement list, - per section 4.1 of the paper. - """ - if node.peer_id_bytes in self.nodes: - del self.nodes[node.peer_id_bytes] - self.nodes[node.peer_id_bytes] = node - elif len(self) < self.ksize: - self.nodes[node.peer_id_bytes] = node - else: - self.replacement_nodes.push(node) - return False - return True - - def depth(self): - vals = self.nodes.values() - sprefix = shared_prefix([bytes_to_bit_string(n.peer_id_bytes) for n in vals]) - return len(sprefix) - - def head(self): - return list(self.nodes.values())[0] - - def __getitem__(self, node_id): - return self.nodes.get(node_id, None) - - def __len__(self): - return len(self.nodes) - - -class TableTraverser: - def __init__(self, table, startNode): - index = table.get_bucket_for(startNode) - table.buckets[index].touch_last_updated() - self.current_nodes = table.buckets[index].get_nodes() - self.left_buckets = table.buckets[:index] - self.right_buckets = table.buckets[(index + 1) :] - self.left = True - - def __iter__(self): - return self - - def __next__(self): - """Pop an item from the left subtree, then right, then left, etc.""" - if self.current_nodes: - return self.current_nodes.pop() - - if self.left and self.left_buckets: - self.current_nodes = self.left_buckets.pop().get_nodes() - self.left = False - return next(self) - - if self.right_buckets: - self.current_nodes = self.right_buckets.pop(0).get_nodes() - self.left = True - return next(self) - - raise StopIteration - - -class RoutingTable: - def __init__(self, protocol, ksize, node): - """ - @param node: The node that represents this server. It won't - be added to the routing table, but will be needed later to - determine which buckets to split or not. - """ - self.node = node - self.protocol = protocol - self.ksize = ksize - self.flush() - - def flush(self): - self.buckets = [KBucket(0, 2 ** 160, self.ksize)] - - def split_bucket(self, index): - one, two = self.buckets[index].split() - self.buckets[index] = one - self.buckets.insert(index + 1, two) - - def lonely_buckets(self): - """Get all of the buckets that haven't been updated in over an hour.""" - hrago = time.monotonic() - 3600 - return [b for b in self.buckets if b.last_updated < hrago] - - def remove_contact(self, node): - index = self.get_bucket_for(node) - self.buckets[index].remove_node(node) - - def is_new_node(self, node): - index = self.get_bucket_for(node) - return self.buckets[index].is_new_node(node) - - def add_contact(self, node): - index = self.get_bucket_for(node) - bucket = self.buckets[index] - - # this will succeed unless the bucket is full - if bucket.add_node(node): - return - - # Per section 4.2 of paper, split if the bucket has the node - # in its range or if the depth is not congruent to 0 mod 5 - if bucket.has_in_range(self.node) or bucket.depth() % 5 != 0: - self.split_bucket(index) - self.add_contact(node) - else: - asyncio.ensure_future(self.protocol.call_ping(bucket.head())) - - def get_bucket_for(self, node): - """Get the index of the bucket that the given node would fall into.""" - for index, bucket in enumerate(self.buckets): - if node.xor_id < bucket.range[1]: - return index - # we should never be here, but make linter happy - return None - - def find_neighbors(self, node, k=None, exclude=None): - k = k or self.ksize - nodes = [] - for neighbor in TableTraverser(self, node): - notexcluded = exclude is None or not neighbor.same_home_as(exclude) - if neighbor.peer_id_bytes != node.peer_id_bytes and notexcluded: - heapq.heappush(nodes, (node.distance_to(neighbor), neighbor)) - if len(nodes) == k: - break - - return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes))) diff --git a/libp2p/kademlia/rpc.proto b/libp2p/kademlia/rpc.proto deleted file mode 100644 index 96c14a8..0000000 --- a/libp2p/kademlia/rpc.proto +++ /dev/null @@ -1,78 +0,0 @@ -// Record represents a dht record that contains a value -// for a key value pair -message Record { - // The key that references this record - bytes key = 1; - - // The actual value this record is storing - bytes value = 2; - - // Note: These fields were removed from the Record message - // hash of the authors public key - //optional string author = 3; - // A PKI signature for the key+value+author - //optional bytes signature = 4; - - // Time the record was received, set by receiver - string timeReceived = 5; -}; - -message Message { - enum MessageType { - PUT_VALUE = 0; - GET_VALUE = 1; - ADD_PROVIDER = 2; - GET_PROVIDERS = 3; - FIND_NODE = 4; - PING = 5; - } - - enum ConnectionType { - // sender does not have a connection to peer, and no extra information (default) - NOT_CONNECTED = 0; - - // sender has a live connection to peer - CONNECTED = 1; - - // sender recently connected to peer - CAN_CONNECT = 2; - - // sender recently tried to connect to peer repeatedly but failed to connect - // ("try" here is loose, but this should signal "made strong effort, failed") - CANNOT_CONNECT = 3; - } - - message Peer { - // ID of a given peer. - bytes id = 1; - - // multiaddrs for a given peer - repeated bytes addrs = 2; - - // used to signal the sender's connection capabilities to the peer - ConnectionType connection = 3; - } - - // defines what type of message it is. - MessageType type = 1; - - // defines what coral cluster level this query/response belongs to. - // in case we want to implement coral's cluster rings in the future. - int32 clusterLevelRaw = 10; // NOT USED - - // Used to specify the key associated with this message. - // PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS - bytes key = 2; - - // Used to return a value - // PUT_VALUE, GET_VALUE - Record record = 3; - - // Used to return peers closer to a key in a query - // GET_VALUE, GET_PROVIDERS, FIND_NODE - repeated Peer closerPeers = 8; - - // Used to return Providers - // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS - repeated Peer providerPeers = 9; -} \ No newline at end of file diff --git a/libp2p/kademlia/storage.py b/libp2p/kademlia/storage.py deleted file mode 100644 index 014853c..0000000 --- a/libp2p/kademlia/storage.py +++ /dev/null @@ -1,93 +0,0 @@ -from abc import ABC, abstractmethod -from collections import OrderedDict -from itertools import takewhile -import operator -import time - - -class IStorage(ABC): - """ - Local storage for this node. - - IStorage implementations of get must return the same type as put in - by set - """ - - @abstractmethod - def __setitem__(self, key, value): - """Set a key to the given value.""" - - @abstractmethod - def __getitem__(self, key): - """ - Get the given key. - - If item doesn't exist, raises C{KeyError} - """ - - @abstractmethod - def get(self, key, default=None): - """ - Get given key. - - If not found, return default. - """ - - @abstractmethod - def iter_older_than(self, seconds_old): - """Return the an iterator over (key, value) tuples for items older than - the given seconds_old.""" - - @abstractmethod - def __iter__(self): - """Get the iterator for this storage, should yield tuple of (key, - value)""" - - -class ForgetfulStorage(IStorage): - def __init__(self, ttl=604800): - """By default, max age is a week.""" - self.data = OrderedDict() - self.ttl = ttl - - def __setitem__(self, key, value): - if key in self.data: - del self.data[key] - self.data[key] = (time.monotonic(), value) - self.cull() - - def cull(self): - for _, _ in self.iter_older_than(self.ttl): - self.data.popitem(last=False) - - def get(self, key, default=None): - self.cull() - if key in self.data: - return self[key] - return default - - def __getitem__(self, key): - self.cull() - return self.data[key][1] - - def __repr__(self): - self.cull() - return repr(self.data) - - def iter_older_than(self, seconds_old): - min_birthday = time.monotonic() - seconds_old - zipped = self._triple_iter() - matches = takewhile(lambda r: min_birthday >= r[1], zipped) - return list(map(operator.itemgetter(0, 2), matches)) - - def _triple_iter(self): - ikeys = self.data.keys() - ibirthday = map(operator.itemgetter(0), self.data.values()) - ivalues = map(operator.itemgetter(1), self.data.values()) - return zip(ikeys, ibirthday, ivalues) - - def __iter__(self): - self.cull() - ikeys = self.data.keys() - ivalues = map(operator.itemgetter(1), self.data.values()) - return zip(ikeys, ivalues) diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py deleted file mode 100644 index f18a671..0000000 --- a/libp2p/kademlia/utils.py +++ /dev/null @@ -1,56 +0,0 @@ -"""General catchall for functions that don't make sense as methods.""" -import asyncio -import hashlib -import operator - - -async def gather_dict(dic): - cors = list(dic.values()) - results = await asyncio.gather(*cors) - return dict(zip(dic.keys(), results)) - - -def digest(string): - if not isinstance(string, bytes): - string = str(string).encode("utf8") - return hashlib.sha1(string).digest() - - -class OrderedSet(list): - """ - Acts like a list in all ways, except in the behavior of the. - - :meth:`push` method. - """ - - def push(self, thing): - """ - 1. If the item exists in the list, it's removed - 2. The item is pushed to the end of the list - """ - if thing in self: - self.remove(thing) - self.append(thing) - - -def shared_prefix(args): - """ - Find the shared prefix between the strings. - - For instance: - - sharedPrefix(['blahblah', 'blahwhat']) - - returns 'blah'. - """ - i = 0 - while i < min(map(len, args)): - if len(set(map(operator.itemgetter(i), args))) != 1: - break - i += 1 - return args[0][:i] - - -def bytes_to_bit_string(bites): - bits = [bin(bite)[2:].rjust(8, "0") for bite in bites] - return "".join(bits) diff --git a/libp2p/routing/kademlia/__init__.py b/libp2p/routing/kademlia/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/libp2p/routing/kademlia/kademlia_content_router.py b/libp2p/routing/kademlia/kademlia_content_router.py deleted file mode 100644 index b623a25..0000000 --- a/libp2p/routing/kademlia/kademlia_content_router.py +++ /dev/null @@ -1,21 +0,0 @@ -from typing import Iterable - -from libp2p.peer.peerinfo import PeerInfo -from libp2p.routing.interfaces import IContentRouting - - -class KadmeliaContentRouter(IContentRouting): - def provide(self, cid: bytes, announce: bool = True) -> None: - """ - Provide adds the given cid to the content routing system. - - If announce is True, it also announces it, otherwise it is just - kept in the local accounting of which objects are being - provided. - """ - # the DHT finds the closest peers to `key` using the `FIND_NODE` RPC - # then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers. - - def find_provider_iter(self, cid: bytes, count: int) -> Iterable[PeerInfo]: - """Search for peers who are able to provide a given key returns an - iterator of peer.PeerInfo.""" diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py deleted file mode 100644 index 32f2075..0000000 --- a/libp2p/routing/kademlia/kademlia_peer_router.py +++ /dev/null @@ -1,43 +0,0 @@ -import json - -import multiaddr - -from libp2p.kademlia.network import KademliaServer -from libp2p.peer.id import ID -from libp2p.peer.peerinfo import PeerInfo -from libp2p.routing.interfaces import IPeerRouting - - -class KadmeliaPeerRouter(IPeerRouting): - server: KademliaServer - - def __init__(self, dht_server: KademliaServer) -> None: - self.server = dht_server - - async def find_peer(self, peer_id: ID) -> PeerInfo: - """ - Find a specific peer. - - :param peer_id: peer to search for - :return: PeerInfo of specified peer - """ - # switching peer_id to xor_id used by kademlia as node_id - xor_id = peer_id.xor_id - # ignore type for kad - value = await self.server.get(xor_id) # type: ignore - return ( - peer_info_from_str(value) if value else None - ) # TODO: should raise error if None? - - -def peer_info_to_str(peer_info: PeerInfo) -> str: - return json.dumps( - [peer_info.peer_id.to_string(), list(map(lambda a: str(a), peer_info.addrs))] - ) - - -def peer_info_from_str(string: str) -> PeerInfo: - peer_id, raw_addrs = json.loads(string) - return PeerInfo( - ID.from_base58(peer_id), list(map(lambda a: multiaddr.Multiaddr(a), raw_addrs)) - ) diff --git a/libp2p/tools/utils.py b/libp2p/tools/utils.py index 84e3edf..d779266 100644 --- a/libp2p/tools/utils.py +++ b/libp2p/tools/utils.py @@ -1,16 +1,16 @@ -from typing import List, Sequence, Tuple +from typing import Dict, Sequence, Tuple, cast import multiaddr from libp2p import new_node from libp2p.host.basic_host import BasicHost from libp2p.host.host_interface import IHost -from libp2p.kademlia.network import KademliaServer +from libp2p.host.routed_host import RoutedHost from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.swarm import Swarm -from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr from libp2p.routing.interfaces import IPeerRouting -from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter from libp2p.typing import StreamHandlerFn, TProtocol from .constants import MAX_READ_LEN @@ -47,35 +47,6 @@ async def set_up_nodes_by_transport_opt( return tuple(nodes_list) -async def set_up_nodes_by_transport_and_disc_opt( - transport_disc_opt_list: Sequence[Tuple[Sequence[str], IPeerRouting]] -) -> Tuple[BasicHost, ...]: - nodes_list = [] - for transport_opt, disc_opt in transport_disc_opt_list: - node = await new_node(transport_opt=transport_opt, disc_opt=disc_opt) - await node.get_network().listen(multiaddr.Multiaddr(transport_opt[0])) - nodes_list.append(node) - return tuple(nodes_list) - - -async def set_up_routers( - router_confs: Tuple[int, int] = (0, 0) -) -> List[KadmeliaPeerRouter]: - """The default ``router_confs`` selects two free ports local to this - machine.""" - bootstrap_node = KademliaServer() # type: ignore - await bootstrap_node.listen(router_confs[0]) - - routers = [KadmeliaPeerRouter(bootstrap_node)] - for port in router_confs[1:]: - node = KademliaServer() # type: ignore - await node.listen(port) - - await node.bootstrap_node(bootstrap_node.address) - routers.append(KadmeliaPeerRouter(node)) - return routers - - async def echo_stream_handler(stream: INetStream) -> None: while True: read_string = (await stream.read(MAX_READ_LEN)).decode() @@ -95,3 +66,33 @@ async def perform_two_host_set_up( # Associate the peer with local ip address (see default parameters of Libp2p()) node_a.get_peerstore().add_addrs(node_b.get_id(), node_b.get_addrs(), 10) return node_a, node_b + + +class DummyRouter(IPeerRouting): + _routing_table: Dict[ID, PeerInfo] + + def __init__(self) -> None: + self._routing_table = dict() + + async def find_peer(self, peer_id: ID) -> PeerInfo: + return self._routing_table.get(peer_id, None) + + +async def set_up_routed_hosts() -> Tuple[RoutedHost, RoutedHost]: + router_a, router_b = DummyRouter(), DummyRouter() + transport = "/ip4/127.0.0.1/tcp/0" + host_a = await new_node(transport_opt=[transport], disc_opt=router_a) + host_b = await new_node(transport_opt=[transport], disc_opt=router_b) + + address = multiaddr.Multiaddr(transport) + await host_a.get_network().listen(address) + await host_b.get_network().listen(address) + + mock_routing_table = { + host_a.get_id(): PeerInfo(host_a.get_id(), host_a.get_addrs()), + host_b.get_id(): PeerInfo(host_b.get_id(), host_b.get_addrs()), + } + + router_a._routing_table = router_b._routing_table = mock_routing_table + + return cast(RoutedHost, host_a), cast(RoutedHost, host_b) diff --git a/mypy.ini b/mypy.ini index fffd2aa..653e3e2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,6 +15,3 @@ warn_redundant_casts = True warn_return_any = False warn_unused_configs = True warn_unreachable = True - -[mypy-libp2p.kademlia.*] -ignore_errors = True diff --git a/tests/host/test_routed_host.py b/tests/host/test_routed_host.py index 9083d3f..271246c 100644 --- a/tests/host/test_routed_host.py +++ b/tests/host/test_routed_host.py @@ -4,71 +4,30 @@ import pytest from libp2p.host.exceptions import ConnectionFailure from libp2p.peer.peerinfo import PeerInfo -from libp2p.routing.kademlia.kademlia_peer_router import peer_info_to_str -from libp2p.tools.utils import ( - set_up_nodes_by_transport_and_disc_opt, - set_up_nodes_by_transport_opt, - set_up_routers, -) +from libp2p.tools.utils import set_up_nodes_by_transport_opt, set_up_routed_hosts @pytest.mark.asyncio async def test_host_routing_success(): - routers = await set_up_routers() - transports = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] - transport_disc_opt_list = zip(transports, routers) - (host_a, host_b) = await set_up_nodes_by_transport_and_disc_opt( - transport_disc_opt_list - ) - - # Set routing info - await routers[0].server.set( - host_a.get_id().xor_id, - peer_info_to_str(PeerInfo(host_a.get_id(), host_a.get_addrs())), - ) - await routers[1].server.set( - host_b.get_id().xor_id, - peer_info_to_str(PeerInfo(host_b.get_id(), host_b.get_addrs())), - ) - + host_a, host_b = await set_up_routed_hosts() # forces to use routing as no addrs are provided await host_a.connect(PeerInfo(host_b.get_id(), [])) await host_b.connect(PeerInfo(host_a.get_id(), [])) # Clean up await asyncio.gather(*[host_a.close(), host_b.close()]) - routers[0].server.stop() - routers[1].server.stop() @pytest.mark.asyncio async def test_host_routing_fail(): - routers = await set_up_routers() - transports = [["/ip4/127.0.0.1/tcp/0"], ["/ip4/127.0.0.1/tcp/0"]] - transport_disc_opt_list = zip(transports, routers) - (host_a, host_b) = await set_up_nodes_by_transport_and_disc_opt( - transport_disc_opt_list - ) - - host_c = (await set_up_nodes_by_transport_opt([["/ip4/127.0.0.1/tcp/0"]]))[0] - - # Set routing info - await routers[0].server.set( - host_a.get_id().xor_id, - peer_info_to_str(PeerInfo(host_a.get_id(), host_a.get_addrs())), - ) - await routers[1].server.set( - host_b.get_id().xor_id, - peer_info_to_str(PeerInfo(host_b.get_id(), host_b.get_addrs())), - ) + host_a, host_b = await set_up_routed_hosts() + basic_host_c = (await set_up_nodes_by_transport_opt([["/ip4/127.0.0.1/tcp/0"]]))[0] # routing fails because host_c does not use routing with pytest.raises(ConnectionFailure): - await host_a.connect(PeerInfo(host_c.get_id(), [])) + await host_a.connect(PeerInfo(basic_host_c.get_id(), [])) with pytest.raises(ConnectionFailure): - await host_b.connect(PeerInfo(host_c.get_id(), [])) + await host_b.connect(PeerInfo(basic_host_c.get_id(), [])) # Clean up - await asyncio.gather(*[host_a.close(), host_b.close(), host_c.close()]) - routers[0].server.stop() - routers[1].server.stop() + await asyncio.gather(*[host_a.close(), host_b.close(), basic_host_c.close()]) diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py deleted file mode 100644 index 655d471..0000000 --- a/tests/kademlia/test_basic.py +++ /dev/null @@ -1,79 +0,0 @@ -import pytest - -from libp2p.kademlia.network import KademliaServer - - -@pytest.mark.asyncio -async def test_example(): - node_a = KademliaServer() - await node_a.listen() - - node_b = KademliaServer() - await node_b.listen() - - # Bootstrap the node by connecting to other known nodes, in this case - # replace 123.123.123.123 with the IP of another node and optionally - # give as many ip/port combos as you can for other nodes. - await node_b.bootstrap([node_a.address]) - - # set a value for the key "my-key" on the network - value = "my-value" - key = "my-key" - await node_b.set(key, value) - - # get the value associated with "my-key" from the network - assert await node_b.get(key) == value - assert await node_a.get(key) == value - - -@pytest.mark.parametrize("nodes_nr", [(2 ** i) for i in range(2, 5)]) -@pytest.mark.asyncio -async def test_multiple_nodes_bootstrap_set_get(nodes_nr): - - node_bootstrap = KademliaServer() - await node_bootstrap.listen(3000 + nodes_nr * 2) - - nodes = [] - for i in range(nodes_nr): - node = KademliaServer() - addrs = [("127.0.0.1", 3000 + nodes_nr * 2)] - await node.listen(3001 + i + nodes_nr * 2) - await node.bootstrap(addrs) - nodes.append(node) - - for i, node in enumerate(nodes): - # set a value for the key "my-key" on the network - value = "my awesome value %d" % i - key = "set from %d" % i - await node.set(key, value) - - for i in range(nodes_nr): - for node in nodes: - value = "my awesome value %d" % i - key = "set from %d" % i - assert await node.get(key) == value - - -@pytest.mark.parametrize("nodes_nr", [(2 ** i) for i in range(2, 5)]) -@pytest.mark.asyncio -async def test_multiple_nodes_set_bootstrap_get(nodes_nr): - node_bootstrap = KademliaServer() - await node_bootstrap.listen(2000 + nodes_nr * 2) - - nodes = [] - for i in range(nodes_nr): - node = KademliaServer() - addrs = [("127.0.0.1", 2000 + nodes_nr * 2)] - await node.listen(2001 + i + nodes_nr * 2) - await node.bootstrap(addrs) - - value = "my awesome value %d" % i - key = "set from %d" % i - await node.set(key, value) - nodes.append(node) - - for i in range(nodes_nr): - for node in nodes: - value = "my awesome value %d" % i - key = "set from %d" % i - assert await node.get(key) == value diff --git a/tests/kademlia/test_providers.py b/tests/kademlia/test_providers.py deleted file mode 100644 index 45993d9..0000000 --- a/tests/kademlia/test_providers.py +++ /dev/null @@ -1,30 +0,0 @@ -import pytest - -from libp2p.kademlia.network import KademliaServer - - -@pytest.mark.asyncio -async def test_example(): - node_a = KademliaServer() - await node_a.listen() - - node_b = KademliaServer() - await node_b.listen() - await node_b.bootstrap([node_a.address]) - - key = "hello" - value = "world" - await node_b.set(key, value) - await node_b.provide("hello") - - providers = await node_b.get_providers("hello") - - # bmuller's handle_call_response wraps - # every rpc call result in a list of tuples - # [(True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F'])] - first_tuple = providers[0] - # (True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F']) - first_providers = first_tuple[1] - # [b'\xf9\xa1\xf5\x10a\xe5\xe0F'] - first_provider = first_providers[0] - assert node_b.node.peer_id_bytes == first_provider diff --git a/tests/routing/test_kad_peer_router.py b/tests/routing/test_kad_peer_router.py deleted file mode 100644 index d3f3836..0000000 --- a/tests/routing/test_kad_peer_router.py +++ /dev/null @@ -1,75 +0,0 @@ -import pytest - -from libp2p.kademlia.network import KademliaServer -from libp2p.peer.id import ID -from libp2p.routing.kademlia.kademlia_peer_router import ( - KadmeliaPeerRouter, - peer_info_to_str, -) - - -@pytest.mark.asyncio -async def test_simple_two_nodes(): - node_a = KademliaServer() - await node_a.listen(5678) - - node_b = KademliaServer() - await node_b.listen(5679) - - node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)]) - node_a_kad_peerinfo = node_a_value[0] - await node_a.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo)) - - router = KadmeliaPeerRouter(node_b) - returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes)) - assert returned_info == node_a_kad_peerinfo - - -@pytest.mark.asyncio -async def test_simple_three_nodes(): - node_a = KademliaServer() - await node_a.listen(5701) - - node_b = KademliaServer() - await node_b.listen(5702) - - node_c = KademliaServer() - await node_c.listen(5703) - - node_a_value = await node_b.bootstrap([("127.0.0.1", 5701)]) - node_a_kad_peerinfo = node_a_value[0] - - await node_c.bootstrap([("127.0.0.1", 5702)]) - await node_a.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo)) - - router = KadmeliaPeerRouter(node_c) - returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes)) - assert returned_info == node_a_kad_peerinfo - - -@pytest.mark.asyncio -async def test_simple_four_nodes(): - node_a = KademliaServer() - await node_a.listen(5801) - - node_b = KademliaServer() - await node_b.listen(5802) - - node_c = KademliaServer() - await node_c.listen(5803) - - node_d = KademliaServer() - await node_d.listen(5804) - - node_a_value = await node_b.bootstrap([("127.0.0.1", 5801)]) - node_a_kad_peerinfo = node_a_value[0] - - await node_c.bootstrap([("127.0.0.1", 5802)]) - - await node_d.bootstrap([("127.0.0.1", 5803)]) - - await node_b.set(node_a_kad_peerinfo.xor_id, peer_info_to_str(node_a_kad_peerinfo)) - - router = KadmeliaPeerRouter(node_d) - returned_info = await router.find_peer(ID(node_a_kad_peerinfo.peer_id_bytes)) - assert returned_info == node_a_kad_peerinfo