diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index b0eea2b..0090c64 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -5,6 +5,7 @@ import logging from rpcudp.protocol import RPCProtocol from .kad_peerinfo import create_kad_peerinfo from .routing import RoutingTable +from .utils import validate_provider_id log = logging.getLogger(__name__) # pylint: disable=invalid-name @@ -36,12 +37,6 @@ class KademliaProtocol(RPCProtocol): ids.append(rid) return ids - def rpc_add_provider(self, sender, nodeid, key): - pass - - def rpc_get_providers(self, sender, nodeid, key): - pass - def rpc_stun(self, sender): # pylint: disable=no-self-use return sender @@ -79,6 +74,38 @@ class KademliaProtocol(RPCProtocol): return self.rpc_find_node(sender, nodeid, key) return {'value': value} + def rpc_add_provider(self, sender, nodeid, key, provider_peerinfo): + """ + rpc when receiving an add_provider call + should validate received PeerInfo matches sender nodeid + if it does, receipient must store a record in its datastore + """ + log.info("adding provider for key %s in local table", + str(key)) + if validate_provider_id(nodeid, provider_peerinfo): + source = create_kad_peerinfo(nodeid, sender[0], sender[1]) + # TODO differentiate this from key, value + self.storage[key] = provider_peerinfo + return True + return False + + def rpc_get_providers(self, sender, key): + """ + rpc when receiving a get_providers call + should look up key in data store and respond with records + plus a list of closer peers in itrs routing table + """ + providers = [] + record = self.storage.get(key, None) + if record: + providers.append(record) + + keynode = create_kad_peerinfo(key) + neighbors = self.router.find_neighbors(keynode) + providers.extend(neighbors) + + return providers + async def call_find_node(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) result = await self.find_node(address, self.source_node.peer_id, @@ -101,6 +128,17 @@ class KademliaProtocol(RPCProtocol): result = await self.store(address, self.source_node.peer_id, key, value) return self.handle_call_response(result, node_to_ask) + def call_add_provider(self, node_to_ask, key): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.add_provider(address, + self.source_node.peer_id, key) + return self.handle_call_response(result, node_to_ask) + + def call_get_providers(self, node_to_ask, key): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.get_providers(address, key) + return self.handle_call_response(result, node_to_ask) + def welcome_if_new(self, node): """ Given a new node, send it all the keys/values it should be storing, diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index da90085..d47e4e0 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -55,3 +55,7 @@ def shared_prefix(args): def bytes_to_bit_string(bites): bits = [bin(bite)[2:].rjust(8, '0') for bite in bites] return "".join(bits) + + +def validate_provider_id(sender_id, sender_peerinfo): + return sender_id == sender_peerinfo.peer_id