diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 6006bef..6a3b5fe 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -1,7 +1,7 @@ from collections import Counter import logging -from .node import Node, NodeHeap +from .kad_peerinfo import KadPeerHeap, create_kad_peerinfo from .utils import gather_dict @@ -32,7 +32,7 @@ class SpiderCrawl: self.ksize = ksize self.alpha = alpha self.node = node - self.nearest = NodeHeap(self.node, self.ksize) + self.nearest = KadPeerHeap(self.node, self.ksize) self.last_ids_crawled = [] log.info("creating spider with peers: %s", peers) self.nearest.push(peers) @@ -61,7 +61,7 @@ class SpiderCrawl: dicts = {} for peer in self.nearest.get_uncontacted()[:count]: - dicts[peer.id] = rpcmethod(peer, self.node) + dicts[peer.peer_id] = rpcmethod(peer, self.node) self.nearest.mark_contacted(peer) found = await gather_dict(dicts) return await self._nodes_found(found) @@ -76,7 +76,7 @@ class ValueSpiderCrawl(SpiderCrawl): SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) # keep track of the single nearest node without value - per # section 2.3 so we can set the key there if found - self.nearest_without_value = NodeHeap(self.node, 1) + self.nearest_without_value = KadPeerHeap(self.node, 1) async def find(self): """ @@ -124,7 +124,7 @@ class ValueSpiderCrawl(SpiderCrawl): peer = self.nearest_without_value.popleft() if peer: - await self.protocol.call_store(peer, self.node.id, value) + await self.protocol.call_store(peer, self.node.peer_id, value) return value @@ -183,4 +183,4 @@ class RPCFindResponse: be set. """ nodelist = self.response[1] or [] - return [Node(*nodeple) for nodeple in nodelist] + return [create_kad_peerinfo(*nodeple) for nodeple in nodelist] diff --git a/libp2p/kademlia/node.py b/libp2p/kademlia/kad_peerinfo.py similarity index 59% rename from libp2p/kademlia/node.py rename to libp2p/kademlia/kad_peerinfo.py index 2f087f5..cd0bc60 100644 --- a/libp2p/kademlia/node.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -1,16 +1,35 @@ -from operator import itemgetter import heapq +import random +from operator import itemgetter +from multiaddr import Multiaddr +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.id import ID +from libp2p.peer.peerdata import PeerData +from .utils import digest + +P_IP = "ip4" +P_UDP = "udp" + +class KadPeerInfo(PeerInfo): + def __init__(self, peer_id, peer_data=None): + super(KadPeerInfo, self).__init__(peer_id, peer_data) + + # pylint: disable=protected-access + self.peer_id = peer_id._id_str + self.long_id = int(digest(peer_id._id_str).hex(), 16) + + self.addrs = peer_data.get_addrs() if peer_data else None + + # pylint: disable=invalid-name + self.ip = self.addrs[0].value_for_protocol(P_IP)\ + if peer_data else None + self.port = int(self.addrs[0].value_for_protocol(P_UDP))\ + if peer_data else None -class Node: - def __init__(self, node_id, ip=None, port=None): - self.id = node_id # pylint: disable=invalid-name - self.ip = ip # pylint: disable=invalid-name - self.port = port - self.long_id = int(node_id.hex(), 16) def same_home_as(self, node): - return self.ip == node.ip and self.port == node.port + return sorted(self.addrs) == sorted(node.addrs) def distance_to(self, node): """ @@ -22,7 +41,7 @@ class Node: """ Enables use of Node as a tuple - i.e., tuple(node) works. """ - return iter([self.id, self.ip, self.port]) + return iter([self.peer_id, self.ip, self.port]) def __repr__(self): return repr([self.long_id, self.ip, self.port]) @@ -30,10 +49,9 @@ class Node: def __str__(self): return "%s:%s" % (self.ip, str(self.port)) - -class NodeHeap: +class KadPeerHeap: """ - A heap of nodes ordered by distance to a given node. + A heap of peers ordered by distance to a given node. """ def __init__(self, node, maxsize): """ @@ -60,13 +78,13 @@ class NodeHeap: return nheap = [] for distance, node in self.heap: - if node.id not in peers: + if node.peer_id not in peers: heapq.heappush(nheap, (distance, node)) self.heap = nheap def get_node(self, node_id): for _, node in self.heap: - if node.id == node_id: + if node.peer_id == node_id: return node return None @@ -74,10 +92,10 @@ class NodeHeap: return len(self.get_uncontacted()) == 0 def get_ids(self): - return [n.id for n in self] + return [n.peer_id for n in self] def mark_contacted(self, node): - self.contacted.add(node.id) + self.contacted.add(node.peer_id) def popleft(self): return heapq.heappop(self.heap)[1] if self else None @@ -105,9 +123,20 @@ class NodeHeap: def __contains__(self, node): for _, other in self.heap: - if node.id == other.id: + if node.peer_id == other.peer_id: return True return False def get_uncontacted(self): - return [n for n in self if n.id not in self.contacted] + return [n for n in self if n.peer_id not in self.contacted] + +def create_kad_peerinfo(raw_node_id=None, sender_ip=None, sender_port=None): + node_id = ID(raw_node_id) if raw_node_id else ID(digest(random.getrandbits(255))) + peer_data = None + if sender_ip and sender_port: + peer_data = PeerData() #pylint: disable=no-value-for-parameter + addr = [Multiaddr("/"+ P_IP +"/" + str(sender_ip) + "/"\ + + P_UDP + "/" + str(sender_port))] + peer_data.add_addrs(addr) + + return KadPeerInfo(node_id, peer_data) diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index bbfbad6..30215a0 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -1,7 +1,6 @@ """ Package for interacting on the network at a high level. """ -import random import pickle import asyncio import logging @@ -9,7 +8,7 @@ import logging from .protocol import KademliaProtocol from .utils import digest from .storage import ForgetfulStorage -from .node import Node +from .kad_peerinfo import create_kad_peerinfo from .crawling import ValueSpiderCrawl from .crawling import NodeSpiderCrawl @@ -39,7 +38,7 @@ class Server: self.ksize = ksize self.alpha = alpha self.storage = storage or ForgetfulStorage() - self.node = Node(node_id or digest(random.getrandbits(255))) + self.node = create_kad_peerinfo(node_id) self.transport = None self.protocol = None self.refresh_loop = None @@ -86,7 +85,7 @@ class Server: """ results = [] for node_id in self.protocol.get_refresh_ids(): - node = Node(node_id) + node = create_kad_peerinfo(node_id) nearest = self.protocol.router.find_neighbors(node, self.alpha) spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) @@ -130,8 +129,8 @@ class Server: return await spider.find() async def bootstrap_node(self, addr): - result = await self.protocol.ping(addr, self.node.id) - return Node(result[1], addr[0], addr[1]) if result[0] else None + result = await self.protocol.ping(addr, self.node.peer_id) + return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None async def get(self, key): """ @@ -145,7 +144,8 @@ class Server: # if this node has it, return it if self.storage.get(dkey) is not None: return self.storage.get(dkey) - node = Node(dkey) + + node = create_kad_peerinfo(dkey) nearest = self.protocol.router.find_neighbors(node) if not nearest: log.warning("There are no known neighbors to get key %s", key) @@ -171,7 +171,7 @@ class Server: Set the given SHA1 digest key (bytes) to the given value in the network. """ - node = Node(dkey) + node = create_kad_peerinfo(dkey) nearest = self.protocol.router.find_neighbors(node) if not nearest: @@ -201,7 +201,7 @@ class Server: data = { 'ksize': self.ksize, 'alpha': self.alpha, - 'id': self.node.id, + 'id': self.node.peer_id, 'neighbors': self.bootstrappable_neighbors() } if not data['neighbors']: diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 5273230..b0eea2b 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -3,10 +3,9 @@ import asyncio import logging from rpcudp.protocol import RPCProtocol - -from .node import Node +from .kad_peerinfo import create_kad_peerinfo from .routing import RoutingTable -from .utils import digest + log = logging.getLogger(__name__) # pylint: disable=invalid-name @@ -47,12 +46,14 @@ class KademliaProtocol(RPCProtocol): return sender def rpc_ping(self, sender, nodeid): - source = Node(nodeid, sender[0], sender[1]) + source = create_kad_peerinfo(nodeid, sender[0], sender[1]) + self.welcome_if_new(source) - return self.source_node.id + return self.source_node.peer_id def rpc_store(self, sender, nodeid, key, value): - source = Node(nodeid, sender[0], sender[1]) + source = create_kad_peerinfo(nodeid, sender[0], sender[1]) + self.welcome_if_new(source) log.debug("got a store request from %s, storing '%s'='%s'", sender, key.hex(), value) @@ -62,14 +63,16 @@ class KademliaProtocol(RPCProtocol): def rpc_find_node(self, sender, nodeid, key): log.info("finding neighbors of %i in local table", int(nodeid.hex(), 16)) - source = Node(nodeid, sender[0], sender[1]) + source = create_kad_peerinfo(nodeid, sender[0], sender[1]) + self.welcome_if_new(source) - node = Node(key) + node = create_kad_peerinfo(key) neighbors = self.router.find_neighbors(node, exclude=source) return list(map(tuple, neighbors)) def rpc_find_value(self, sender, nodeid, key): - source = Node(nodeid, sender[0], sender[1]) + source = create_kad_peerinfo(nodeid, sender[0], sender[1]) + self.welcome_if_new(source) value = self.storage.get(key, None) if value is None: @@ -78,24 +81,24 @@ class KademliaProtocol(RPCProtocol): async def call_find_node(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_node(address, self.source_node.id, - node_to_find.id) + result = await self.find_node(address, self.source_node.peer_id, + node_to_find.peer_id) return self.handle_call_response(result, node_to_ask) async def call_find_value(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) - result = await self.find_value(address, self.source_node.id, - node_to_find.id) + result = await self.find_value(address, self.source_node.peer_id, + node_to_find.peer_id) return self.handle_call_response(result, node_to_ask) async def call_ping(self, node_to_ask): address = (node_to_ask.ip, node_to_ask.port) - result = await self.ping(address, self.source_node.id) + result = await self.ping(address, self.source_node.peer_id) return self.handle_call_response(result, node_to_ask) async def call_store(self, node_to_ask, key, value): address = (node_to_ask.ip, node_to_ask.port) - result = await self.store(address, self.source_node.id, key, value) + result = await self.store(address, self.source_node.peer_id, key, value) return self.handle_call_response(result, node_to_ask) def welcome_if_new(self, node): @@ -117,7 +120,7 @@ class KademliaProtocol(RPCProtocol): log.info("never seen %s before, adding to router", node) for key, value in self.storage: - keynode = Node(digest(key)) + keynode = create_kad_peerinfo(key) neighbors = self.router.find_neighbors(keynode) if neighbors: last = neighbors[-1].distance_to(keynode) diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index 3f88e8b..7d97494 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -34,24 +34,24 @@ class KBucket: two = KBucket(midpoint + 1, self.range[1], self.ksize) for node in self.nodes.values(): bucket = one if node.long_id <= midpoint else two - bucket.nodes[node.id] = node + bucket.nodes[node.peer_id] = node return (one, two) def remove_node(self, node): - if node.id not in self.nodes: + if node.peer_id not in self.nodes: return # delete node, and see if we can add a replacement - del self.nodes[node.id] + del self.nodes[node.peer_id] if self.replacement_nodes: newnode = self.replacement_nodes.pop() - self.nodes[newnode.id] = newnode + self.nodes[newnode.peer_id] = newnode def has_in_range(self, node): return self.range[0] <= node.long_id <= self.range[1] def is_new_node(self, node): - return node.id not in self.nodes + return node.peer_id not in self.nodes def add_node(self, node): """ @@ -61,11 +61,11 @@ class KBucket: If the bucket is full, keep track of node in a replacement list, per section 4.1 of the paper. """ - if node.id in self.nodes: - del self.nodes[node.id] - self.nodes[node.id] = node + if node.peer_id in self.nodes: + del self.nodes[node.peer_id] + self.nodes[node.peer_id] = node elif len(self) < self.ksize: - self.nodes[node.id] = node + self.nodes[node.peer_id] = node else: self.replacement_nodes.push(node) return False @@ -73,7 +73,7 @@ class KBucket: def depth(self): vals = self.nodes.values() - sprefix = shared_prefix([bytes_to_bit_string(n.id) for n in vals]) + sprefix = shared_prefix([bytes_to_bit_string(n.peer_id) for n in vals]) return len(sprefix) def head(self): @@ -185,7 +185,7 @@ class RoutingTable: nodes = [] for neighbor in TableTraverser(self, node): notexcluded = exclude is None or not neighbor.same_home_as(exclude) - if neighbor.id != node.id and notexcluded: + if neighbor.peer_id != node.peer_id and notexcluded: heapq.heappush(nodes, (node.distance_to(neighbor), neighbor)) if len(nodes) == k: break diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index 44cba56..2e7edb9 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -7,9 +7,9 @@ from .peerdata import PeerData class PeerInfo: # pylint: disable=too-few-public-methods - def __init__(self, peer_id, peer_data): + def __init__(self, peer_id, peer_data=None): self.peer_id = peer_id - self.addrs = peer_data.get_addrs() + self.addrs = peer_data.get_addrs() if peer_data else None def info_from_p2p_addr(addr):