py-libp2p/libp2p/kademlia/protocol.py

192 lines
6.9 KiB
Python
Raw Normal View History

2018-10-14 10:32:27 -04:00
import asyncio
import logging
2019-08-02 22:36:19 -07:00
import random
2018-10-14 10:32:27 -04:00
from rpcudp.protocol import RPCProtocol
2019-08-02 22:36:19 -07:00
2019-04-19 20:44:17 -04:00
from .kad_peerinfo import create_kad_peerinfo
2019-01-15 18:41:41 +01:00
from .routing import RoutingTable
2019-04-19 20:44:17 -04:00
2019-08-02 23:19:36 +08:00
# Logger for this module; every RPC handler below reports through it.
log = logging.getLogger(name=__name__)
2018-10-14 10:32:27 -04:00
class KademliaProtocol(RPCProtocol):
    """
    RPC protocol implementing the Kademlia DHT plus provider records.

    The four core Kademlia RPCs are PING, STORE, FIND_NODE and FIND_VALUE:
    PING probes if a node is still online.
    STORE instructs a node to store (key, value).
    FIND_NODE takes a 160-bit ID and gets back (ip, udp_port, node_id)
    for the k closest nodes to the target.
    FIND_VALUE behaves like FIND_NODE unless a value is stored under the
    key, in which case that value is returned.

    In addition, STUN echoes the requester's address back to it, and
    ADD_PROVIDER / GET_PROVIDERS advertise and look up which peers provide
    a given content key.

    Incoming requests are handled by the ``rpc_*`` methods (dispatched by
    name by rpcudp's RPCProtocol). Outgoing requests go through the
    ``call_*`` coroutines, which update the routing table according to
    whether the remote node answered.
    """

    def __init__(self, source_node, storage, ksize):
        """
        @param source_node: peer info describing this (local) node.
        @param storage: key/value store used both for STORE values and for
            provider records.
        @param ksize: bucket size ``k`` passed to the routing table.
        """
        RPCProtocol.__init__(self)
        self.router = RoutingTable(self, ksize, source_node)
        self.storage = storage
        self.source_node = source_node
        # Strong references to fire-and-forget STORE tasks started by
        # welcome_if_new(). The asyncio event loop holds only weak references
        # to tasks, so an unreferenced task may be garbage-collected before
        # it completes.
        self._pending_stores = set()

    def get_refresh_ids(self):
        """
        Get ids to search for to keep old buckets up to date.

        @return: one random id per bucket returned by
            ``router.lonely_buckets()``, drawn uniformly from that bucket's
            range and encoded as 20 big-endian bytes (160 bits).
        """
        return [
            random.randint(*bucket.range).to_bytes(20, byteorder="big")
            for bucket in self.router.lonely_buckets()
        ]

    def rpc_stun(self, sender):
        """Echo the requester's address back so it learns how peers see it."""
        return sender

    def rpc_ping(self, sender, nodeid):
        """
        Answer a PING: record the sender and reply with our own peer id.

        @param sender: (ip, port) of the requester.
        @param nodeid: the requester's peer id bytes.
        """
        source = create_kad_peerinfo(nodeid, sender[0], sender[1])
        self.welcome_if_new(source)
        return self.source_node.peer_id_bytes

    def rpc_store(self, sender, nodeid, key, value):
        """
        Answer a STORE: record the sender and store ``value`` under ``key``.

        @return: always True.
        """
        source = create_kad_peerinfo(nodeid, sender[0], sender[1])
        self.welcome_if_new(source)
        log.debug(
            "got a store request from %s, storing '%s'='%s'", sender, key.hex(), value
        )
        self.storage[key] = value
        return True

    def rpc_find_node(self, sender, nodeid, key):
        """
        Answer a FIND_NODE: return the known neighbors closest to ``key``.

        The requester itself is excluded from the result.

        @return: list of tuples, one per neighbor (each produced by
            ``tuple(neighbor)``).
        """
        # Log the lookup target (``key``); the sender's id is not what we
        # search for.
        log.info(
            "finding neighbors of %i in local table", int.from_bytes(key, "big")
        )
        source = create_kad_peerinfo(nodeid, sender[0], sender[1])
        self.welcome_if_new(source)
        node = create_kad_peerinfo(key)
        neighbors = self.router.find_neighbors(node, exclude=source)
        return [tuple(neighbor) for neighbor in neighbors]

    def rpc_find_value(self, sender, nodeid, key):
        """
        Answer a FIND_VALUE: return ``{"value": value}`` if ``key`` is stored
        locally, otherwise fall back to FIND_NODE behaviour.
        """
        source = create_kad_peerinfo(nodeid, sender[0], sender[1])
        self.welcome_if_new(source)
        value = self.storage.get(key, None)
        if value is None:
            # rpc_find_node welcomes the sender again; by then the sender is
            # already in the routing table, so that call returns early.
            return self.rpc_find_node(sender, nodeid, key)
        return {"value": value}

    def rpc_add_provider(self, sender, nodeid, key, provider_id):
        """
        rpc when receiving an add_provider call
        should validate received PeerInfo matches sender nodeid
        if it does, recipient must store a record in its datastore

        we store a map of content_id to peer_id (non xor)

        @return: True if the record was stored, False if ``provider_id``
            does not match the sender's ``nodeid``.
        """
        if nodeid != provider_id:
            return False
        log.info("adding provider %s for key %s in local table", provider_id, str(key))
        self.storage[key] = provider_id
        return True

    def rpc_get_providers(self, sender, key):
        """
        rpc when receiving a get_providers call
        should look up key in data store and respond with records
        plus a list of closer peers in its routing table

        @return: list of peer id bytes: the locally stored provider record
            (if any and truthy) first, then the ids of the routing-table
            neighbors closest to ``key``, skipping any equal to that record.
        """
        record = self.storage.get(key, None)
        providers = [record] if record else []
        keynode = create_kad_peerinfo(key)
        providers.extend(
            neighbor.peer_id_bytes
            for neighbor in self.router.find_neighbors(keynode)
            if neighbor.peer_id_bytes != record
        )
        return providers

    async def call_find_node(self, node_to_ask, node_to_find):
        """Send FIND_NODE for ``node_to_find`` to ``node_to_ask``."""
        address = (node_to_ask.ip, node_to_ask.port)
        result = await self.find_node(
            address, self.source_node.peer_id_bytes, node_to_find.peer_id_bytes
        )
        return self.handle_call_response(result, node_to_ask)

    async def call_find_value(self, node_to_ask, node_to_find):
        """Send FIND_VALUE for ``node_to_find`` to ``node_to_ask``."""
        address = (node_to_ask.ip, node_to_ask.port)
        result = await self.find_value(
            address, self.source_node.peer_id_bytes, node_to_find.peer_id_bytes
        )
        return self.handle_call_response(result, node_to_ask)

    async def call_ping(self, node_to_ask):
        """Send PING to ``node_to_ask``."""
        address = (node_to_ask.ip, node_to_ask.port)
        result = await self.ping(address, self.source_node.peer_id_bytes)
        return self.handle_call_response(result, node_to_ask)

    async def call_store(self, node_to_ask, key, value):
        """Ask ``node_to_ask`` to STORE ``value`` under ``key``."""
        address = (node_to_ask.ip, node_to_ask.port)
        result = await self.store(address, self.source_node.peer_id_bytes, key, value)
        return self.handle_call_response(result, node_to_ask)

    async def call_add_provider(self, node_to_ask, key, provider_id):
        """Tell ``node_to_ask`` that ``provider_id`` provides ``key``."""
        address = (node_to_ask.ip, node_to_ask.port)
        result = await self.add_provider(
            address, self.source_node.peer_id_bytes, key, provider_id
        )
        return self.handle_call_response(result, node_to_ask)

    async def call_get_providers(self, node_to_ask, key):
        """Ask ``node_to_ask`` for providers of ``key``."""
        address = (node_to_ask.ip, node_to_ask.port)
        result = await self.get_providers(address, key)
        return self.handle_call_response(result, node_to_ask)

    def welcome_if_new(self, node):
        """
        Given a new node, send it all the keys/values it should be storing,
        then add it to the routing table.

        @param node: A new node that just joined (or that we just found out
            about).

        Process:
        For each key in storage, get k closest nodes. If newnode is closer
        than the furthest in that list, and the node for this server
        is closer than the closest in that list, then store the key/value
        on the new node (per section 2.5 of the paper). If we know no
        neighbors for a key at all, the key/value is always sent.
        """
        if not self.router.is_new_node(node):
            return

        log.info("never seen %s before, adding to router", node)
        for key, value in self.storage:
            keynode = create_kad_peerinfo(key)
            neighbors = self.router.find_neighbors(keynode)
            if neighbors:
                furthest = neighbors[-1].distance_to(keynode)
                closest = neighbors[0].distance_to(keynode)
                new_node_close = node.distance_to(keynode) < furthest
                this_closest = self.source_node.distance_to(keynode) < closest
                if not (new_node_close and this_closest):
                    continue
            self._schedule_store(node, key, value)
        self.router.add_contact(node)

    def _schedule_store(self, node, key, value):
        """Start a background STORE of (key, value) on ``node``, retaining the task."""
        task = asyncio.ensure_future(self.call_store(node, key, value))
        # Keep the task alive until it finishes, then drop the reference.
        self._pending_stores.add(task)
        task.add_done_callback(self._pending_stores.discard)

    def handle_call_response(self, result, node):
        """
        If we get a response, add the node to the routing table. If
        we get no response, make sure it's removed from the routing table.

        @param result: the value returned by the rpcudp call; only
            ``result[0]`` (presumably a success flag, with the payload in
            ``result[1]``; confirm against rpcudp) is inspected here.
        @param node: the peer that was asked.
        @return: ``result`` unchanged.
        """
        if not result[0]:
            log.warning("no response from %s, removing from router", node)
            self.router.remove_contact(node)
            return result

        log.info("got successful response from %s", node)
        self.welcome_if_new(node)
        return result