From c24a279d2d83575ecb0d02c03a1fcfbd742601c8 Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Pereira Martins Date: Tue, 15 Jan 2019 18:41:41 +0100 Subject: [PATCH 01/10] update kadmelia lib --- libp2p/kademlia/__init__.py | 2 +- libp2p/kademlia/crawling.py | 85 ++++++++++++++++--------------- libp2p/kademlia/network.py | 99 ++++++++++++++++++------------------- libp2p/kademlia/node.py | 38 +++++++------- libp2p/kademlia/protocol.py | 98 ++++++++++++++++++------------------ libp2p/kademlia/routing.py | 95 +++++++++++++++++------------------ libp2p/kademlia/storage.py | 33 ++++++------- libp2p/kademlia/utils.py | 18 +++---- 8 files changed, 233 insertions(+), 235 deletions(-) diff --git a/libp2p/kademlia/__init__.py b/libp2p/kademlia/__init__.py index 7999eca..bce7672 100644 --- a/libp2p/kademlia/__init__.py +++ b/libp2p/kademlia/__init__.py @@ -2,4 +2,4 @@ Kademlia is a Python implementation of the Kademlia protocol which utilizes the asyncio library. """ -__version__ = "1.1" +__version__ = "2.0" diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index fc5d918..6dabd10 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -1,16 +1,19 @@ from collections import Counter import logging -from .kademlia.node import Node, NodeHeap -from .kademlia.utils import gather_dict - -log = logging.getLogger(__name__) +from .node import Node, NodeHeap +from .utils import gather_dict +log = logging.getLogger(__name__) # pylint: disable=invalid-name + + +# pylint: disable=too-few-public-methods class SpiderCrawl: """ Crawl the network and look for given 160-bit keys. """ + def __init__(self, protocol, node, peers, ksize, alpha): """ Create a new C{SpiderCrawl}er. @@ -29,7 +32,7 @@ class SpiderCrawl: self.alpha = alpha self.node = node self.nearest = NodeHeap(self.node, self.ksize) - self.lastIDsCrawled = [] + self.last_ids_crawled = [] log.info("creating spider with peers: %s", peers) self.nearest.push(peers) @@ -38,7 +41,7 @@ class SpiderCrawl: Get either a value or list of nodes. Args: - rpcmethod: The protocol's callfindValue or callFindNode. + rpcmethod: The protocol's callfindValue or call_find_node. The process: 1. calls find_* to current ALPHA nearest not already queried nodes, @@ -51,18 +54,18 @@ class SpiderCrawl: """ log.info("crawling network with nearest: %s", str(tuple(self.nearest))) count = self.alpha - if self.nearest.getIDs() == self.lastIDsCrawled: + if self.nearest.get_ids() == self.last_ids_crawled: count = len(self.nearest) - self.lastIDsCrawled = self.nearest.getIDs() + self.last_ids_crawled = self.nearest.get_ids() - ds = {} - for peer in self.nearest.getUncontacted()[:count]: - ds[peer.id] = rpcmethod(peer, self.node) - self.nearest.markContacted(peer) - found = await gather_dict(ds) - return await self._nodesFound(found) + dicts = {} + for peer in self.nearest.get_uncontacted()[:count]: + dicts[peer.id] = rpcmethod(peer, self.node) + self.nearest.mark_contacted(peer) + found = await gather_dict(dicts) + return await self._nodes_found(found) - async def _nodesFound(self, responses): + async def _nodes_found(self, responses): raise NotImplementedError @@ -71,55 +74,55 @@ class ValueSpiderCrawl(SpiderCrawl): SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) # keep track of the single nearest node without value - per # section 2.3 so we can set the key there if found - self.nearestWithoutValue = NodeHeap(self.node, 1) + self.nearest_without_value = NodeHeap(self.node, 1) async def find(self): """ Find either the closest nodes or the value requested. """ - return await self._find(self.protocol.callFindValue) + return await self._find(self.protocol.call_find_value) - async def _nodesFound(self, responses): + async def _nodes_found(self, responses): """ Handle the result of an iteration in _find. """ toremove = [] - foundValues = [] + found_values = [] for peerid, response in responses.items(): response = RPCFindResponse(response) if not response.happened(): toremove.append(peerid) - elif response.hasValue(): - foundValues.append(response.getValue()) + elif response.has_value(): + found_values.append(response.get_value()) else: - peer = self.nearest.getNodeById(peerid) - self.nearestWithoutValue.push(peer) - self.nearest.push(response.getNodeList()) + peer = self.nearest.get_node(peerid) + self.nearest_without_value.push(peer) + self.nearest.push(response.get_node_list()) self.nearest.remove(toremove) - if len(foundValues) > 0: - return await self._handleFoundValues(foundValues) - if self.nearest.allBeenContacted(): + if found_values: + return await self._handle_found_values(found_values) + if self.nearest.have_contacted_all(): # not found! return None return await self.find() - async def _handleFoundValues(self, values): + async def _handle_found_values(self, values): """ We got some values! Exciting. But let's make sure they're all the same or freak out a little bit. Also, make sure we tell the nearest node that *didn't* have the value to store it. """ - valueCounts = Counter(values) - if len(valueCounts) != 1: + value_counts = Counter(values) + if len(value_counts) != 1: log.warning("Got multiple values for key %i: %s", self.node.long_id, str(values)) - value = valueCounts.most_common(1)[0][0] + value = value_counts.most_common(1)[0][0] - peerToSaveTo = self.nearestWithoutValue.popleft() - if peerToSaveTo is not None: - await self.protocol.callStore(peerToSaveTo, self.node.id, value) + peer = self.nearest_without_value.popleft() + if peer: + await self.protocol.call_store(peer, self.node.id, value) return value @@ -128,9 +131,9 @@ class NodeSpiderCrawl(SpiderCrawl): """ Find the closest nodes. """ - return await self._find(self.protocol.callFindNode) + return await self._find(self.protocol.call_find_node) - async def _nodesFound(self, responses): + async def _nodes_found(self, responses): """ Handle the result of an iteration in _find. """ @@ -140,10 +143,10 @@ class NodeSpiderCrawl(SpiderCrawl): if not response.happened(): toremove.append(peerid) else: - self.nearest.push(response.getNodeList()) + self.nearest.push(response.get_node_list()) self.nearest.remove(toremove) - if self.nearest.allBeenContacted(): + if self.nearest.have_contacted_all(): return list(self.nearest) return await self.find() @@ -166,13 +169,13 @@ class RPCFindResponse: """ return self.response[0] - def hasValue(self): + def has_value(self): return isinstance(self.response[1], dict) - def getValue(self): + def get_value(self): return self.response[1]['value'] - def getNodeList(self): + def get_node_list(self): """ Get the node list in the response. If there's no value, this should be set. diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index fb433db..bbfbad6 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -6,16 +6,17 @@ import pickle import asyncio import logging -from .kademlia.protocol import KademliaProtocol -from .kademlia.utils import digest -from .kademlia.storage import ForgetfulStorage -from .kademlia.node import Node -from .kademlia.crawling import ValueSpiderCrawl -from .kademlia.crawling import NodeSpiderCrawl +from .protocol import KademliaProtocol +from .utils import digest +from .storage import ForgetfulStorage +from .node import Node +from .crawling import ValueSpiderCrawl +from .crawling import NodeSpiderCrawl -log = logging.getLogger(__name__) +log = logging.getLogger(__name__) # pylint: disable=invalid-name +# pylint: disable=too-many-instance-attributes class Server: """ High level view of a node instance. This is the object that should be @@ -57,7 +58,7 @@ class Server: def _create_protocol(self): return self.protocol_class(self.node, self.storage, self.ksize) - def listen(self, port, interface='0.0.0.0'): + async def listen(self, port, interface='0.0.0.0'): """ Start listening on the given port. @@ -68,7 +69,7 @@ class Server: local_addr=(interface, port)) log.info("Node %i listening on %s:%i", self.node.long_id, interface, port) - self.transport, self.protocol = loop.run_until_complete(listen) + self.transport, self.protocol = await listen # finally, schedule refreshing table self.refresh_table() @@ -83,22 +84,22 @@ class Server: Refresh buckets that haven't had any lookups in the last hour (per section 2.3 of the paper). """ - ds = [] - for node_id in self.protocol.getRefreshIDs(): + results = [] + for node_id in self.protocol.get_refresh_ids(): node = Node(node_id) - nearest = self.protocol.router.findNeighbors(node, self.alpha) + nearest = self.protocol.router.find_neighbors(node, self.alpha) spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - ds.append(spider.find()) + results.append(spider.find()) # do our crawling - await asyncio.gather(*ds) + await asyncio.gather(*results) # now republish keys older than one hour - for dkey, value in self.storage.iteritemsOlderThan(3600): + for dkey, value in self.storage.iter_older_than(3600): await self.set_digest(dkey, value) - def bootstrappableNeighbors(self): + def bootstrappable_neighbors(self): """ Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for use as an argument to the bootstrap method. @@ -108,7 +109,7 @@ class Server: storing them if this server is going down for a while. When it comes back up, the list of nodes can be used to bootstrap. """ - neighbors = self.protocol.router.findNeighbors(self.node) + neighbors = self.protocol.router.find_neighbors(self.node) return [tuple(n)[-2:] for n in neighbors] async def bootstrap(self, addrs): @@ -145,8 +146,8 @@ class Server: if self.storage.get(dkey) is not None: return self.storage.get(dkey) node = Node(dkey) - nearest = self.protocol.router.findNeighbors(node) - if len(nearest) == 0: + nearest = self.protocol.router.find_neighbors(node) + if not nearest: log.warning("There are no known neighbors to get key %s", key) return None spider = ValueSpiderCrawl(self.protocol, node, nearest, @@ -172,8 +173,8 @@ class Server: """ node = Node(dkey) - nearest = self.protocol.router.findNeighbors(node) - if len(nearest) == 0: + nearest = self.protocol.router.find_neighbors(node) + if not nearest: log.warning("There are no known neighbors to set key %s", dkey.hex()) return False @@ -184,14 +185,14 @@ class Server: log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes))) # if this node is close too, then store here as well - biggest = max([n.distanceTo(node) for n in nodes]) - if self.node.distanceTo(node) < biggest: + biggest = max([n.distance_to(node) for n in nodes]) + if self.node.distance_to(node) < biggest: self.storage[dkey] = value - ds = [self.protocol.callStore(n, dkey, value) for n in nodes] + results = [self.protocol.call_store(n, dkey, value) for n in nodes] # return true only if at least one store call succeeded - return any(await asyncio.gather(*ds)) + return any(await asyncio.gather(*results)) - def saveState(self, fname): + def save_state(self, fname): """ Save the state of this node (the alpha/ksize/id/immediate neighbors) to a cache file with the given fname. @@ -201,29 +202,29 @@ class Server: 'ksize': self.ksize, 'alpha': self.alpha, 'id': self.node.id, - 'neighbors': self.bootstrappableNeighbors() + 'neighbors': self.bootstrappable_neighbors() } - if len(data['neighbors']) == 0: + if not data['neighbors']: log.warning("No known neighbors, so not writing to cache.") return - with open(fname, 'wb') as f: - pickle.dump(data, f) + with open(fname, 'wb') as file: + pickle.dump(data, file) @classmethod - def loadState(self, fname): + def load_state(cls, fname): """ Load the state of this node (the alpha/ksize/id/immediate neighbors) from a cache file with the given fname. """ log.info("Loading state from %s", fname) - with open(fname, 'rb') as f: - data = pickle.load(f) - s = Server(data['ksize'], data['alpha'], data['id']) - if len(data['neighbors']) > 0: - s.bootstrap(data['neighbors']) - return s + with open(fname, 'rb') as file: + data = pickle.load(file) + svr = Server(data['ksize'], data['alpha'], data['id']) + if data['neighbors']: + svr.bootstrap(data['neighbors']) + return svr - def saveStateRegularly(self, fname, frequency=600): + def save_state_regularly(self, fname, frequency=600): """ Save the state of node with a given regularity to the given filename. @@ -233,10 +234,10 @@ class Server: frequency: Frequency in seconds that the state should be saved. By default, 10 minutes. """ - self.saveState(fname) + self.save_state(fname) loop = asyncio.get_event_loop() self.save_state_loop = loop.call_later(frequency, - self.saveStateRegularly, + self.save_state_regularly, fname, frequency) @@ -246,13 +247,11 @@ def check_dht_value_type(value): Checks to see if the type of the value is a valid type for placing in the dht. """ - typeset = set( - [ - int, - float, - bool, - str, - bytes, - ] - ) - return type(value) in typeset + typeset = [ + int, + float, + bool, + str, + bytes + ] + return type(value) in typeset # pylint: disable=unidiomatic-typecheck diff --git a/libp2p/kademlia/node.py b/libp2p/kademlia/node.py index f130dcc..2f087f5 100644 --- a/libp2p/kademlia/node.py +++ b/libp2p/kademlia/node.py @@ -4,15 +4,15 @@ import heapq class Node: def __init__(self, node_id, ip=None, port=None): - self.id = node_id - self.ip = ip + self.id = node_id # pylint: disable=invalid-name + self.ip = ip # pylint: disable=invalid-name self.port = port self.long_id = int(node_id.hex(), 16) - def sameHomeAs(self, node): + def same_home_as(self, node): return self.ip == node.ip and self.port == node.port - def distanceTo(self, node): + def distance_to(self, node): """ Get the distance between this node and another. """ @@ -47,7 +47,7 @@ class NodeHeap: self.contacted = set() self.maxsize = maxsize - def remove(self, peerIDs): + def remove(self, peers): """ Remove a list of peer ids from this heap. Note that while this heap retains a constant visible size (based on the iterator), it's @@ -55,34 +55,32 @@ class NodeHeap: removal of nodes may not change the visible size as previously added nodes suddenly become visible. """ - peerIDs = set(peerIDs) - if len(peerIDs) == 0: + peers = set(peers) + if not peers: return nheap = [] for distance, node in self.heap: - if node.id not in peerIDs: + if node.id not in peers: heapq.heappush(nheap, (distance, node)) self.heap = nheap - def getNodeById(self, node_id): + def get_node(self, node_id): for _, node in self.heap: if node.id == node_id: return node return None - def allBeenContacted(self): - return len(self.getUncontacted()) == 0 + def have_contacted_all(self): + return len(self.get_uncontacted()) == 0 - def getIDs(self): + def get_ids(self): return [n.id for n in self] - def markContacted(self, node): + def mark_contacted(self, node): self.contacted.add(node.id) def popleft(self): - if len(self) > 0: - return heapq.heappop(self.heap)[1] - return None + return heapq.heappop(self.heap)[1] if self else None def push(self, nodes): """ @@ -95,7 +93,7 @@ class NodeHeap: for node in nodes: if node not in self: - distance = self.node.distanceTo(node) + distance = self.node.distance_to(node) heapq.heappush(self.heap, (distance, node)) def __len__(self): @@ -106,10 +104,10 @@ class NodeHeap: return iter(map(itemgetter(1), nodes)) def __contains__(self, node): - for _, n in self.heap: - if node.id == n.id: + for _, other in self.heap: + if node.id == other.id: return True return False - def getUncontacted(self): + def get_uncontacted(self): return [n for n in self if n.id not in self.contacted] diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 2bcf276..3573983 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -4,41 +4,41 @@ import logging from rpcudp.protocol import RPCProtocol -from .kademlia.node import Node -from .kademlia.routing import RoutingTable -from .kademlia.utils import digest +from .node import Node +from .routing import RoutingTable +from .utils import digest -log = logging.getLogger(__name__) +log = logging.getLogger(__name__) # pylint: disable=invalid-name class KademliaProtocol(RPCProtocol): - def __init__(self, sourceNode, storage, ksize): + def __init__(self, source_node, storage, ksize): RPCProtocol.__init__(self) - self.router = RoutingTable(self, ksize, sourceNode) + self.router = RoutingTable(self, ksize, source_node) self.storage = storage - self.sourceNode = sourceNode + self.source_node = source_node - def getRefreshIDs(self): + def get_refresh_ids(self): """ Get ids to search for to keep old buckets up to date. """ ids = [] - for bucket in self.router.getLonelyBuckets(): + for bucket in self.router.lonely_buckets(): rid = random.randint(*bucket.range).to_bytes(20, byteorder='big') ids.append(rid) return ids - def rpc_stun(self, sender): + def rpc_stun(self, sender): # pylint: disable=no-self-use return sender def rpc_ping(self, sender, nodeid): source = Node(nodeid, sender[0], sender[1]) - self.welcomeIfNewNode(source) - return self.sourceNode.id + self.welcome_if_new(source) + return self.source_node.id def rpc_store(self, sender, nodeid, key, value): source = Node(nodeid, sender[0], sender[1]) - self.welcomeIfNewNode(source) + self.welcome_if_new(source) log.debug("got a store request from %s, storing '%s'='%s'", sender, key.hex(), value) self.storage[key] = value @@ -48,42 +48,42 @@ class KademliaProtocol(RPCProtocol): log.info("finding neighbors of %i in local table", int(nodeid.hex(), 16)) source = Node(nodeid, sender[0], sender[1]) - self.welcomeIfNewNode(source) + self.welcome_if_new(source) node = Node(key) - neighbors = self.router.findNeighbors(node, exclude=source) + neighbors = self.router.find_neighbors(node, exclude=source) return list(map(tuple, neighbors)) def rpc_find_value(self, sender, nodeid, key): source = Node(nodeid, sender[0], sender[1]) - self.welcomeIfNewNode(source) + self.welcome_if_new(source) value = self.storage.get(key, None) if value is None: return self.rpc_find_node(sender, nodeid, key) return {'value': value} - async def callFindNode(self, nodeToAsk, nodeToFind): - address = (nodeToAsk.ip, nodeToAsk.port) - result = await self.find_node(address, self.sourceNode.id, - nodeToFind.id) - return self.handleCallResponse(result, nodeToAsk) + async def call_find_node(self, node_to_ask, node_to_find): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.find_node(address, self.source_node.id, + node_to_find.id) + return self.handle_call_response(result, node_to_ask) - async def callFindValue(self, nodeToAsk, nodeToFind): - address = (nodeToAsk.ip, nodeToAsk.port) - result = await self.find_value(address, self.sourceNode.id, - nodeToFind.id) - return self.handleCallResponse(result, nodeToAsk) + async def call_find_value(self, node_to_ask, node_to_find): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.find_value(address, self.source_node.id, + node_to_find.id) + return self.handle_call_response(result, node_to_ask) - async def callPing(self, nodeToAsk): - address = (nodeToAsk.ip, nodeToAsk.port) - result = await self.ping(address, self.sourceNode.id) - return self.handleCallResponse(result, nodeToAsk) + async def call_ping(self, node_to_ask): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.ping(address, self.source_node.id) + return self.handle_call_response(result, node_to_ask) - async def callStore(self, nodeToAsk, key, value): - address = (nodeToAsk.ip, nodeToAsk.port) - result = await self.store(address, self.sourceNode.id, key, value) - return self.handleCallResponse(result, nodeToAsk) + async def call_store(self, node_to_ask, key, value): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.store(address, self.source_node.id, key, value) + return self.handle_call_response(result, node_to_ask) - def welcomeIfNewNode(self, node): + def welcome_if_new(self, node): """ Given a new node, send it all the keys/values it should be storing, then add it to the routing table. @@ -97,32 +97,32 @@ class KademliaProtocol(RPCProtocol): is closer than the closest in that list, then store the key/value on the new node (per section 2.5 of the paper) """ - if not self.router.isNewNode(node): + if not self.router.is_new_node(node): return log.info("never seen %s before, adding to router", node) - for key, value in self.storage.items(): + for key, value in self.storage: keynode = Node(digest(key)) - neighbors = self.router.findNeighbors(keynode) - if len(neighbors) > 0: - last = neighbors[-1].distanceTo(keynode) - newNodeClose = node.distanceTo(keynode) < last - first = neighbors[0].distanceTo(keynode) - thisNodeClosest = self.sourceNode.distanceTo(keynode) < first - if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): - asyncio.ensure_future(self.callStore(node, key, value)) - self.router.addContact(node) + neighbors = self.router.find_neighbors(keynode) + if neighbors: + last = neighbors[-1].distance_to(keynode) + new_node_close = node.distance_to(keynode) < last + first = neighbors[0].distance_to(keynode) + this_closest = self.source_node.distance_to(keynode) < first + if not neighbors or (new_node_close and this_closest): + asyncio.ensure_future(self.call_store(node, key, value)) + self.router.add_contact(node) - def handleCallResponse(self, result, node): + def handle_call_response(self, result, node): """ If we get a response, add the node to the routing table. If we get no response, make sure it's removed from the routing table. """ if not result[0]: log.warning("no response from %s, removing from router", node) - self.router.removeContact(node) + self.router.remove_contact(node) return result log.info("got successful response from %s", node) - self.welcomeIfNewNode(node) + self.welcome_if_new(node) return result diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index 67aad00..2ffd525 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -4,22 +4,21 @@ import operator import asyncio from collections import OrderedDict - -from .kademlia.utils import OrderedSet, sharedPrefix, bytesToBitString +from .utils import OrderedSet, shared_prefix, bytes_to_bit_string class KBucket: def __init__(self, rangeLower, rangeUpper, ksize): self.range = (rangeLower, rangeUpper) self.nodes = OrderedDict() - self.replacementNodes = OrderedSet() - self.touchLastUpdated() + self.replacement_nodes = OrderedSet() + self.touch_last_updated() self.ksize = ksize - def touchLastUpdated(self): - self.lastUpdated = time.monotonic() + def touch_last_updated(self): + self.last_updated = time.monotonic() - def getNodes(self): + def get_nodes(self): return list(self.nodes.values()) def split(self): @@ -31,23 +30,23 @@ class KBucket: bucket.nodes[node.id] = node return (one, two) - def removeNode(self, node): + def remove_node(self, node): if node.id not in self.nodes: return # delete node, and see if we can add a replacement del self.nodes[node.id] - if len(self.replacementNodes) > 0: - newnode = self.replacementNodes.pop() + if self.replacement_nodes: + newnode = self.replacement_nodes.pop() self.nodes[newnode.id] = newnode - def hasInRange(self, node): + def has_in_range(self, node): return self.range[0] <= node.long_id <= self.range[1] - def isNewNode(self, node): + def is_new_node(self, node): return node.id not in self.nodes - def addNode(self, node): + def add_node(self, node): """ Add a C{Node} to the C{KBucket}. Return True if successful, False if the bucket is full. @@ -61,14 +60,14 @@ class KBucket: elif len(self) < self.ksize: self.nodes[node.id] = node else: - self.replacementNodes.push(node) + self.replacement_nodes.push(node) return False return True def depth(self): vals = self.nodes.values() - sp = sharedPrefix([bytesToBitString(n.id) for n in vals]) - return len(sp) + sprefix = shared_prefix([bytes_to_bit_string(n.id) for n in vals]) + return len(sprefix) def head(self): return list(self.nodes.values())[0] @@ -82,11 +81,11 @@ class KBucket: class TableTraverser: def __init__(self, table, startNode): - index = table.getBucketFor(startNode) - table.buckets[index].touchLastUpdated() - self.currentNodes = table.buckets[index].getNodes() - self.leftBuckets = table.buckets[:index] - self.rightBuckets = table.buckets[(index + 1):] + index = table.get_bucket_for(startNode) + table.buckets[index].touch_last_updated() + self.current_nodes = table.buckets[index].get_nodes() + self.left_buckets = table.buckets[:index] + self.right_buckets = table.buckets[(index + 1):] self.left = True def __iter__(self): @@ -96,16 +95,16 @@ class TableTraverser: """ Pop an item from the left subtree, then right, then left, etc. """ - if len(self.currentNodes) > 0: - return self.currentNodes.pop() + if self.current_nodes: + return self.current_nodes.pop() - if self.left and len(self.leftBuckets) > 0: - self.currentNodes = self.leftBuckets.pop().getNodes() + if self.left and self.left_buckets: + self.current_nodes = self.left_buckets.pop().get_nodes() self.left = False return next(self) - if len(self.rightBuckets) > 0: - self.currentNodes = self.rightBuckets.pop(0).getNodes() + if self.right_buckets: + self.current_nodes = self.right_buckets.pop(0).get_nodes() self.left = True return next(self) @@ -127,58 +126,60 @@ class RoutingTable: def flush(self): self.buckets = [KBucket(0, 2 ** 160, self.ksize)] - def splitBucket(self, index): + def split_bucket(self, index): one, two = self.buckets[index].split() self.buckets[index] = one self.buckets.insert(index + 1, two) - def getLonelyBuckets(self): + def lonely_buckets(self): """ Get all of the buckets that haven't been updated in over an hour. """ hrago = time.monotonic() - 3600 - return [b for b in self.buckets if b.lastUpdated < hrago] + return [b for b in self.buckets if b.last_updated < hrago] - def removeContact(self, node): - index = self.getBucketFor(node) - self.buckets[index].removeNode(node) + def remove_contact(self, node): + index = self.get_bucket_for(node) + self.buckets[index].remove_node(node) - def isNewNode(self, node): - index = self.getBucketFor(node) - return self.buckets[index].isNewNode(node) + def is_new_node(self, node): + index = self.get_bucket_for(node) + return self.buckets[index].is_new_node(node) - def addContact(self, node): - index = self.getBucketFor(node) + def add_contact(self, node): + index = self.get_bucket_for(node) bucket = self.buckets[index] # this will succeed unless the bucket is full - if bucket.addNode(node): + if bucket.add_node(node): return # Per section 4.2 of paper, split if the bucket has the node # in its range or if the depth is not congruent to 0 mod 5 - if bucket.hasInRange(self.node) or bucket.depth() % 5 != 0: - self.splitBucket(index) - self.addContact(node) + if bucket.has_in_range(self.node) or bucket.depth() % 5 != 0: + self.split_bucket(index) + self.add_contact(node) else: - asyncio.ensure_future(self.protocol.callPing(bucket.head())) + asyncio.ensure_future(self.protocol.call_ping(bucket.head())) - def getBucketFor(self, node): + def get_bucket_for(self, node): """ Get the index of the bucket that the given node would fall into. """ for index, bucket in enumerate(self.buckets): if node.long_id < bucket.range[1]: return index + # we should never be here, but make linter happy + return None - def findNeighbors(self, node, k=None, exclude=None): + def find_neighbors(self, node, k=None, exclude=None): k = k or self.ksize nodes = [] for neighbor in TableTraverser(self, node): - notexcluded = exclude is None or not neighbor.sameHomeAs(exclude) + notexcluded = exclude is None or not neighbor.same_home_as(exclude) if neighbor.id != node.id and notexcluded: - heapq.heappush(nodes, (node.distanceTo(neighbor), neighbor)) + heapq.heappush(nodes, (node.distance_to(neighbor), neighbor)) if len(nodes) == k: break diff --git a/libp2p/kademlia/storage.py b/libp2p/kademlia/storage.py index 71865f2..1aa1ac7 100644 --- a/libp2p/kademlia/storage.py +++ b/libp2p/kademlia/storage.py @@ -2,44 +2,45 @@ import time from itertools import takewhile import operator from collections import OrderedDict +from abc import abstractmethod, ABC -class IStorage: +class IStorage(ABC): """ Local storage for this node. IStorage implementations of get must return the same type as put in by set """ + @abstractmethod def __setitem__(self, key, value): """ Set a key to the given value. """ - raise NotImplementedError + @abstractmethod def __getitem__(self, key): """ Get the given key. If item doesn't exist, raises C{KeyError} """ - raise NotImplementedError + @abstractmethod def get(self, key, default=None): """ Get given key. If not found, return default. """ - raise NotImplementedError - def iteritemsOlderThan(self, secondsOld): + @abstractmethod + def iter_older_than(self, seconds_old): """ Return the an iterator over (key, value) tuples for items older than the given secondsOld. """ - raise NotImplementedError + @abstractmethod def __iter__(self): """ Get the iterator for this storage, should yield tuple of (key, value) """ - raise NotImplementedError class ForgetfulStorage(IStorage): @@ -57,7 +58,7 @@ class ForgetfulStorage(IStorage): self.cull() def cull(self): - for _, _ in self.iteritemsOlderThan(self.ttl): + for _, _ in self.iter_older_than(self.ttl): self.data.popitem(last=False) def get(self, key, default=None): @@ -70,27 +71,23 @@ class ForgetfulStorage(IStorage): self.cull() return self.data[key][1] - def __iter__(self): - self.cull() - return iter(self.data) - def __repr__(self): self.cull() return repr(self.data) - def iteritemsOlderThan(self, secondsOld): - minBirthday = time.monotonic() - secondsOld - zipped = self._tripleIterable() - matches = takewhile(lambda r: minBirthday >= r[1], zipped) + def iter_older_than(self, seconds_old): + min_birthday = time.monotonic() - seconds_old + zipped = self._triple_iter() + matches = takewhile(lambda r: min_birthday >= r[1], zipped) return list(map(operator.itemgetter(0, 2), matches)) - def _tripleIterable(self): + def _triple_iter(self): ikeys = self.data.keys() ibirthday = map(operator.itemgetter(0), self.data.values()) ivalues = map(operator.itemgetter(1), self.data.values()) return zip(ikeys, ibirthday, ivalues) - def items(self): + def __iter__(self): self.cull() ikeys = self.data.keys() ivalues = map(operator.itemgetter(1), self.data.values()) diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index 9732bf5..da90085 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -6,16 +6,16 @@ import operator import asyncio -async def gather_dict(d): - cors = list(d.values()) +async def gather_dict(dic): + cors = list(dic.values()) results = await asyncio.gather(*cors) - return dict(zip(d.keys(), results)) + return dict(zip(dic.keys(), results)) -def digest(s): - if not isinstance(s, bytes): - s = str(s).encode('utf8') - return hashlib.sha1(s).digest() +def digest(string): + if not isinstance(string, bytes): + string = str(string).encode('utf8') + return hashlib.sha1(string).digest() class OrderedSet(list): @@ -34,7 +34,7 @@ class OrderedSet(list): self.append(thing) -def sharedPrefix(args): +def shared_prefix(args): """ Find the shared prefix between the strings. @@ -52,6 +52,6 @@ def sharedPrefix(args): return args[0][:i] -def bytesToBitString(bites): +def bytes_to_bit_string(bites): bits = [bin(bite)[2:].rjust(8, '0') for bite in bites] return "".join(bits) From 4889a0a790c61e0925d78169024756b75dfb190f Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Pereira Martins Date: Tue, 15 Jan 2019 18:43:54 +0100 Subject: [PATCH 02/10] First POC of peer routing using kademlia lib --- libp2p/routing/__init__.py | 0 libp2p/routing/interfaces.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 libp2p/routing/__init__.py create mode 100644 libp2p/routing/interfaces.py diff --git a/libp2p/routing/__init__.py b/libp2p/routing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py new file mode 100644 index 0000000..1f29d48 --- /dev/null +++ b/libp2p/routing/interfaces.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +# pylint: disable=too-few-public-methods + + +class IContentRouting(ABC): + + @abstractmethod + def provide(self, cid, announce=True): + """ + Provide adds the given cid to the content routing system. If announce is True, + it also announces it, otherwise it is just kept in the local + accounting of which objects are being provided. + """ + + @abstractmethod + def find_provider_iter(self, cid, count): + """ + Search for peers who are able to provide a given key + returns an iterator of peer.PeerInfo + """ + + +class IPeerRouting(ABC): + + @abstractmethod + def find_peer(self, peer_id): + """ + Find specific Peer + FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo + with relevant addresses. + """ From cd8cb5c443dc8ad50883825569ea2ed4763ea5fa Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Wed, 27 Feb 2019 22:49:51 +0100 Subject: [PATCH 03/10] refactoring of the code to implement IAdvertiser and IDiscoverer --- libp2p/__init__.py | 2 + libp2p/{routing => discovery}/__init__.py | 0 libp2p/discovery/kademlia_router.py | 77 ++++++++++++++++++++++ libp2p/host/routed_host.py | 61 +++++++++++++++++ libp2p/kademlia/crawling.py | 40 ++++++++++++ tests/kademlia/__init__.py | 0 tests/kademlia/test_libp2p_node.py | 80 +++++++++++++++++++++++ 7 files changed, 260 insertions(+) rename libp2p/{routing => discovery}/__init__.py (100%) create mode 100644 libp2p/discovery/kademlia_router.py create mode 100644 libp2p/host/routed_host.py create mode 100644 tests/kademlia/__init__.py create mode 100644 tests/kademlia/test_libp2p_node.py diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 62e014b..fe00408 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -23,6 +23,7 @@ async def cleanup_done_tasks(): # Some sleep necessary to context switch await asyncio.sleep(3) + def initialize_default_swarm( id_opt=None, transport_opt=None, muxer_opt=None, sec_opt=None, peerstore_opt=None): @@ -54,6 +55,7 @@ def initialize_default_swarm( return swarm_opt + async def new_node( swarm_opt=None, id_opt=None, transport_opt=None, muxer_opt=None, sec_opt=None, peerstore_opt=None): diff --git a/libp2p/routing/__init__.py b/libp2p/discovery/__init__.py similarity index 100% rename from libp2p/routing/__init__.py rename to libp2p/discovery/__init__.py diff --git a/libp2p/discovery/kademlia_router.py b/libp2p/discovery/kademlia_router.py new file mode 100644 index 0000000..2d7004c --- /dev/null +++ b/libp2p/discovery/kademlia_router.py @@ -0,0 +1,77 @@ +import msgpack + +from libp2p.peer.id import id_b58_decode +from libp2p.kademlia.network import Server +from libp2p.kademlia.node import Node +from libp2p.kademlia.utils import digest +from libp2p.kademlia.crawling import ValueMultipleSpiderCrawl +from libp2p.discovery.advertiser_interface import IAdvertiser +from libp2p.discovery.discoverer_interface import IDiscoverer + +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.peerdata import PeerData + + +class KademliaPeerRouter(IAdvertiser, IDiscoverer): + + def __init__(self, host, bootstrap_nodes=None): + self.host = host + self.peer_id = host.get_id() + self.bootstrap_nodes = bootstrap_nodes + self.node = Server() + + async def listen(self, port): + await self.node.listen(port) + if self.bootstrap_nodes: + await self.node.bootstrap(self.bootstrap_nodes) + + async def advertise(self, service): + await self.node.set(service, self._make_advertise_msg()) + + def _make_advertise_msg(self): + peer_data = PeerData() + peer_data.add_addrs(self.host.get_addrs()) + peer_info = PeerInfo(self.peer_id, peer_data) + + if len(peer_info.addrs) < 1: + raise RuntimeError("not know address for self") + + return encode_peer_info(peer_info) + + async def find_peers(self, service): + key = dht_key(service) + target = Node(key) + + nearest = self.node.protocol.router.find_neighbors(target) + if not nearest: + print("There are no known neighbors to get key %s", key) + return [] + spider = ValueMultipleSpiderCrawl(self.node.protocol, target, nearest, + self.node.ksize, self.node.alpha) + + values = await spider.find() + if values: + return list(map(decode_peer_info, values)) + return [] + + +def dht_key(service): + # TODO: should convert to Content Identification + return digest(service) + + +def encode_peer_info(peer_info): + return msgpack.dumps({ + 'peer_id': peer_info.peer_id.pretty(), + 'addrs': [str(ma) for ma in peer_info.addrs] + }) + + +def decode_peer_info(data): + info = msgpack.loads(data, raw=False) + + peer_id = id_b58_decode(info['peer_id']) + peer_data = PeerData() + peer_data.add_addrs(info['addrs']) + + return PeerInfo(peer_id, peer_data) diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py new file mode 100644 index 0000000..3d1a19a --- /dev/null +++ b/libp2p/host/routed_host.py @@ -0,0 +1,61 @@ +from libp2p.peer.peerstore import PeerStoreError + +from .basic_host import BasicHost + + +class RoutedHost(BasicHost): + + # default options constructor + def __init__(self, host, router): + super().__init__(host.network) + self.host = host + self.router = router + + async def advertise(self, service): + await self.router.advertise(service) + + async def connect(self, peer_info): + """ + connect ensures there is a connection between this host and the peer with + given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal + peerstore. If there is not an active connection, connect will issue a + dial, and block until a connection is open, or an error is + returned. + + :param peer_info: peer_info of the host we want to connect to + :type peer_info: peer.peerinfo.PeerInfo + """ + # there is already a connection to this peer + if peer_info.peer_id in self.network.connections: + return + + # Check if we have some address for that peer + # if not, we use the router to get information about the peer + peer_info.addrs = await self._find_peer_addrs(peer_info.peer_id) + + # if addrs are given, save them + if peer_info.addrs: + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + + # try to connect + await super().connect(peer_info) + + def find_peers(self, service): + return self.router.find_peers(service) + + async def _find_peer_addrs(self, peer_id): + try: + addrs = self.peerstore.addrs(peer_id) + except PeerStoreError: + addrs = None + + if not addrs: + peers_info = await self.router.find_peers(peer_id.pretty()) + if not peers_info: + raise KeyError("no address found for this peer_id %s" % str(peer_id)) + peer_info = peers_info[0] # todo: handle multiple response + if peer_info.peer_id != peer_id: + raise RuntimeError('routing failure: provided addrs for different peer') + addrs = peer_info.addrs + + return addrs diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 6dabd10..ade597a 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -69,6 +69,46 @@ class SpiderCrawl: raise NotImplementedError +class ValueMultipleSpiderCrawl(SpiderCrawl): + # TODO: move this out of this module + def __init__(self, protocol, node, peers, ksize, alpha): + SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) + # keep track of the single nearest node without value - per + # section 2.3 so we can set the key there if found + self.nearest_without_value = NodeHeap(self.node, 1) + + async def find(self): + """ + Find either the closest nodes or the value requested. + """ + return await self._find(self.protocol.call_find_value) + + async def _nodes_found(self, responses): + """ + Handle the result of an iteration in _find. + """ + toremove = [] + found_values = [] + for peerid, response in responses.items(): + response = RPCFindResponse(response) + if not response.happened(): + toremove.append(peerid) + elif response.has_value(): + found_values.append(response.get_value()) + else: + peer = self.nearest.get_node(peerid) + self.nearest_without_value.push(peer) + self.nearest.push(response.get_node_list()) + self.nearest.remove(toremove) + + if found_values: + return found_values + if self.nearest.have_contacted_all(): + # not found! + return None + return await self.find() + + class ValueSpiderCrawl(SpiderCrawl): def __init__(self, protocol, node, peers, ksize, alpha): SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) diff --git a/tests/kademlia/__init__.py b/tests/kademlia/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/kademlia/test_libp2p_node.py b/tests/kademlia/test_libp2p_node.py new file mode 100644 index 0000000..bcdd7e4 --- /dev/null +++ b/tests/kademlia/test_libp2p_node.py @@ -0,0 +1,80 @@ +import multiaddr +import pytest +from libp2p import new_node +from libp2p.host.routed_host import RoutedHost +from libp2p.kademlia.network import Server +from libp2p.discovery.kademlia_router import KademliaPeerRouter +from libp2p.peer.peerinfo import info_from_p2p_addr + + +# @pytest.mark.asyncio +# async def test_connect_to_peer(): +# node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) +# node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + +# node_a = RoutedHost(node_a, KademliaPeerRouter(node_a)) +# await node_a.router.listen(5678) +# node_b = RoutedHost(node_b, KademliaPeerRouter(node_b, [('127.0.0.1', 5678)])) +# await node_b.router.listen(5679) + +# await node_a.advertise(node_a.get_id().pretty()) + +# async def stream_handler(stream): +# while True: +# read_string = (await stream.read()).decode() +# print("host B received:" + read_string) + +# response = "ack:" + read_string +# print("sending response:" + response) +# await stream.write(response.encode()) + +# node_b.set_stream_handler("/echo/1.0.0", stream_handler) + +# info = info_from_p2p_addr(node_b.get_addrs()[0]) +# info.addrs = [] # explicitly empty the addrs to force the router to look in the dht +# await node_a.connect(info) + +# stream = await node_a.new_stream(node_b.get_id(), ["/echo/1.0.0"]) + +# messages = ["hello" + str(x) for x in range(10)] +# for message in messages: +# await stream.write(message.encode()) + +# response = (await stream.read()).decode() + +# print("res: " + response) +# assert response == ("ack:" + message) + +# # Success, terminate pending tasks. +# return + +@pytest.mark.parametrize("nodes_nr", [2]) +# @pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(1, 5)]) +@pytest.mark.asyncio +async def test_find_peers(nodes_nr): + nodes = await _make_routed_hosts(nodes_nr, 5678) + node_a = nodes[0] + + await node_a.advertise("test") + + for node in nodes[1:]: + peers = await node.find_peers("test") + assert len(peers) == 1 + assert peers[0].peer_id == node_a.get_id() + + +async def _make_routed_host(dht_port, bootstrap_nodes=None): + node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + routed_node = RoutedHost(node, KademliaPeerRouter(node, bootstrap_nodes)) + await routed_node.router.listen(dht_port) + return routed_node + + +async def _make_routed_hosts(n, dht_port): + hosts = [] + bootraps = [] + for _ in range(n): + hosts.append(await _make_routed_host(dht_port, bootraps)) + bootraps.append(('127.0.0.1', dht_port)) + dht_port += 1 + return hosts From 2d1b9a03d1208a300b34d411fe82bbbbf2ca17b7 Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Tue, 26 Mar 2019 20:17:49 +0100 Subject: [PATCH 04/10] clean up --- libp2p/discovery/__init__.py | 0 libp2p/discovery/advertiser_interface.py | 16 ----- libp2p/discovery/discoverer_interface.py | 17 ----- libp2p/discovery/kademlia_router.py | 77 ----------------------- libp2p/host/routed_host.py | 61 ------------------ libp2p/kademlia/crawling.py | 40 ------------ tests/kademlia/test_libp2p_node.py | 80 ------------------------ 7 files changed, 291 deletions(-) delete mode 100644 libp2p/discovery/__init__.py delete mode 100644 libp2p/discovery/advertiser_interface.py delete mode 100644 libp2p/discovery/discoverer_interface.py delete mode 100644 libp2p/discovery/kademlia_router.py delete mode 100644 libp2p/host/routed_host.py delete mode 100644 tests/kademlia/test_libp2p_node.py diff --git a/libp2p/discovery/__init__.py b/libp2p/discovery/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/libp2p/discovery/advertiser_interface.py b/libp2p/discovery/advertiser_interface.py deleted file mode 100644 index 6c7297d..0000000 --- a/libp2p/discovery/advertiser_interface.py +++ /dev/null @@ -1,16 +0,0 @@ -from abc import ABC, abstractmethod -# pylint: disable=too-few-public-methods - -class IAdvertiser(ABC): - - def __init__(self): - pass - - @abstractmethod - def advertise(self, service): - """ - Advertise providing a specific service to the network - :param service: service that you provide - :raise Exception: network error - """ - \ No newline at end of file diff --git a/libp2p/discovery/discoverer_interface.py b/libp2p/discovery/discoverer_interface.py deleted file mode 100644 index 4924d29..0000000 --- a/libp2p/discovery/discoverer_interface.py +++ /dev/null @@ -1,17 +0,0 @@ -from abc import ABC, abstractmethod -# pylint: disable=too-few-public-methods - - -class IDiscoverer(ABC): - - def __init__(self): - pass - - @abstractmethod - def find_peers(self, service): - """ - Find peers on the networking providing a particular service - :param service: service that peers must provide - :return: PeerInfo generator that yields PeerInfo objects for discovered peers - :raise Exception: network error - """ diff --git a/libp2p/discovery/kademlia_router.py b/libp2p/discovery/kademlia_router.py deleted file mode 100644 index 2d7004c..0000000 --- a/libp2p/discovery/kademlia_router.py +++ /dev/null @@ -1,77 +0,0 @@ -import msgpack - -from libp2p.peer.id import id_b58_decode -from libp2p.kademlia.network import Server -from libp2p.kademlia.node import Node -from libp2p.kademlia.utils import digest -from libp2p.kademlia.crawling import ValueMultipleSpiderCrawl -from libp2p.discovery.advertiser_interface import IAdvertiser -from libp2p.discovery.discoverer_interface import IDiscoverer - -from libp2p.peer.peerinfo import PeerInfo -from libp2p.peer.peerdata import PeerData - - -class KademliaPeerRouter(IAdvertiser, IDiscoverer): - - def __init__(self, host, bootstrap_nodes=None): - self.host = host - self.peer_id = host.get_id() - self.bootstrap_nodes = bootstrap_nodes - self.node = Server() - - async def listen(self, port): - await self.node.listen(port) - if self.bootstrap_nodes: - await self.node.bootstrap(self.bootstrap_nodes) - - async def advertise(self, service): - await self.node.set(service, self._make_advertise_msg()) - - def _make_advertise_msg(self): - peer_data = PeerData() - peer_data.add_addrs(self.host.get_addrs()) - peer_info = PeerInfo(self.peer_id, peer_data) - - if len(peer_info.addrs) < 1: - raise RuntimeError("not know address for self") - - return encode_peer_info(peer_info) - - async def find_peers(self, service): - key = dht_key(service) - target = Node(key) - - nearest = self.node.protocol.router.find_neighbors(target) - if not nearest: - print("There are no known neighbors to get key %s", key) - return [] - spider = ValueMultipleSpiderCrawl(self.node.protocol, target, nearest, - self.node.ksize, self.node.alpha) - - values = await spider.find() - if values: - return list(map(decode_peer_info, values)) - return [] - - -def dht_key(service): - # TODO: should convert to Content Identification - return digest(service) - - -def encode_peer_info(peer_info): - return msgpack.dumps({ - 'peer_id': peer_info.peer_id.pretty(), - 'addrs': [str(ma) for ma in peer_info.addrs] - }) - - -def decode_peer_info(data): - info = msgpack.loads(data, raw=False) - - peer_id = id_b58_decode(info['peer_id']) - peer_data = PeerData() - peer_data.add_addrs(info['addrs']) - - return PeerInfo(peer_id, peer_data) diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py deleted file mode 100644 index 3d1a19a..0000000 --- a/libp2p/host/routed_host.py +++ /dev/null @@ -1,61 +0,0 @@ -from libp2p.peer.peerstore import PeerStoreError - -from .basic_host import BasicHost - - -class RoutedHost(BasicHost): - - # default options constructor - def __init__(self, host, router): - super().__init__(host.network) - self.host = host - self.router = router - - async def advertise(self, service): - await self.router.advertise(service) - - async def connect(self, peer_info): - """ - connect ensures there is a connection between this host and the peer with - given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal - peerstore. If there is not an active connection, connect will issue a - dial, and block until a connection is open, or an error is - returned. - - :param peer_info: peer_info of the host we want to connect to - :type peer_info: peer.peerinfo.PeerInfo - """ - # there is already a connection to this peer - if peer_info.peer_id in self.network.connections: - return - - # Check if we have some address for that peer - # if not, we use the router to get information about the peer - peer_info.addrs = await self._find_peer_addrs(peer_info.peer_id) - - # if addrs are given, save them - if peer_info.addrs: - self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) - - # try to connect - await super().connect(peer_info) - - def find_peers(self, service): - return self.router.find_peers(service) - - async def _find_peer_addrs(self, peer_id): - try: - addrs = self.peerstore.addrs(peer_id) - except PeerStoreError: - addrs = None - - if not addrs: - peers_info = await self.router.find_peers(peer_id.pretty()) - if not peers_info: - raise KeyError("no address found for this peer_id %s" % str(peer_id)) - peer_info = peers_info[0] # todo: handle multiple response - if peer_info.peer_id != peer_id: - raise RuntimeError('routing failure: provided addrs for different peer') - addrs = peer_info.addrs - - return addrs diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index ade597a..6dabd10 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -69,46 +69,6 @@ class SpiderCrawl: raise NotImplementedError -class ValueMultipleSpiderCrawl(SpiderCrawl): - # TODO: move this out of this module - def __init__(self, protocol, node, peers, ksize, alpha): - SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) - # keep track of the single nearest node without value - per - # section 2.3 so we can set the key there if found - self.nearest_without_value = NodeHeap(self.node, 1) - - async def find(self): - """ - Find either the closest nodes or the value requested. - """ - return await self._find(self.protocol.call_find_value) - - async def _nodes_found(self, responses): - """ - Handle the result of an iteration in _find. - """ - toremove = [] - found_values = [] - for peerid, response in responses.items(): - response = RPCFindResponse(response) - if not response.happened(): - toremove.append(peerid) - elif response.has_value(): - found_values.append(response.get_value()) - else: - peer = self.nearest.get_node(peerid) - self.nearest_without_value.push(peer) - self.nearest.push(response.get_node_list()) - self.nearest.remove(toremove) - - if found_values: - return found_values - if self.nearest.have_contacted_all(): - # not found! - return None - return await self.find() - - class ValueSpiderCrawl(SpiderCrawl): def __init__(self, protocol, node, peers, ksize, alpha): SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) diff --git a/tests/kademlia/test_libp2p_node.py b/tests/kademlia/test_libp2p_node.py deleted file mode 100644 index bcdd7e4..0000000 --- a/tests/kademlia/test_libp2p_node.py +++ /dev/null @@ -1,80 +0,0 @@ -import multiaddr -import pytest -from libp2p import new_node -from libp2p.host.routed_host import RoutedHost -from libp2p.kademlia.network import Server -from libp2p.discovery.kademlia_router import KademliaPeerRouter -from libp2p.peer.peerinfo import info_from_p2p_addr - - -# @pytest.mark.asyncio -# async def test_connect_to_peer(): -# node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) -# node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - -# node_a = RoutedHost(node_a, KademliaPeerRouter(node_a)) -# await node_a.router.listen(5678) -# node_b = RoutedHost(node_b, KademliaPeerRouter(node_b, [('127.0.0.1', 5678)])) -# await node_b.router.listen(5679) - -# await node_a.advertise(node_a.get_id().pretty()) - -# async def stream_handler(stream): -# while True: -# read_string = (await stream.read()).decode() -# print("host B received:" + read_string) - -# response = "ack:" + read_string -# print("sending response:" + response) -# await stream.write(response.encode()) - -# node_b.set_stream_handler("/echo/1.0.0", stream_handler) - -# info = info_from_p2p_addr(node_b.get_addrs()[0]) -# info.addrs = [] # explicitly empty the addrs to force the router to look in the dht -# await node_a.connect(info) - -# stream = await node_a.new_stream(node_b.get_id(), ["/echo/1.0.0"]) - -# messages = ["hello" + str(x) for x in range(10)] -# for message in messages: -# await stream.write(message.encode()) - -# response = (await stream.read()).decode() - -# print("res: " + response) -# assert response == ("ack:" + message) - -# # Success, terminate pending tasks. -# return - -@pytest.mark.parametrize("nodes_nr", [2]) -# @pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(1, 5)]) -@pytest.mark.asyncio -async def test_find_peers(nodes_nr): - nodes = await _make_routed_hosts(nodes_nr, 5678) - node_a = nodes[0] - - await node_a.advertise("test") - - for node in nodes[1:]: - peers = await node.find_peers("test") - assert len(peers) == 1 - assert peers[0].peer_id == node_a.get_id() - - -async def _make_routed_host(dht_port, bootstrap_nodes=None): - node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - routed_node = RoutedHost(node, KademliaPeerRouter(node, bootstrap_nodes)) - await routed_node.router.listen(dht_port) - return routed_node - - -async def _make_routed_hosts(n, dht_port): - hosts = [] - bootraps = [] - for _ in range(n): - hosts.append(await _make_routed_host(dht_port, bootraps)) - bootraps.append(('127.0.0.1', dht_port)) - dht_port += 1 - return hosts From c5289952eed43dd0b5a4b658d836089dc9718bc1 Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Tue, 26 Mar 2019 20:44:01 +0100 Subject: [PATCH 05/10] add routing interfaces --- libp2p/routing/kadmelia/__init__.py | 0 .../kadmelia/kadmelia_content_router.py | 19 +++++ .../routing/kadmelia/kadmelia_peer_router.py | 30 +++++++ tests/kademlia/test_basic.py | 81 +++++++++++++++++++ 4 files changed, 130 insertions(+) create mode 100644 libp2p/routing/kadmelia/__init__.py create mode 100644 libp2p/routing/kadmelia/kadmelia_content_router.py create mode 100644 libp2p/routing/kadmelia/kadmelia_peer_router.py create mode 100644 tests/kademlia/test_basic.py diff --git a/libp2p/routing/kadmelia/__init__.py b/libp2p/routing/kadmelia/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libp2p/routing/kadmelia/kadmelia_content_router.py b/libp2p/routing/kadmelia/kadmelia_content_router.py new file mode 100644 index 0000000..d96269a --- /dev/null +++ b/libp2p/routing/kadmelia/kadmelia_content_router.py @@ -0,0 +1,19 @@ +from libp2p.routing.interfaces import IContentRouting + + +class KadmeliaContentRouter(IContentRouting): + + def provide(self, cid, announce=True): + """ + Provide adds the given cid to the content routing system. If announce is True, + it also announces it, otherwise it is just kept in the local + accounting of which objects are being provided. + """ + pass + + def find_provider_iter(self, cid, count): + """ + Search for peers who are able to provide a given key + returns an iterator of peer.PeerInfo + """ + pass diff --git a/libp2p/routing/kadmelia/kadmelia_peer_router.py b/libp2p/routing/kadmelia/kadmelia_peer_router.py new file mode 100644 index 0000000..0201b7d --- /dev/null +++ b/libp2p/routing/kadmelia/kadmelia_peer_router.py @@ -0,0 +1,30 @@ +from libp2p.routing.interfaces import IPeerRouting +from libp2p.kademlia.utils import digest +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.peerdata import PeerData + + +class KadmeliaPeerRouter(IPeerRouting): + + def __init__(self, dht_server): + self.server = dht_server + + def find_peer(self, peer_id): + """ + Find specific Peer + FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo + with relevant addresses. + """ + value = self.server.get(peer_id) + return decode_peerinfo(value) + + +def decode_peerinfo(encoded): + if isinstance(encoded, bytes): + encoded = encoded.decode() + lines = encoded.splitlines() + peer_id = lines[0] + addrs = lines[1:] + peer_data = PeerData() + peer_data.add_addrs(addrs) + return PeerInfo(peer_id, addrs) diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py new file mode 100644 index 0000000..e35154b --- /dev/null +++ b/tests/kademlia/test_basic.py @@ -0,0 +1,81 @@ +import pytest +from libp2p.kademlia.network import Server + + +@pytest.mark.asyncio +async def test_example(): + node_a = Server() + await node_a.listen(5678) + + node_b = Server() + await node_b.listen(5679) + + # Bootstrap the node by connecting to other known nodes, in this case + # replace 123.123.123.123 with the IP of another node and optionally + # give as many ip/port combos as you can for other nodes. + await node_b.bootstrap([("127.0.0.1", 5678)]) + + # set a value for the key "my-key" on the network + value = "my-value" + key = "my-key" + await node_b.set(key, value) + + # get the value associated with "my-key" from the network + assert await node_b.get(key) == value + assert await node_a.get(key) == value + + +@pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(2, 5)]) +@pytest.mark.asyncio +async def test_multiple_nodes_bootstrap_set_get(nodes_nr): + nodes_nr = 25 + + node_bootstrap = Server() + await node_bootstrap.listen(5678) + + nodes = [] + for i in range(nodes_nr): + node = Server() + addrs = [("127.0.0.1", 5678)] + await node.listen(5679 + i) + await node.bootstrap(addrs) + nodes.append(node) + + for i, node in enumerate(nodes): + # set a value for the key "my-key" on the network + value = "my awesome value %d" % i + key = "set from %d" % i + await node.set(key, value) + + for i in range(nodes_nr): + for node in nodes: + value = "my awesome value %d" % i + key = "set from %d" % i + assert await node.get(key) == value + + +@pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(2, 5)]) +@pytest.mark.asyncio +async def test_multiple_nodes_set_bootstrap_get(nodes_nr): + nodes_nr = 25 + + node_bootstrap = Server() + await node_bootstrap.listen(5678) + + nodes = [] + for i in range(nodes_nr): + node = Server() + addrs = [("127.0.0.1", 5678)] + await node.listen(5679 + i) + await node.bootstrap(addrs) + + value = "my awesome value %d" % i + key = "set from %d" % i + await node.set(key, value) + nodes.append(node) + + for i in range(nodes_nr): + for node in nodes: + value = "my awesome value %d" % i + key = "set from %d" % i + assert await node.get(key) == value From d5c7cc7cb7bcf448974232d9d5d1ad85d270f6c5 Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Wed, 17 Apr 2019 20:21:59 -0400 Subject: [PATCH 06/10] added changes from future commits to last passing bmuller commit --- .travis.yml | 2 +- libp2p/kademlia/crawling.py | 2 + libp2p/kademlia/protocol.py | 15 ++++ libp2p/kademlia/routing.py | 7 ++ libp2p/kademlia/rpc.proto | 78 +++++++++++++++++++ libp2p/kademlia/storage.py | 2 +- .../kadmelia/kadmelia_content_router.py | 2 + .../routing/kadmelia/kadmelia_peer_router.py | 1 + tests/kademlia/test_basic.py | 18 ++--- 9 files changed, 115 insertions(+), 12 deletions(-) create mode 100644 libp2p/kademlia/rpc.proto diff --git a/.travis.yml b/.travis.yml index a2df56d..a5dfefb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ install: script: - pytest --cov=./libp2p tests/ - - pylint --rcfile=.pylintrc libp2p/!(kademlia) tests + - pylint --rcfile=.pylintrc libp2p tests after_success: - codecov diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 6dabd10..6006bef 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -15,6 +15,7 @@ class SpiderCrawl: """ def __init__(self, protocol, node, peers, ksize, alpha): + # pylint: disable=too-many-arguments """ Create a new C{SpiderCrawl}er. @@ -71,6 +72,7 @@ class SpiderCrawl: class ValueSpiderCrawl(SpiderCrawl): def __init__(self, protocol, node, peers, ksize, alpha): + # pylint: disable=too-many-arguments SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) # keep track of the single nearest node without value - per # section 2.3 so we can set the key there if found diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 3573983..5273230 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -12,6 +12,15 @@ log = logging.getLogger(__name__) # pylint: disable=invalid-name class KademliaProtocol(RPCProtocol): + """ + There are four main RPCs in the Kademlia protocol + PING, STORE, FIND_NODE, FIND_VALUE + PING probes if a node is still online + STORE instructs a node to store (key, value) + FIND_NODE takes a 160-bit ID and gets back + (ip, udp_port, node_id) for k closest nodes to target + FIND_VALUE behaves like FIND_NODE unless a value is stored + """ def __init__(self, source_node, storage, ksize): RPCProtocol.__init__(self) self.router = RoutingTable(self, ksize, source_node) @@ -28,6 +37,12 @@ class KademliaProtocol(RPCProtocol): ids.append(rid) return ids + def rpc_add_provider(self, sender, nodeid, key): + pass + + def rpc_get_providers(self, sender, nodeid, key): + pass + def rpc_stun(self, sender): # pylint: disable=no-self-use return sender diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index 2ffd525..3f88e8b 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -8,6 +8,13 @@ from .utils import OrderedSet, shared_prefix, bytes_to_bit_string class KBucket: + """ + each node keeps a list of (ip, udp_port, node_id) + for nodes of distance between 2^i and 2^(i+1) + this list that every node keeps is a k-bucket + each k-bucket implements a last seen eviction + policy except that live nodes are never removed + """ def __init__(self, rangeLower, rangeUpper, ksize): self.range = (rangeLower, rangeUpper) self.nodes = OrderedDict() diff --git a/libp2p/kademlia/rpc.proto b/libp2p/kademlia/rpc.proto new file mode 100644 index 0000000..96c14a8 --- /dev/null +++ b/libp2p/kademlia/rpc.proto @@ -0,0 +1,78 @@ +// Record represents a dht record that contains a value +// for a key value pair +message Record { + // The key that references this record + bytes key = 1; + + // The actual value this record is storing + bytes value = 2; + + // Note: These fields were removed from the Record message + // hash of the authors public key + //optional string author = 3; + // A PKI signature for the key+value+author + //optional bytes signature = 4; + + // Time the record was received, set by receiver + string timeReceived = 5; +}; + +message Message { + enum MessageType { + PUT_VALUE = 0; + GET_VALUE = 1; + ADD_PROVIDER = 2; + GET_PROVIDERS = 3; + FIND_NODE = 4; + PING = 5; + } + + enum ConnectionType { + // sender does not have a connection to peer, and no extra information (default) + NOT_CONNECTED = 0; + + // sender has a live connection to peer + CONNECTED = 1; + + // sender recently connected to peer + CAN_CONNECT = 2; + + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + CANNOT_CONNECT = 3; + } + + message Peer { + // ID of a given peer. + bytes id = 1; + + // multiaddrs for a given peer + repeated bytes addrs = 2; + + // used to signal the sender's connection capabilities to the peer + ConnectionType connection = 3; + } + + // defines what type of message it is. + MessageType type = 1; + + // defines what coral cluster level this query/response belongs to. + // in case we want to implement coral's cluster rings in the future. + int32 clusterLevelRaw = 10; // NOT USED + + // Used to specify the key associated with this message. + // PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + bytes key = 2; + + // Used to return a value + // PUT_VALUE, GET_VALUE + Record record = 3; + + // Used to return peers closer to a key in a query + // GET_VALUE, GET_PROVIDERS, FIND_NODE + repeated Peer closerPeers = 8; + + // Used to return Providers + // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + repeated Peer providerPeers = 9; +} \ No newline at end of file diff --git a/libp2p/kademlia/storage.py b/libp2p/kademlia/storage.py index 1aa1ac7..bcbf625 100644 --- a/libp2p/kademlia/storage.py +++ b/libp2p/kademlia/storage.py @@ -33,7 +33,7 @@ class IStorage(ABC): def iter_older_than(self, seconds_old): """ Return the an iterator over (key, value) tuples for items older - than the given secondsOld. + than the given seconds_old. """ @abstractmethod diff --git a/libp2p/routing/kadmelia/kadmelia_content_router.py b/libp2p/routing/kadmelia/kadmelia_content_router.py index d96269a..60cf55b 100644 --- a/libp2p/routing/kadmelia/kadmelia_content_router.py +++ b/libp2p/routing/kadmelia/kadmelia_content_router.py @@ -9,6 +9,8 @@ class KadmeliaContentRouter(IContentRouting): it also announces it, otherwise it is just kept in the local accounting of which objects are being provided. """ + # the DHT finds the closest peers to `key` using the `FIND_NODE` RPC + # then sends a `ADD_PROVIDER` RPC with its own `PeerInfo` to each of these peers. pass def find_provider_iter(self, cid, count): diff --git a/libp2p/routing/kadmelia/kadmelia_peer_router.py b/libp2p/routing/kadmelia/kadmelia_peer_router.py index 0201b7d..27be67c 100644 --- a/libp2p/routing/kadmelia/kadmelia_peer_router.py +++ b/libp2p/routing/kadmelia/kadmelia_peer_router.py @@ -5,6 +5,7 @@ from libp2p.peer.peerdata import PeerData class KadmeliaPeerRouter(IPeerRouting): + # pylint: disable=too-few-public-methods def __init__(self, dht_server): self.server = dht_server diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py index e35154b..82014a9 100644 --- a/tests/kademlia/test_basic.py +++ b/tests/kademlia/test_basic.py @@ -1,5 +1,6 @@ import pytest from libp2p.kademlia.network import Server +import math @pytest.mark.asyncio @@ -28,16 +29,15 @@ async def test_example(): @pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(2, 5)]) @pytest.mark.asyncio async def test_multiple_nodes_bootstrap_set_get(nodes_nr): - nodes_nr = 25 node_bootstrap = Server() - await node_bootstrap.listen(5678) + await node_bootstrap.listen(1000 + nodes_nr * 2) nodes = [] for i in range(nodes_nr): node = Server() - addrs = [("127.0.0.1", 5678)] - await node.listen(5679 + i) + addrs = [("127.0.0.1", 1000 + nodes_nr * 2)] + await node.listen(1001 + i + nodes_nr * 2) await node.bootstrap(addrs) nodes.append(node) @@ -57,16 +57,14 @@ async def test_multiple_nodes_bootstrap_set_get(nodes_nr): @pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(2, 5)]) @pytest.mark.asyncio async def test_multiple_nodes_set_bootstrap_get(nodes_nr): - nodes_nr = 25 - node_bootstrap = Server() - await node_bootstrap.listen(5678) - + await node_bootstrap.listen(2000 + nodes_nr * 2) + nodes = [] for i in range(nodes_nr): node = Server() - addrs = [("127.0.0.1", 5678)] - await node.listen(5679 + i) + addrs = [("127.0.0.1", 2000 + nodes_nr * 2)] + await node.listen(2001 + i + nodes_nr * 2) await node.bootstrap(addrs) value = "my awesome value %d" % i From 3c6ad886bb024076f41e7701ceb7885e12d6b55c Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 17 Apr 2019 21:21:17 -0400 Subject: [PATCH 07/10] add dependencies --- setup.py | 2 ++ tests/kademlia/test_basic.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index abaa1c7..d344892 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,8 @@ setuptools.setup( "base58", "pymultihash", "multiaddr", + "rpcudp", + "umsgpack" ], packages=["libp2p"], zip_safe=False, diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py index 82014a9..ede9022 100644 --- a/tests/kademlia/test_basic.py +++ b/tests/kademlia/test_basic.py @@ -31,13 +31,13 @@ async def test_example(): async def test_multiple_nodes_bootstrap_set_get(nodes_nr): node_bootstrap = Server() - await node_bootstrap.listen(1000 + nodes_nr * 2) + await node_bootstrap.listen(3000 + nodes_nr * 2) nodes = [] for i in range(nodes_nr): node = Server() - addrs = [("127.0.0.1", 1000 + nodes_nr * 2)] - await node.listen(1001 + i + nodes_nr * 2) + addrs = [("127.0.0.1", 3000 + nodes_nr * 2)] + await node.listen(3001 + i + nodes_nr * 2) await node.bootstrap(addrs) nodes.append(node) From 002a46f0d43a809eb0f9f679dc46f4463c0ec010 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 17 Apr 2019 21:25:02 -0400 Subject: [PATCH 08/10] add dependencies --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0bf8ffd..3485e2a 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ setuptools.setup( "pymultihash", "multiaddr", "rpcudp", - "umsgpack" + "umsgpack", "grpcio", "grpcio-tools", "lru-dict>=1.1.6" From 9906e23b55063b11ee5f52795f7fbba079968bcf Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 17 Apr 2019 21:39:30 -0400 Subject: [PATCH 09/10] fixed linting --- tests/kademlia/test_basic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py index ede9022..d65e2d9 100644 --- a/tests/kademlia/test_basic.py +++ b/tests/kademlia/test_basic.py @@ -1,6 +1,5 @@ import pytest from libp2p.kademlia.network import Server -import math @pytest.mark.asyncio From 1228b11bc9190ac4b19884908988e64c8270cc2e Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Wed, 17 Apr 2019 21:44:45 -0400 Subject: [PATCH 10/10] Update test_basic.py --- tests/kademlia/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kademlia/test_basic.py b/tests/kademlia/test_basic.py index d65e2d9..bb140b3 100644 --- a/tests/kademlia/test_basic.py +++ b/tests/kademlia/test_basic.py @@ -58,7 +58,7 @@ async def test_multiple_nodes_bootstrap_set_get(nodes_nr): async def test_multiple_nodes_set_bootstrap_get(nodes_nr): node_bootstrap = Server() await node_bootstrap.listen(2000 + nodes_nr * 2) - + nodes = [] for i in range(nodes_nr): node = Server()