py-libp2p/libp2p/kademlia/network.py

258 lines
9.0 KiB
Python
Raw Normal View History

2018-10-14 22:32:27 +08:00
"""
Package for interacting on the network at a high level.
"""
import pickle
import asyncio
import logging
2019-01-16 01:41:41 +08:00
from .protocol import KademliaProtocol
from .utils import digest
from .storage import ForgetfulStorage
2019-04-20 08:44:17 +08:00
from .kad_peerinfo import create_kad_peerinfo
2019-01-16 01:41:41 +08:00
from .crawling import ValueSpiderCrawl
from .crawling import NodeSpiderCrawl
2018-10-14 22:32:27 +08:00
2019-01-16 01:41:41 +08:00
log = logging.getLogger(__name__) # pylint: disable=invalid-name
2018-10-14 22:32:27 +08:00
2019-01-16 01:41:41 +08:00
# pylint: disable=too-many-instance-attributes
class KademliaServer:
2018-10-14 22:32:27 +08:00
"""
High level view of a node instance. This is the object that should be
created to start listening as an active node on the network.
"""
protocol_class = KademliaProtocol
def __init__(self, ksize=20, alpha=3, node_id=None, storage=None):
"""
Create a server instance. This will start listening on the given port.
Args:
ksize (int): The k parameter from the paper
alpha (int): The alpha parameter from the paper
node_id: The id for this node on the network.
storage: An instance that implements
:interface:`~kademlia.storage.IStorage`
"""
self.ksize = ksize
self.alpha = alpha
self.storage = storage or ForgetfulStorage()
2019-04-20 08:44:17 +08:00
self.node = create_kad_peerinfo(node_id)
2018-10-14 22:32:27 +08:00
self.transport = None
self.protocol = None
self.refresh_loop = None
self.save_state_loop = None
def stop(self):
if self.transport is not None:
self.transport.close()
if self.refresh_loop:
self.refresh_loop.cancel()
if self.save_state_loop:
self.save_state_loop.cancel()
def _create_protocol(self):
return self.protocol_class(self.node, self.storage, self.ksize)
2019-01-16 01:41:41 +08:00
async def listen(self, port, interface='0.0.0.0'):
2018-10-14 22:32:27 +08:00
"""
Start listening on the given port.
Provide interface="::" to accept ipv6 address
"""
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(self._create_protocol,
local_addr=(interface, port))
log.info("Node %i listening on %s:%i",
self.node.long_id, interface, port)
2019-01-16 01:41:41 +08:00
self.transport, self.protocol = await listen
2018-10-14 22:32:27 +08:00
# finally, schedule refreshing table
self.refresh_table()
def refresh_table(self):
log.debug("Refreshing routing table")
asyncio.ensure_future(self._refresh_table())
loop = asyncio.get_event_loop()
self.refresh_loop = loop.call_later(3600, self.refresh_table)
async def _refresh_table(self):
"""
Refresh buckets that haven't had any lookups in the last hour
(per section 2.3 of the paper).
"""
2019-01-16 01:41:41 +08:00
results = []
for node_id in self.protocol.get_refresh_ids():
2019-04-20 08:44:17 +08:00
node = create_kad_peerinfo(node_id)
2019-01-16 01:41:41 +08:00
nearest = self.protocol.router.find_neighbors(node, self.alpha)
2018-10-14 22:32:27 +08:00
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
2019-01-16 01:41:41 +08:00
results.append(spider.find())
2018-10-14 22:32:27 +08:00
# do our crawling
2019-01-16 01:41:41 +08:00
await asyncio.gather(*results)
2018-10-14 22:32:27 +08:00
# now republish keys older than one hour
2019-01-16 01:41:41 +08:00
for dkey, value in self.storage.iter_older_than(3600):
2018-10-14 22:32:27 +08:00
await self.set_digest(dkey, value)
2019-01-16 01:41:41 +08:00
def bootstrappable_neighbors(self):
2018-10-14 22:32:27 +08:00
"""
Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for
use as an argument to the bootstrap method.
The server should have been bootstrapped
already - this is just a utility for getting some neighbors and then
storing them if this server is going down for a while. When it comes
back up, the list of nodes can be used to bootstrap.
"""
2019-01-16 01:41:41 +08:00
neighbors = self.protocol.router.find_neighbors(self.node)
2018-10-14 22:32:27 +08:00
return [tuple(n)[-2:] for n in neighbors]
async def bootstrap(self, addrs):
"""
Bootstrap the server by connecting to other known nodes in the network.
Args:
addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP
addresses are acceptable - hostnames will cause an error.
"""
log.debug("Attempting to bootstrap node with %i initial contacts",
len(addrs))
cos = list(map(self.bootstrap_node, addrs))
gathered = await asyncio.gather(*cos)
nodes = [node for node in gathered if node is not None]
spider = NodeSpiderCrawl(self.protocol, self.node, nodes,
self.ksize, self.alpha)
return await spider.find()
async def bootstrap_node(self, addr):
result = await self.protocol.ping(addr, self.node.peer_id)
2019-04-20 08:44:17 +08:00
return create_kad_peerinfo(result[1], addr[0], addr[1]) if result[0] else None
2018-10-14 22:32:27 +08:00
async def get(self, key):
"""
Get a key if the network has it.
Returns:
:class:`None` if not found, the value otherwise.
"""
log.info("Looking up key %s", key)
dkey = digest(key)
# if this node has it, return it
if self.storage.get(dkey) is not None:
return self.storage.get(dkey)
2019-04-20 08:44:17 +08:00
node = create_kad_peerinfo(dkey)
2019-01-16 01:41:41 +08:00
nearest = self.protocol.router.find_neighbors(node)
if not nearest:
2018-10-14 22:32:27 +08:00
log.warning("There are no known neighbors to get key %s", key)
return None
spider = ValueSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
return await spider.find()
async def set(self, key, value):
"""
Set the given string key to the given value in the network.
"""
if not check_dht_value_type(value):
raise TypeError(
"Value must be of type int, float, bool, str, or bytes"
)
log.info("setting '%s' = '%s' on network", key, value)
dkey = digest(key)
return await self.set_digest(dkey, value)
async def set_digest(self, dkey, value):
"""
Set the given SHA1 digest key (bytes) to the given value in the
network.
"""
2019-04-20 08:44:17 +08:00
node = create_kad_peerinfo(dkey)
2018-10-14 22:32:27 +08:00
2019-01-16 01:41:41 +08:00
nearest = self.protocol.router.find_neighbors(node)
if not nearest:
2018-10-14 22:32:27 +08:00
log.warning("There are no known neighbors to set key %s",
dkey.hex())
return False
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
nodes = await spider.find()
log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes)))
# if this node is close too, then store here as well
2019-01-16 01:41:41 +08:00
biggest = max([n.distance_to(node) for n in nodes])
if self.node.distance_to(node) < biggest:
2018-10-14 22:32:27 +08:00
self.storage[dkey] = value
2019-01-16 01:41:41 +08:00
results = [self.protocol.call_store(n, dkey, value) for n in nodes]
2018-10-14 22:32:27 +08:00
# return true only if at least one store call succeeded
2019-01-16 01:41:41 +08:00
return any(await asyncio.gather(*results))
2018-10-14 22:32:27 +08:00
2019-01-16 01:41:41 +08:00
def save_state(self, fname):
2018-10-14 22:32:27 +08:00
"""
Save the state of this node (the alpha/ksize/id/immediate neighbors)
to a cache file with the given fname.
"""
log.info("Saving state to %s", fname)
data = {
'ksize': self.ksize,
'alpha': self.alpha,
'id': self.node.peer_id,
2019-01-16 01:41:41 +08:00
'neighbors': self.bootstrappable_neighbors()
2018-10-14 22:32:27 +08:00
}
2019-01-16 01:41:41 +08:00
if not data['neighbors']:
2018-10-14 22:32:27 +08:00
log.warning("No known neighbors, so not writing to cache.")
return
2019-01-16 01:41:41 +08:00
with open(fname, 'wb') as file:
pickle.dump(data, file)
2018-10-14 22:32:27 +08:00
@classmethod
2019-01-16 01:41:41 +08:00
def load_state(cls, fname):
2018-10-14 22:32:27 +08:00
"""
Load the state of this node (the alpha/ksize/id/immediate neighbors)
from a cache file with the given fname.
"""
log.info("Loading state from %s", fname)
2019-01-16 01:41:41 +08:00
with open(fname, 'rb') as file:
data = pickle.load(file)
svr = Server(data['ksize'], data['alpha'], data['id'])
if data['neighbors']:
svr.bootstrap(data['neighbors'])
return svr
def save_state_regularly(self, fname, frequency=600):
2018-10-14 22:32:27 +08:00
"""
Save the state of node with a given regularity to the given
filename.
Args:
fname: File name to save retularly to
frequency: Frequency in seconds that the state should be saved.
By default, 10 minutes.
"""
2019-01-16 01:41:41 +08:00
self.save_state(fname)
2018-10-14 22:32:27 +08:00
loop = asyncio.get_event_loop()
self.save_state_loop = loop.call_later(frequency,
2019-01-16 01:41:41 +08:00
self.save_state_regularly,
2018-10-14 22:32:27 +08:00
fname,
frequency)
def check_dht_value_type(value):
"""
Checks to see if the type of the value is a valid type for
placing in the dht.
"""
2019-01-16 01:41:41 +08:00
typeset = [
int,
float,
bool,
str,
bytes
]
return type(value) in typeset # pylint: disable=unidiomatic-typecheck