Merge pull request #152 from libp2p/node-refactor

Node refactor
This commit is contained in:
ZX 2019-04-19 21:09:44 -04:00 committed by GitHub
commit 367a939087
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 62 deletions

View File

@ -1,7 +1,7 @@
from collections import Counter from collections import Counter
import logging import logging
from .node import Node, NodeHeap from .kad_peerinfo import KadPeerHeap, create_kad_peerinfo
from .utils import gather_dict from .utils import gather_dict
@ -32,7 +32,7 @@ class SpiderCrawl:
self.ksize = ksize self.ksize = ksize
self.alpha = alpha self.alpha = alpha
self.node = node self.node = node
self.nearest = NodeHeap(self.node, self.ksize) self.nearest = KadPeerHeap(self.node, self.ksize)
self.last_ids_crawled = [] self.last_ids_crawled = []
log.info("creating spider with peers: %s", peers) log.info("creating spider with peers: %s", peers)
self.nearest.push(peers) self.nearest.push(peers)
@ -61,7 +61,7 @@ class SpiderCrawl:
dicts = {} dicts = {}
for peer in self.nearest.get_uncontacted()[:count]: for peer in self.nearest.get_uncontacted()[:count]:
dicts[peer.id] = rpcmethod(peer, self.node) dicts[peer.peer_id] = rpcmethod(peer, self.node)
self.nearest.mark_contacted(peer) self.nearest.mark_contacted(peer)
found = await gather_dict(dicts) found = await gather_dict(dicts)
return await self._nodes_found(found) return await self._nodes_found(found)
@ -76,7 +76,7 @@ class ValueSpiderCrawl(SpiderCrawl):
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
# keep track of the single nearest node without value - per # keep track of the single nearest node without value - per
# section 2.3 so we can set the key there if found # section 2.3 so we can set the key there if found
self.nearest_without_value = NodeHeap(self.node, 1) self.nearest_without_value = KadPeerHeap(self.node, 1)
async def find(self): async def find(self):
""" """
@ -124,7 +124,7 @@ class ValueSpiderCrawl(SpiderCrawl):
peer = self.nearest_without_value.popleft() peer = self.nearest_without_value.popleft()
if peer: if peer:
await self.protocol.call_store(peer, self.node.id, value) await self.protocol.call_store(peer, self.node.peer_id, value)
return value return value
@ -183,4 +183,4 @@ class RPCFindResponse:
be set. be set.
""" """
nodelist = self.response[1] or [] nodelist = self.response[1] or []
return [Node(*nodeple) for nodeple in nodelist] return [create_kad_peerinfo(*nodeple) for nodeple in nodelist]

View File

