Merge branch 'node-refactor' of https://github.com/libp2p/py-libp2p into node-refactor
This commit is contained in:
commit
3fa6a165b3
|
@ -1,7 +1,7 @@
|
|||
from collections import Counter
|
||||
import logging
|
||||
|
||||
from .node import Node, NodeHeap
|
||||
from .kad_peerinfo import KadPeerHeap, create_kad_peerinfo
|
||||
from .utils import gather_dict
|
||||
|
||||
|
||||
|
@ -32,7 +32,7 @@ class SpiderCrawl:
|
|||
self.ksize = ksize
|
||||
self.alpha = alpha
|
||||
self.node = node
|
||||
self.nearest = NodeHeap(self.node, self.ksize)
|
||||
self.nearest = KadPeerHeap(self.node, self.ksize)
|
||||
self.last_ids_crawled = []
|
||||
log.info("creating spider with peers: %s", peers)
|
||||
self.nearest.push(peers)
|
||||
|
@ -61,7 +61,7 @@ class SpiderCrawl:
|
|||
|
||||
dicts = {}
|
||||
for peer in self.nearest.get_uncontacted()[:count]:
|
||||
dicts[peer.id] = rpcmethod(peer, self.node)
|
||||
dicts[peer.peer_id] = rpcmethod(peer, self.node)
|
||||
self.nearest.mark_contacted(peer)
|
||||
found = await gather_dict(dicts)
|
||||
return await self._nodes_found(found)
|
||||
|
@ -76,7 +76,7 @@ class ValueSpiderCrawl(SpiderCrawl):
|
|||
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
|
||||
# keep track of the single nearest node without value - per
|
||||
# section 2.3 so we can set the key there if found
|
||||
self.nearest_without_value = NodeHeap(self.node, 1)
|
||||
self.nearest_without_value = KadPeerHeap(self.node, 1)
|
||||
|
||||
async def find(self):
|
||||
"""
|
||||
|
@ -124,7 +124,7 @@ class ValueSpiderCrawl(SpiderCrawl):
|
|||
|
||||
peer = self.nearest_without_value.popleft()
|
||||
if peer:
|
||||
await self.protocol.call_store(peer, self.node.id, value)
|
||||
await self.protocol.call_store(peer, self.node.peer_id, value)
|
||||
return value
|
||||
|
||||
|
||||
|
@ -183,4 +183,4 @@ class RPCFindResponse:
|
|||
be set.
|
||||
"""
|
||||
nodelist = self.response[1] or []
|
||||
return [Node(*nodeple) for nodeple in nodelist]
|
||||
return [create_kad_peerinfo(*nodeple) for nodeple in nodelist]
|
||||
|
|
|
@ -1,16 +1,36 @@
|
|||
from operator import itemgetter
|
||||
import heapq
|
||||
import random
|
||||
|
||||
from operator import itemgetter
|
||||
from multiaddr import Multiaddr
|
||||
from libp2p.peer.peerinfo import PeerInfo
|
||||
from libp2p.peer.id import ID
|
||||
from libp2p.peer.peerdata import PeerData
|
||||
from .utils import digest
|
||||
|
||||
P_IP = "ip4"
|
||||
P_UDP = "udp"
|
||||
|
||||
class KadPeerInfo(PeerInfo):
|
||||
def __init__(self, peer_id, peer_data=None):
|
||||
super(KadPeerInfo, self).__init__(peer_id, peer_data)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
self.peer_id = peer_id._id_str
|
||||
self.long_id = int(digest(peer_id._id_str).hex(), 16)
|
||||
|
||||
self.addrs = peer_data.get_addrs() if peer_data else None
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
self.ip = self.addrs[0].value_for_protocol(P_IP)\
|
||||
if peer_data else None
|
||||
self.port = int(self.addrs[0].value_for_protocol(P_UDP))\
|
||||
if peer_data else None
|
||||
|
||||
class Node:
|
||||
def __init__(self, node_id, ip=None, port=None):
|
||||
self.id = node_id # pylint: disable=invalid-name
|
||||
self.ip = ip # pylint: disable=invalid-name
|
||||
self.port = port
|
||||
self.long_id = int(node_id.hex(), 16)
|
||||
|
||||
def same_home_as(self, node):
|
||||
return self.ip == node.ip and self.port == node.port
|
||||
#TODO: handle more than one addr
|
||||
return self.addrs[0] == node.addrs[0]
|
||||
|
||||
def distance_to(self, node):
|
||||
"""
|
||||
|
@ -22,7 +42,7 @@ class Node:
|
|||
"""
|
||||
Enables use of Node as a tuple - i.e., tuple(node) works.
|
||||
"""
|
||||
return iter([self.id, self.ip, self.port])
|
||||
return iter([self.peer_id, self.ip, self.port])
|
||||
|
||||
def __repr__(self):
|
||||
return repr([self.long_id, self.ip, self.port])
|
||||
|
@ -30,10 +50,9 @@ class Node:
|
|||
def __str__(self):
|
||||
return "%s:%s" % (self.ip, str(self.port))
|
||||
|
||||
|
||||
class NodeHeap:
|
||||
class KadPeerHeap:
|
||||
"""
|
||||
A heap of nodes ordered by distance to a given node.
|
||||
A heap of peers ordered by distance to a given node.
|
||||
"""
|
||||
def __init__(self, node, maxsize):
|
||||
"""
|
||||
|
@ -60,13 +79,13 @@ class NodeHeap:
|
|||
return
|
||||
nheap = []
|
||||
for distance, node in self.heap:
|
||||
if node.id not in peers:
|
||||
if node.peer_id not in peers:
|
||||
heapq.heappush(nheap, (distance, node))
|
||||
self.heap = nheap
|
||||
|
||||
def get_node(self, node_id):
|
||||
for _, node in self.heap:
|
||||
if node.id == node_id:
|
||||
if node.peer_id == node_id:
|
||||
return node
|
||||
return None
|
||||
|
||||
|
@ -74,10 +93,10 @@ class NodeHeap:
|
|||
return len(self.get_uncontacted()) == 0
|
||||
|
||||
def get_ids(self):
|
||||
return [n.id for n in self]
|
||||
return [n.peer_id for n in self]
|
||||
|
||||
def mark_contacted(self, node):
|
||||
self.contacted.add(node.id)
|
||||
self.contacted.add(node.peer_id)
|
||||
|
||||
def popleft(self):
|
||||
return heapq.heappop(self.heap)[1] if self else None
|
||||
|
@ -105,9 +124,20 @@ class NodeHeap:
|
|||
|
||||
def __contains__(self, node):
|
||||
for _, other in self.heap:
|
||||
if node.id == other.id:
|
||||
if node.peer_id == other.peer_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_uncontacted(self):
|
||||
return [n for n in self if n.id not in self.contacted]
|
||||
return [n for n in self if n.peer_id not in self.contacted]
|
||||
|
||||
def create_kad_peerinfo(raw_node_id=None, sender_ip=None, sender_port=None):
|
||||
node_id = ID(raw_node_id) if raw_node_id else ID(digest(random.getrandbits(255)))
|
||||
peer_data = None
|
||||
if sender_ip and sender_port:
|
||||
peer_data = PeerData() #pylint: disable=no-value-for-parameter
|
||||
addr = [Multiaddr("/"+ P_IP +"/" + str(sender_ip) + "/"\
|
||||
+ P_UDP + "/" + str(sender_port))]
|
||||
peer_data.add_addrs(addr)
|
||||
|
||||
return KadPeerInfo(node_id, peer_data)
|
|
@ -1,7 +1,6 @@
|
|||
"""
|
||||
Package for interacting on the network at a high level.
|
||||
"""
|
||||
import random
|
||||
import pickle
|
||||
import asyncio
|
||||
import logging
|
||||
|
@ -9,7 +8,7 @@ import logging
|
|||
from .protocol import KademliaProtocol
|
||||
from .utils import digest
|
||||
from .storage import ForgetfulStorage
|
||||
from .node import Node
|
||||
from .kad_peerinfo import create_kad_peerinfo
|
||||
from .crawling import ValueSpiderCrawl
|
||||
from .crawling import NodeSpiderCrawl
|
||||
|
||||
|
@ -39,7 +38,7 @@ class Server:
|
|||
self.ksize = ksize
|
||||
self.alpha = alpha
|
||||
self.storage = storage or ForgetfulStorage()
|
||||
self.node = Node(node_id or digest(random.getrandbits(255)))
|
||||
self.node = create_kad_peerinfo(node_id)
|
||||
self.transport = None
|
||||
self.protocol = None
|
||||
self.refresh_loop = None
|
||||
|
@ -86,7 +85,7 @@ class Server:
|
|||
"""
|
||||
results = []
|
||||
for node_id in self.protocol.get_refresh_ids():
|
||||
node = Node(node_id)
|
||||
node = create_kad_peerinfo(node_id)
|
||||
nearest = self.protocol.router.find_neighbors(node, self.alpha)
|
||||
spider = NodeSpiderCrawl(self.protocol, node, nearest,
|
||||
self.ksize, self.alpha)
|
||||
|
@ -130,8 +129,8 @@ class Server:
|
|||
return await spider.find()
|
||||
|
||||
async def bootstrap_node(self, addr):
|
||||
result = await self.protocol.ping(addr, self.node.id)
|
||||
return Node(result[1], addr[0], addr[1]) if result[0] else None
|
||||
result = await self.protocol.ping(addr, self.node.peer_id)
|
||||
return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None
|
||||
|
||||
async def get(self, key):
|
||||
"""
|
||||
|
@ -145,7 +144,8 @@ class Server:
|
|||
# if this node has it, return it
|
||||
if self.storage.get(dkey) is not None:
|
||||
return self.storage.get(dkey)
|
||||
node = Node(dkey)
|
||||
|
||||
node = create_kad_peerinfo(dkey)
|
||||
nearest = self.protocol.router.find_neighbors(node)
|
||||
if not nearest:
|
||||
log.warning("There are no known neighbors to get key %s", key)
|
||||
|
@ -171,7 +171,7 @@ class Server:
|
|||
Set the given SHA1 digest key (bytes) to the given value in the
|
||||
network.
|
||||
"""
|
||||
node = Node(dkey)
|
||||
node = create_kad_peerinfo(dkey)
|
||||
|
||||
nearest = self.protocol.router.find_neighbors(node)
|
||||
if not nearest:
|
||||
|
@ -201,7 +201,7 @@ class Server:
|
|||
data = {
|
||||
'ksize': self.ksize,
|
||||
'alpha': self.alpha,
|
||||
'id': self.node.id,
|
||||
'id': self.node.peer_id,
|
||||
'neighbors': self.bootstrappable_neighbors()
|
||||
}
|
||||
if not data['neighbors']:
|
||||
|
|
|
@ -3,10 +3,9 @@ import asyncio
|
|||
import logging
|
||||
|
||||
from rpcudp.protocol import RPCProtocol
|
||||
|
||||
from .node import Node
|
||||
from .kad_peerinfo import create_kad_peerinfo
|
||||
from .routing import RoutingTable
|
||||
from .utils import digest
|
||||
|
||||
|
||||
log = logging.getLogger(__name__) # pylint: disable=invalid-name
|
||||
|
||||
|
@ -47,12 +46,14 @@ class KademliaProtocol(RPCProtocol):
|
|||
return sender
|
||||
|
||||
def rpc_ping(self, sender, nodeid):
|
||||
source = Node(nodeid, sender[0], sender[1])
|
||||
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
|
||||
|
||||
self.welcome_if_new(source)
|
||||
return self.source_node.id
|
||||
return self.source_node.peer_id
|
||||
|
||||
def rpc_store(self, sender, nodeid, key, value):
|
||||
source = Node(nodeid, sender[0], sender[1])
|
||||
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
|
||||
|
||||
self.welcome_if_new(source)
|
||||
log.debug("got a store request from %s, storing '%s'='%s'",
|
||||
sender, key.hex(), value)
|
||||
|
@ -62,14 +63,17 @@ class KademliaProtocol(RPCProtocol):
|
|||
def rpc_find_node(self, sender, nodeid, key):
|
||||
log.info("finding neighbors of %i in local table",
|
||||
int(nodeid.hex(), 16))
|
||||
source = Node(nodeid, sender[0], sender[1])
|
||||
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
|
||||
|
||||
# source = Node(nodeid, sender[0], sender[1])
|
||||
self.welcome_if_new(source)
|
||||
node = Node(key)
|
||||
node = create_kad_peerinfo(key)
|
||||
neighbors = self.router.find_neighbors(node, exclude=source)
|
||||
return list(map(tuple, neighbors))
|
||||
|
||||
def rpc_find_value(self, sender, nodeid, key):
|
||||
source = Node(nodeid, sender[0], sender[1])
|
||||
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
|
||||
|
||||
self.welcome_if_new(source)
|
||||
value = self.storage.get(key, None)
|
||||
if value is None:
|
||||
|
@ -78,24 +82,24 @@ class KademliaProtocol(RPCProtocol):
|
|||
|
||||
async def call_find_node(self, node_to_ask, node_to_find):
|
||||
address = (node_to_ask.ip, node_to_ask.port)
|
||||
result = await self.find_node(address, self.source_node.id,
|
||||
node_to_find.id)
|
||||
result = await self.find_node(address, self.source_node.peer_id,
|
||||
node_to_find.peer_id)
|
||||
return self.handle_call_response(result, node_to_ask)
|
||||
|
||||
async def call_find_value(self, node_to_ask, node_to_find):
|
||||
address = (node_to_ask.ip, node_to_ask.port)
|
||||
result = await self.find_value(address, self.source_node.id,
|
||||
node_to_find.id)
|
||||
result = await self.find_value(address, self.source_node.peer_id,
|
||||
node_to_find.peer_id)
|
||||
return self.handle_call_response(result, node_to_ask)
|
||||
|
||||
async def call_ping(self, node_to_ask):
|
||||
address = (node_to_ask.ip, node_to_ask.port)
|
||||
result = await self.ping(address, self.source_node.id)
|
||||
result = await self.ping(address, self.source_node.peer_id)
|
||||
return self.handle_call_response(result, node_to_ask)
|
||||
|
||||
async def call_store(self, node_to_ask, key, value):
|
||||
address = (node_to_ask.ip, node_to_ask.port)
|
||||
result = await self.store(address, self.source_node.id, key, value)
|
||||
result = await self.store(address, self.source_node.peer_id, key, value)
|
||||
return self.handle_call_response(result, node_to_ask)
|
||||
|
||||
def welcome_if_new(self, node):
|
||||
|
@ -117,7 +121,7 @@ class KademliaProtocol(RPCProtocol):
|
|||
|
||||
log.info("never seen %s before, adding to router", node)
|
||||
for key, value in self.storage:
|
||||
keynode = Node(digest(key))
|
||||
keynode = create_kad_peerinfo(key)
|
||||
neighbors = self.router.find_neighbors(keynode)
|
||||
if neighbors:
|
||||
last = neighbors[-1].distance_to(keynode)
|
||||
|
|
|
@ -34,24 +34,24 @@ class KBucket:
|
|||
two = KBucket(midpoint + 1, self.range[1], self.ksize)
|
||||
for node in self.nodes.values():
|
||||
bucket = one if node.long_id <= midpoint else two
|
||||
bucket.nodes[node.id] = node
|
||||
bucket.nodes[node.peer_id] = node
|
||||
return (one, two)
|
||||
|
||||
def remove_node(self, node):
|
||||
if node.id not in self.nodes:
|
||||
if node.peer_id not in self.nodes:
|
||||
return
|
||||
|
||||
# delete node, and see if we can add a replacement
|
||||
del self.nodes[node.id]
|
||||
del self.nodes[node.peer_id]
|
||||
if self.replacement_nodes:
|
||||
newnode = self.replacement_nodes.pop()
|
||||
self.nodes[newnode.id] = newnode
|
||||
self.nodes[newnode.peer_id] = newnode
|
||||
|
||||
def has_in_range(self, node):
|
||||
return self.range[0] <= node.long_id <= self.range[1]
|
||||
|
||||
def is_new_node(self, node):
|
||||
return node.id not in self.nodes
|
||||
return node.peer_id not in self.nodes
|
||||
|
||||
def add_node(self, node):
|
||||
"""
|
||||
|
@ -61,11 +61,11 @@ class KBucket:
|
|||
If the bucket is full, keep track of node in a replacement list,
|
||||
per section 4.1 of the paper.
|
||||
"""
|
||||
if node.id in self.nodes:
|
||||
del self.nodes[node.id]
|
||||
self.nodes[node.id] = node
|
||||
if node.peer_id in self.nodes:
|
||||
del self.nodes[node.peer_id]
|
||||
self.nodes[node.peer_id] = node
|
||||
elif len(self) < self.ksize:
|
||||
self.nodes[node.id] = node
|
||||
self.nodes[node.peer_id] = node
|
||||
else:
|
||||
self.replacement_nodes.push(node)
|
||||
return False
|
||||
|
@ -73,7 +73,7 @@ class KBucket:
|
|||
|
||||
def depth(self):
|
||||
vals = self.nodes.values()
|
||||
sprefix = shared_prefix([bytes_to_bit_string(n.id) for n in vals])
|
||||
sprefix = shared_prefix([bytes_to_bit_string(n.peer_id) for n in vals])
|
||||
return len(sprefix)
|
||||
|
||||
def head(self):
|
||||
|
@ -185,7 +185,7 @@ class RoutingTable:
|
|||
nodes = []
|
||||
for neighbor in TableTraverser(self, node):
|
||||
notexcluded = exclude is None or not neighbor.same_home_as(exclude)
|
||||
if neighbor.id != node.id and notexcluded:
|
||||
if neighbor.peer_id != node.peer_id and notexcluded:
|
||||
heapq.heappush(nodes, (node.distance_to(neighbor), neighbor))
|
||||
if len(nodes) == k:
|
||||
break
|
||||
|
|
|
@ -7,9 +7,9 @@ from .peerdata import PeerData
|
|||
|
||||
class PeerInfo:
|
||||
# pylint: disable=too-few-public-methods
|
||||
def __init__(self, peer_id, peer_data):
|
||||
def __init__(self, peer_id, peer_data=None):
|
||||
self.peer_id = peer_id
|
||||
self.addrs = peer_data.get_addrs()
|
||||
self.addrs = peer_data.get_addrs() if peer_data else None
|
||||
|
||||
|
||||
def info_from_p2p_addr(addr):
|
||||
|
|
Loading…
Reference in New Issue
Block a user