From 44368863719d36ba2ad6925c5ea3cdb9a8dcc2cb Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 25 Apr 2019 13:25:09 -0400 Subject: [PATCH 1/3] implement add get providers --- libp2p/kademlia/protocol.py | 50 ++++++++++++++++++++++++++++++++----- libp2p/kademlia/utils.py | 4 +++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index b0eea2b..0090c64 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -5,6 +5,7 @@ import logging from rpcudp.protocol import RPCProtocol from .kad_peerinfo import create_kad_peerinfo from .routing import RoutingTable +from .utils import validate_provider_id log = logging.getLogger(__name__) # pylint: disable=invalid-name @@ -36,12 +37,6 @@ class KademliaProtocol(RPCProtocol): ids.append(rid) return ids - def rpc_add_provider(self, sender, nodeid, key): - pass - - def rpc_get_providers(self, sender, nodeid, key): - pass - def rpc_stun(self, sender): # pylint: disable=no-self-use return sender @@ -79,6 +74,38 @@ class KademliaProtocol(RPCProtocol): return self.rpc_find_node(sender, nodeid, key) return {'value': value} + def rpc_add_provider(self, sender, nodeid, key, provider_peerinfo): + """ + rpc when receiving an add_provider call + should validate received PeerInfo matches sender nodeid + if it does, receipient must store a record in its datastore + """ + log.info("adding provider for key %s in local table", + str(key)) + if validate_provider_id(nodeid, provider_peerinfo): + source = create_kad_peerinfo(nodeid, sender[0], sender[1]) + # TODO differentiate this from key, value + self.storage[key] = provider_peerinfo + return True + return False + + def rpc_get_providers(self, sender, key): + """ + rpc when receiving a get_providers call + should look up key in data store and respond with records + plus a list of closer peers in itrs routing table + """ + providers = [] + record = self.storage.get(key, None) + if record: + providers.append(record) + + keynode = create_kad_peerinfo(key) + neighbors = self.router.find_neighbors(keynode) + providers.extend(neighbors) + + return providers + async def call_find_node(self, node_to_ask, node_to_find): address = (node_to_ask.ip, node_to_ask.port) result = await self.find_node(address, self.source_node.peer_id, @@ -101,6 +128,17 @@ class KademliaProtocol(RPCProtocol): result = await self.store(address, self.source_node.peer_id, key, value) return self.handle_call_response(result, node_to_ask) + def call_add_provider(self, node_to_ask, key): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.add_provider(address, + self.source_node.peer_id, key) + return self.handle_call_response(result, node_to_ask) + + def call_get_providers(self, node_to_ask, key): + address = (node_to_ask.ip, node_to_ask.port) + result = await self.get_providers(address, key) + return self.handle_call_response(result, node_to_ask) + def welcome_if_new(self, node): """ Given a new node, send it all the keys/values it should be storing, diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index da90085..d47e4e0 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -55,3 +55,7 @@ def shared_prefix(args): def bytes_to_bit_string(bites): bits = [bin(bite)[2:].rjust(8, '0') for bite in bites] return "".join(bits) + + +def validate_provider_id(sender_id, sender_peerinfo): + return sender_id == sender_peerinfo.peer_id From db7be2d561a5e2c5ad883310c3407abfe9e50a50 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 28 Apr 2019 17:57:57 -0400 Subject: [PATCH 2/3] add simple test --- tests/kademlia/test_providers.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 tests/kademlia/test_providers.py diff --git a/tests/kademlia/test_providers.py b/tests/kademlia/test_providers.py new file mode 100644 index 0000000..9723cc1 --- /dev/null +++ b/tests/kademlia/test_providers.py @@ -0,0 +1,31 @@ +import pytest +from libp2p.kademlia.network import KademliaServer + + +@pytest.mark.asyncio +async def test_example(): + node_a = KademliaServer() + await node_a.listen(5801) + + node_b = KademliaServer() + await node_b.listen(5802) + await node_b.bootstrap([("127.0.0.1", 5801)]) + + key = "hello" + value = "world" + await node_b.set(key, value) + await node_b.provide("hello") + + providers = await node_b.get_providers("hello") + # print ("providers") + # print (providers) + + # bmuller's handle_call_response wraps + # every rpc call result in a list of tuples + # [(True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F'])] + first_tuple = providers[0] + # (True, [b'\xf9\xa1\xf5\x10a\xe5\xe0F']) + first_providers = first_tuple[1] + # [b'\xf9\xa1\xf5\x10a\xe5\xe0F'] + first_provider = first_providers[0] + assert node_b.node.peer_id == first_provider From e1d6fdae731bfc4439910eb8d5eeab500159c023 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 28 Apr 2019 17:58:14 -0400 Subject: [PATCH 3/3] pass test --- libp2p/kademlia/network.py | 14 ++++++++++++++ libp2p/kademlia/protocol.py | 31 ++++++++++++++++++------------- libp2p/kademlia/utils.py | 4 ---- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index 86522ba..63a6c31 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -166,6 +166,20 @@ class KademliaServer: dkey = digest(key) return await self.set_digest(dkey, value) + async def provide(self, key): + """ + publish to the network that it provides for a particular key + """ + neighbors = self.protocol.router.find_neighbors(self.node) + return [await self.protocol.call_add_provider(n, key, self.node.peer_id) for n in neighbors] + + async def get_providers(self, key): + """ + get the list of providers for a key + """ + neighbors = self.protocol.router.find_neighbors(self.node) + return [await self.protocol.call_get_providers(n, key) for n in neighbors] + async def set_digest(self, dkey, value): """ Set the given SHA1 digest key (bytes) to the given value in the diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 0090c64..0612545 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -5,7 +5,6 @@ import logging from rpcudp.protocol import RPCProtocol from .kad_peerinfo import create_kad_peerinfo from .routing import RoutingTable -from .utils import validate_provider_id log = logging.getLogger(__name__) # pylint: disable=invalid-name @@ -74,35 +73,39 @@ class KademliaProtocol(RPCProtocol): return self.rpc_find_node(sender, nodeid, key) return {'value': value} - def rpc_add_provider(self, sender, nodeid, key, provider_peerinfo): + def rpc_add_provider(self, sender, nodeid, key, provider_id): + # pylint: disable=unused-argument """ rpc when receiving an add_provider call should validate received PeerInfo matches sender nodeid if it does, receipient must store a record in its datastore + we store a map of content_id to peer_id (non xor) """ - log.info("adding provider for key %s in local table", - str(key)) - if validate_provider_id(nodeid, provider_peerinfo): - source = create_kad_peerinfo(nodeid, sender[0], sender[1]) - # TODO differentiate this from key, value - self.storage[key] = provider_peerinfo + if nodeid == provider_id: + log.info("adding provider %s for key %s in local table", + provider_id, str(key)) + self.storage[key] = provider_id return True return False def rpc_get_providers(self, sender, key): + # pylint: disable=unused-argument """ rpc when receiving a get_providers call should look up key in data store and respond with records - plus a list of closer peers in itrs routing table + plus a list of closer peers in its routing table """ providers = [] record = self.storage.get(key, None) + if record: providers.append(record) keynode = create_kad_peerinfo(key) neighbors = self.router.find_neighbors(keynode) - providers.extend(neighbors) + for neighbor in neighbors: + if neighbor.peer_id != record: + providers.append(neighbor.peer_id) return providers @@ -128,13 +131,15 @@ class KademliaProtocol(RPCProtocol): result = await self.store(address, self.source_node.peer_id, key, value) return self.handle_call_response(result, node_to_ask) - def call_add_provider(self, node_to_ask, key): + async def call_add_provider(self, node_to_ask, key, provider_id): address = (node_to_ask.ip, node_to_ask.port) result = await self.add_provider(address, - self.source_node.peer_id, key) + self.source_node.peer_id, + key, provider_id) + return self.handle_call_response(result, node_to_ask) - def call_get_providers(self, node_to_ask, key): + async def call_get_providers(self, node_to_ask, key): address = (node_to_ask.ip, node_to_ask.port) result = await self.get_providers(address, key) return self.handle_call_response(result, node_to_ask) diff --git a/libp2p/kademlia/utils.py b/libp2p/kademlia/utils.py index d47e4e0..da90085 100644 --- a/libp2p/kademlia/utils.py +++ b/libp2p/kademlia/utils.py @@ -55,7 +55,3 @@ def shared_prefix(args): def bytes_to_bit_string(bites): bits = [bin(bite)[2:].rjust(8, '0') for bite in bites] return "".join(bits) - - -def validate_provider_id(sender_id, sender_peerinfo): - return sender_id == sender_peerinfo.peer_id