@ -1,16 +1,35 @@
from operator import itemgetter
import heapq import heapq
import random
from operator import itemgetter
from multiaddr import Multiaddr
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.id import ID
from libp2p.peer.peerdata import PeerData
from .utils import digest
P_IP = "ip4"
P_UDP = "udp"
class KadPeerInfo(PeerInfo):
def __init__(self, peer_id, peer_data=None):
super(KadPeerInfo, self).__init__(peer_id, peer_data)
# pylint: disable=protected-access
self.peer_id = peer_id._id_str
self.long_id = int(digest(peer_id._id_str).hex(), 16)
self.addrs = peer_data.get_addrs() if peer_data else None
# pylint: disable=invalid-name
self.ip = self.addrs[0].value_for_protocol(P_IP)\
if peer_data else None
self.port = int(self.addrs[0].value_for_protocol(P_UDP))\
if peer_data else None
class Node:
def __init__(self, node_id, ip=None, port=None):
self.id = node_id # pylint: disable=invalid-name
self.ip = ip # pylint: disable=invalid-name
self.port = port
self.long_id = int(node_id.hex(), 16)
def same_home_as(self, node): def same_home_as(self, node):
return self.ip == node.ip and self.port == node.port return sorted(self.addrs) == sorted(node.addrs)
def distance_to(self, node): def distance_to(self, node):
""" """
@ -22,7 +41,7 @@ class Node:
""" """
Enables use of Node as a tuple - i.e., tuple(node) works. Enables use of Node as a tuple - i.e., tuple(node) works.
""" """
return iter([self.id, self.ip, self.port]) return iter([self.peer_id, self.ip, self.port])
def __repr__(self): def __repr__(self):
return repr([self.long_id, self.ip, self.port]) return repr([self.long_id, self.ip, self.port])
@ -30,10 +49,9 @@ class Node:
def __str__(self): def __str__(self):
return "%s:%s" % (self.ip, str(self.port)) return "%s:%s" % (self.ip, str(self.port))
class KadPeerHeap:
class NodeHeap:
""" """
A heap of nodes ordered by distance to a given node. A heap of peers ordered by distance to a given node.
""" """
def __init__(self, node, maxsize): def __init__(self, node, maxsize):
""" """
@ -60,13 +78,13 @@ class NodeHeap:
return return
nheap = [] nheap = []
for distance, node in self.heap: for distance, node in self.heap:
if node.id not in peers: if node.peer_id not in peers:
heapq.heappush(nheap, (distance, node)) heapq.heappush(nheap, (distance, node))
self.heap = nheap self.heap = nheap
def get_node(self, node_id): def get_node(self, node_id):
for _, node in self.heap: for _, node in self.heap:
if node.id == node_id: if node.peer_id == node_id:
return node return node
return None return None
@ -74,10 +92,10 @@ class NodeHeap:
return len(self.get_uncontacted()) == 0 return len(self.get_uncontacted()) == 0
def get_ids(self): def get_ids(self):
return [n.id for n in self] return [n.peer_id for n in self]
def mark_contacted(self, node): def mark_contacted(self, node):
self.contacted.add(node.id) self.contacted.add(node.peer_id)
def popleft(self): def popleft(self):
return heapq.heappop(self.heap)[1] if self else None return heapq.heappop(self.heap)[1] if self else None
@ -105,9 +123,20 @@ class NodeHeap:
def __contains__(self, node): def __contains__(self, node):
for _, other in self.heap: for _, other in self.heap:
if node.id == other.id: if node.peer_id == other.peer_id:
return True return True
return False return False
def get_uncontacted(self): def get_uncontacted(self):
return [n for n in self if n.id not in self.contacted] return [n for n in self if n.peer_id not in self.contacted]
def create_kad_peerinfo(raw_node_id=None, sender_ip=None, sender_port=None):
node_id = ID(raw_node_id) if raw_node_id else ID(digest(random.getrandbits(255)))
peer_data = None
if sender_ip and sender_port:
peer_data = PeerData() #pylint: disable=no-value-for-parameter
addr = [Multiaddr("/"+ P_IP +"/" + str(sender_ip) + "/"\
+ P_UDP + "/" + str(sender_port))]
peer_data.add_addrs(addr)
return KadPeerInfo(node_id, peer_data)

View File

@ -1,7 +1,6 @@
""" """
Package for interacting on the network at a high level. Package for interacting on the network at a high level.
""" """
import random
import pickle import pickle
import asyncio import asyncio
import logging import logging
@ -9,7 +8,7 @@ import logging
from .protocol import KademliaProtocol from .protocol import KademliaProtocol
from .utils import digest from .utils import digest
from .storage import ForgetfulStorage from .storage import ForgetfulStorage
from .node import Node from .kad_peerinfo import create_kad_peerinfo
from .crawling import ValueSpiderCrawl from .crawling import ValueSpiderCrawl
from .crawling import NodeSpiderCrawl from .crawling import NodeSpiderCrawl
@ -39,7 +38,7 @@ class Server:
self.ksize = ksize self.ksize = ksize
self.alpha = alpha self.alpha = alpha
self.storage = storage or ForgetfulStorage() self.storage = storage or ForgetfulStorage()
self.node = Node(node_id or digest(random.getrandbits(255))) self.node = create_kad_peerinfo(node_id)
self.transport = None self.transport = None
self.protocol = None self.protocol = None
self.refresh_loop = None self.refresh_loop = None
@ -86,7 +85,7 @@ class Server:
""" """
results = [] results = []
for node_id in self.protocol.get_refresh_ids(): for node_id in self.protocol.get_refresh_ids():
node = Node(node_id) node = create_kad_peerinfo(node_id)
nearest = self.protocol.router.find_neighbors(node, self.alpha) nearest = self.protocol.router.find_neighbors(node, self.alpha)
spider = NodeSpiderCrawl(self.protocol, node, nearest, spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha) self.ksize, self.alpha)
@ -130,8 +129,8 @@ class Server:
return await spider.find() return await spider.find()
async def bootstrap_node(self, addr): async def bootstrap_node(self, addr):
result = await self.protocol.ping(addr, self.node.id) result = await self.protocol.ping(addr, self.node.peer_id)
return Node(result[1], addr[0], addr[1]) if result[0] else None return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None
async def get(self, key): async def get(self, key):
""" """
@ -145,7 +144,8 @@ class Server:
# if this node has it, return it # if this node has it, return it
if self.storage.get(dkey) is not None: if self.storage.get(dkey) is not None:
return self.storage.get(dkey) return self.storage.get(dkey)
node = Node(dkey)
node = create_kad_peerinfo(dkey)
nearest = self.protocol.router.find_neighbors(node) nearest = self.protocol.router.find_neighbors(node)
if not nearest: if not nearest:
log.warning("There are no known neighbors to get key %s", key) log.warning("There are no known neighbors to get key %s", key)
@ -171,7 +171,7 @@ class Server:
Set the given SHA1 digest key (bytes) to the given value in the Set the given SHA1 digest key (bytes) to the given value in the
network. network.
""" """
node = Node(dkey) node = create_kad_peerinfo(dkey)
nearest = self.protocol.router.find_neighbors(node) nearest = self.protocol.router.find_neighbors(node)
if not nearest: if not nearest:
@ -201,7 +201,7 @@ class Server:
data = { data = {
'ksize': self.ksize, 'ksize': self.ksize,
'alpha': self.alpha, 'alpha': self.alpha,
'id': self.node.id, 'id': self.node.peer_id,
'neighbors': self.bootstrappable_neighbors() 'neighbors': self.bootstrappable_neighbors()
} }
if not data['neighbors']: if not data['neighbors']:

