Dangling kademlia
cleanup
This commit is contained in:
parent
1355fbae99
commit
445c0f8e65
|
@ -1,250 +0,0 @@
|
|||
"""Package for interacting on the network at a high level."""
|
||||
import asyncio
|
||||
import logging
|
||||
import pickle
|
||||
|
||||
from .crawling import NodeSpiderCrawl, ValueSpiderCrawl
|
||||
from .kad_peerinfo import create_kad_peerinfo
|
||||
from .protocol import KademliaProtocol
|
||||
from .storage import ForgetfulStorage
|
||||
from .utils import digest
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KademliaServer:
|
||||
"""
|
||||
High level view of a node instance.
|
||||
|
||||
This is the object that should be created to start listening as an
|
||||
active node on the network.
|
||||
"""
|
||||
|
||||
protocol_class = KademliaProtocol
|
||||
|
||||
def __init__(self, ksize=20, alpha=3, node_id=None, storage=None):
|
||||
"""
|
||||
Create a server instance. This will start listening on the given port.
|
||||
|
||||
Args:
|
||||
ksize (int): The k parameter from the paper
|
||||
alpha (int): The alpha parameter from the paper
|
||||
node_id: The id for this node on the network.
|
||||
storage: An instance that implements
|
||||
:interface:`~kademlia.storage.IStorage`
|
||||
"""
|
||||
self.ksize = ksize
|
||||
self.alpha = alpha
|
||||
self.storage = storage or ForgetfulStorage()
|
||||
self.node = create_kad_peerinfo(node_id)
|
||||
self.transport = None
|
||||
self.protocol = None
|
||||
self.refresh_loop = None
|
||||
self.save_state_loop = None
|
||||
|
||||
def stop(self):
|
||||
if self.transport is not None:
|
||||
self.transport.close()
|
||||
|
||||
if self.refresh_loop:
|
||||
self.refresh_loop.cancel()
|
||||
|
||||
if self.save_state_loop:
|
||||
self.save_state_loop.cancel()
|
||||
|
||||
def _create_protocol(self):
|
||||
return self.protocol_class(self.node, self.storage, self.ksize)
|
||||
|
||||
async def listen(self, port=0, interface="0.0.0.0"):
|
||||
"""
|
||||
Start listening on the given port.
|
||||
|
||||
Provide interface="::" to accept ipv6 address
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
listen = loop.create_datagram_endpoint(
|
||||
self._create_protocol, local_addr=(interface, port)
|
||||
)
|
||||
self.transport, self.protocol = await listen
|
||||
socket = self.transport.get_extra_info("socket")
|
||||
self.address = socket.getsockname()
|
||||
log.info(
|
||||
"Node %i listening on %s:%i",
|
||||
self.node.xor_id,
|
||||
self.address[0],
|
||||
self.address[1],
|
||||
)
|
||||
# finally, schedule refreshing table
|
||||
self.refresh_table()
|
||||
|
||||
def refresh_table(self):
|
||||
log.debug("Refreshing routing table")
|
||||
asyncio.ensure_future(self._refresh_table())
|
||||
loop = asyncio.get_event_loop()
|
||||
self.refresh_loop = loop.call_later(3600, self.refresh_table)
|
||||
|
||||
async def _refresh_table(self):
|
||||
"""Refresh buckets that haven't had any lookups in the last hour (per
|
||||
section 2.3 of the paper)."""
|
||||
results = []
|
||||
for node_id in self.protocol.get_refresh_ids():
|
||||
node = create_kad_peerinfo(node_id)
|
||||
nearest = self.protocol.router.find_neighbors(node, self.alpha)
|
||||
spider = NodeSpiderCrawl(
|
||||
self.protocol, node, nearest, self.ksize, self.alpha
|
||||
)
|
||||
results.append(spider.find())
|
||||
|
||||
# do our crawling
|
||||
await asyncio.gather(*results)
|
||||
|
||||
# now republish keys older than one hour
|
||||
for dkey, value in self.storage.iter_older_than(3600):
|
||||
await self.set_digest(dkey, value)
|
||||
|
||||
def bootstrappable_neighbors(self):
|
||||
"""
|
||||
Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for use
|
||||
as an argument to the bootstrap method.
|
||||
|
||||
The server should have been bootstrapped
|
||||
already - this is just a utility for getting some neighbors and then
|
||||
storing them if this server is going down for a while. When it comes
|
||||
back up, the list of nodes can be used to bootstrap.
|
||||
"""
|
||||
neighbors = self.protocol.router.find_neighbors(self.node)
|
||||
return [tuple(n)[-2:] for n in neighbors]
|
||||
|
||||
async def bootstrap(self, addrs):
|
||||
"""
|
||||
Bootstrap the server by connecting to other known nodes in the network.
|
||||
|
||||
Args:
|
||||
addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP
|
||||
addresses are acceptable - hostnames will cause an error.
|
||||
"""
|
||||
log.debug("Attempting to bootstrap node with %i initial contacts", len(addrs))
|
||||
cos = list(map(self.bootstrap_node, addrs))
|
||||
gathered = await asyncio.gather(*cos)
|
||||
nodes = [node for node in gathered if node is not None]
|
||||
spider = NodeSpiderCrawl(
|
||||
self.protocol, self.node, nodes, self.ksize, self.alpha
|
||||
)
|
||||
return await spider.find()
|
||||
|
||||
async def bootstrap_node(self, addr):
|
||||
result = await self.protocol.ping(addr, self.node.peer_id_bytes)
|
||||
return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None
|
||||
|
||||
async def get(self, key):
|
||||
"""
|
||||
Get a key if the network has it.
|
||||
|
||||
Returns:
|
||||
:class:`None` if not found, the value otherwise.
|
||||
"""
|
||||
log.info("Looking up key %s", key)
|
||||
dkey = digest(key)
|
||||
# if this node has it, return it
|
||||
if self.storage.get(dkey) is not None:
|
||||
return self.storage.get(dkey)
|
||||
|
||||
node = create_kad_peerinfo(dkey)
|
||||
nearest = self.protocol.router.find_neighbors(node)
|
||||
if not nearest:
|
||||
log.warning("There are no known neighbors to get key %s", key)
|
||||
return None
|
||||
spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
|
||||
return await spider.find()
|
||||
|
||||
async def set(self, key, value):
|
||||
"""Set the given string key to the given value in the network."""
|
||||
if not check_dht_value_type(value):
|
||||
raise TypeError("Value must be of type int, float, bool, str, or bytes")
|
||||
log.info("setting '%s' = '%s' on network", key, value)
|
||||
dkey = digest(key)
|
||||
return await self.set_digest(dkey, value)
|
||||
|
||||
async def provide(self, key):
|
||||
"""publish to the network that it provides for a particular key."""
|
||||
neighbors = self.protocol.router.find_neighbors(self.node)
|
||||
return [
|
||||
await self.protocol.call_add_provider(n, key, self.node.peer_id_bytes)
|
||||
for n in neighbors
|
||||
]
|
||||
|
||||
async def get_providers(self, key):
|
||||
"""get the list of providers for a key."""
|
||||
neighbors = self.protocol.router.find_neighbors(self.node)
|
||||
return [await self.protocol.call_get_providers(n, key) for n in neighbors]
|
||||
|
||||
async def set_digest(self, dkey, value):
|
||||
"""Set the given SHA1 digest key (bytes) to the given value in the
|
||||
network."""
|
||||
node = create_kad_peerinfo(dkey)
|
||||
|
||||
nearest = self.protocol.router.find_neighbors(node)
|
||||
if not nearest:
|
||||
log.warning("There are no known neighbors to set key %s", dkey.hex())
|
||||
return False
|
||||
|
||||
spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
|
||||
nodes = await spider.find()
|
||||
log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes)))
|
||||
|
||||
# if this node is close too, then store here as well
|
||||
biggest = max([n.distance_to(node) for n in nodes])
|
||||
if self.node.distance_to(node) < biggest:
|
||||
self.storage[dkey] = value
|
||||
results = [self.protocol.call_store(n, dkey, value) for n in nodes]
|
||||
# return true only if at least one store call succeeded
|
||||
return any(await asyncio.gather(*results))
|
||||
|
||||
def save_state(self, fname):
|
||||
"""Save the state of this node (the alpha/ksize/id/immediate neighbors)
|
||||
to a cache file with the given fname."""
|
||||
log.info("Saving state to %s", fname)
|
||||
data = {
|
||||
"ksize": self.ksize,
|
||||
"alpha": self.alpha,
|
||||
"id": self.node.peer_id_bytes,
|
||||
"neighbors": self.bootstrappable_neighbors(),
|
||||
}
|
||||
if not data["neighbors"]:
|
||||
log.warning("No known neighbors, so not writing to cache.")
|
||||
return
|
||||
with open(fname, "wb") as file:
|
||||
pickle.dump(data, file)
|
||||
|
||||
@classmethod
|
||||
def load_state(cls, fname):
|
||||
"""Load the state of this node (the alpha/ksize/id/immediate neighbors)
|
||||
from a cache file with the given fname."""
|
||||
log.info("Loading state from %s", fname)
|
||||
with open(fname, "rb") as file:
|
||||
data = pickle.load(file)
|
||||
svr = KademliaServer(data["ksize"], data["alpha"], data["id"])
|
||||
if data["neighbors"]:
|
||||
svr.bootstrap(data["neighbors"])
|
||||
return svr
|
||||
|
||||
def save_state_regularly(self, fname, frequency=600):
|
||||
"""
|
||||
Save the state of node with a given regularity to the given filename.
|
||||
|
||||
:param fname: File name to save regularly to
|
||||
:param frequency: Frequency in seconds that the state should be saved.
|
||||
By default, 10 minutes.
|
||||
"""
|
||||
self.save_state(fname)
|
||||
loop = asyncio.get_event_loop()
|
||||
self.save_state_loop = loop.call_later(
|
||||
frequency, self.save_state_regularly, fname, frequency
|
||||
)
|
||||
|
||||
|
||||
def check_dht_value_type(value):
|
||||
"""Checks to see if the type of the value is a valid type for placing in
|
||||
the dht."""
|
||||
typeset = [int, float, bool, str, bytes]
|
||||
return type(value) in typeset
|
Loading…
Reference in New Issue
Block a user