View File

@ -3,10 +3,9 @@ import asyncio
import logging import logging
from rpcudp.protocol import RPCProtocol from rpcudp.protocol import RPCProtocol
from .kad_peerinfo import create_kad_peerinfo
from .node import Node
from .routing import RoutingTable from .routing import RoutingTable
from .utils import digest
log = logging.getLogger(__name__) # pylint: disable=invalid-name log = logging.getLogger(__name__) # pylint: disable=invalid-name
@ -47,12 +46,14 @@ class KademliaProtocol(RPCProtocol):
return sender return sender
def rpc_ping(self, sender, nodeid): def rpc_ping(self, sender, nodeid):
source = Node(nodeid, sender[0], sender[1]) source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source) self.welcome_if_new(source)
return self.source_node.id return self.source_node.peer_id
def rpc_store(self, sender, nodeid, key, value): def rpc_store(self, sender, nodeid, key, value):
source = Node(nodeid, sender[0], sender[1]) source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source) self.welcome_if_new(source)
log.debug("got a store request from %s, storing '%s'='%s'", log.debug("got a store request from %s, storing '%s'='%s'",
sender, key.hex(), value) sender, key.hex(), value)
@ -62,14 +63,16 @@ class KademliaProtocol(RPCProtocol):
def rpc_find_node(self, sender, nodeid, key): def rpc_find_node(self, sender, nodeid, key):
log.info("finding neighbors of %i in local table", log.info("finding neighbors of %i in local table",
int(nodeid.hex(), 16)) int(nodeid.hex(), 16))
source = Node(nodeid, sender[0], sender[1]) source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source) self.welcome_if_new(source)
node = Node(key) node = create_kad_peerinfo(key)
neighbors = self.router.find_neighbors(node, exclude=source) neighbors = self.router.find_neighbors(node, exclude=source)
return list(map(tuple, neighbors)) return list(map(tuple, neighbors))
def rpc_find_value(self, sender, nodeid, key): def rpc_find_value(self, sender, nodeid, key):
source = Node(nodeid, sender[0], sender[1]) source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source) self.welcome_if_new(source)
value = self.storage.get(key, None) value = self.storage.get(key, None)
if value is None: if value is None:
@ -78,24 +81,24 @@ class KademliaProtocol(RPCProtocol):
async def call_find_node(self, node_to_ask, node_to_find): async def call_find_node(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_node(address, self.source_node.id, result = await self.find_node(address, self.source_node.peer_id,
node_to_find.id) node_to_find.peer_id)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
async def call_find_value(self, node_to_ask, node_to_find): async def call_find_value(self, node_to_ask, node_to_find):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.find_value(address, self.source_node.id, result = await self.find_value(address, self.source_node.peer_id,
node_to_find.id) node_to_find.peer_id)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
async def call_ping(self, node_to_ask): async def call_ping(self, node_to_ask):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.ping(address, self.source_node.id) result = await self.ping(address, self.source_node.peer_id)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
async def call_store(self, node_to_ask, key, value): async def call_store(self, node_to_ask, key, value):
address = (node_to_ask.ip, node_to_ask.port) address = (node_to_ask.ip, node_to_ask.port)
result = await self.store(address, self.source_node.id, key, value) result = await self.store(address, self.source_node.peer_id, key, value)
return self.handle_call_response(result, node_to_ask) return self.handle_call_response(result, node_to_ask)
def welcome_if_new(self, node): def welcome_if_new(self, node):
@ -117,7 +120,7 @@ class KademliaProtocol(RPCProtocol):
log.info("never seen %s before, adding to router", node) log.info("never seen %s before, adding to router", node)
for key, value in self.storage: for key, value in self.storage:
keynode = Node(digest(key)) keynode = create_kad_peerinfo(key)
neighbors = self.router.find_neighbors(keynode) neighbors = self.router.find_neighbors(keynode)
if neighbors: if neighbors:
last = neighbors[-1].distance_to(keynode) last = neighbors[-1].distance_to(keynode)

View File

@ -34,24 +34,24 @@ class KBucket:
two = KBucket(midpoint + 1, self.range[1], self.ksize) two = KBucket(midpoint + 1, self.range[1], self.ksize)
for node in self.nodes.values(): for node in self.nodes.values():
bucket = one if node.long_id <= midpoint else two bucket = one if node.long_id <= midpoint else two
bucket.nodes[node.id] = node bucket.nodes[node.peer_id] = node
return (one, two) return (one, two)
def remove_node(self, node): def remove_node(self, node):
if node.id not in self.nodes: if node.peer_id not in self.nodes:
return return
# delete node, and see if we can add a replacement # delete node, and see if we can add a replacement
del self.nodes[node.id] del self.nodes[node.peer_id]
if self.replacement_nodes: if self.replacement_nodes:
newnode = self.replacement_nodes.pop() newnode = self.replacement_nodes.pop()
self.nodes[newnode.id] = newnode self.nodes[newnode.peer_id] = newnode
def has_in_range(self, node): def has_in_range(self, node):
return self.range[0] <= node.long_id <= self.range[1] return self.range[0] <= node.long_id <= self.range[1]
def is_new_node(self, node): def is_new_node(self, node):
return node.id not in self.nodes return node.peer_id not in self.nodes
def add_node(self, node): def add_node(self, node):
""" """
@ -61,11 +61,11 @@ class KBucket:
If the bucket is full, keep track of node in a replacement list, If the bucket is full, keep track of node in a replacement list,
per section 4.1 of the paper. per section 4.1 of the paper.
""" """
if node.id in self.nodes: if node.peer_id in self.nodes:
del self.nodes[node.id] del self.nodes[node.peer_id]
self.nodes[node.id] = node self.nodes[node.peer_id] = node
elif len(self) < self.ksize: elif len(self) < self.ksize:
self.nodes[node.id] = node self.nodes[node.peer_id] = node
else: else:
self.replacement_nodes.push(node) self.replacement_nodes.push(node)
return False return False
@ -73,7 +73,7 @@ class KBucket:
def depth(self): def depth(self):
vals = self.nodes.values() vals = self.nodes.values()
sprefix = shared_prefix([bytes_to_bit_string(n.id) for n in vals]) sprefix = shared_prefix([bytes_to_bit_string(n.peer_id) for n in vals])
return len(sprefix) return len(sprefix)
def head(self): def head(self):
@ -185,7 +185,7 @@ class RoutingTable:
nodes = [] nodes = []
for neighbor in TableTraverser(self, node): for neighbor in TableTraverser(self, node):
notexcluded = exclude is None or not neighbor.same_home_as(exclude) notexcluded = exclude is None or not neighbor.same_home_as(exclude)
if neighbor.id != node.id and notexcluded: if neighbor.peer_id != node.peer_id and notexcluded:
heapq.heappush(nodes, (node.distance_to(neighbor), neighbor)) heapq.heappush(nodes, (node.distance_to(neighbor), neighbor))
if len(nodes) == k: if len(nodes) == k:
break break

View File

@ -7,9 +7,9 @@ from .peerdata import PeerData
class PeerInfo: class PeerInfo:
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, peer_id, peer_data): def __init__(self, peer_id, peer_data=None):
self.peer_id = peer_id self.peer_id = peer_id
self.addrs = peer_data.get_addrs() self.addrs = peer_data.get_addrs() if peer_data else None
def info_from_p2p_addr(addr): def info_from_p2p_addr(addr):