From cd8cb5c443dc8ad50883825569ea2ed4763ea5fa Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Wed, 27 Feb 2019 22:49:51 +0100 Subject: [PATCH] refactoring of the code to implement IAdvertiser and IDiscoverer --- libp2p/__init__.py | 2 + libp2p/{routing => discovery}/__init__.py | 0 libp2p/discovery/kademlia_router.py | 77 ++++++++++++++++++++++ libp2p/host/routed_host.py | 61 +++++++++++++++++ libp2p/kademlia/crawling.py | 40 ++++++++++++ tests/kademlia/__init__.py | 0 tests/kademlia/test_libp2p_node.py | 80 +++++++++++++++++++++++ 7 files changed, 260 insertions(+) rename libp2p/{routing => discovery}/__init__.py (100%) create mode 100644 libp2p/discovery/kademlia_router.py create mode 100644 libp2p/host/routed_host.py create mode 100644 tests/kademlia/__init__.py create mode 100644 tests/kademlia/test_libp2p_node.py diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 62e014b..fe00408 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -23,6 +23,7 @@ async def cleanup_done_tasks(): # Some sleep necessary to context switch await asyncio.sleep(3) + def initialize_default_swarm( id_opt=None, transport_opt=None, muxer_opt=None, sec_opt=None, peerstore_opt=None): @@ -54,6 +55,7 @@ def initialize_default_swarm( return swarm_opt + async def new_node( swarm_opt=None, id_opt=None, transport_opt=None, muxer_opt=None, sec_opt=None, peerstore_opt=None): diff --git a/libp2p/routing/__init__.py b/libp2p/discovery/__init__.py similarity index 100% rename from libp2p/routing/__init__.py rename to libp2p/discovery/__init__.py diff --git a/libp2p/discovery/kademlia_router.py b/libp2p/discovery/kademlia_router.py new file mode 100644 index 0000000..2d7004c --- /dev/null +++ b/libp2p/discovery/kademlia_router.py @@ -0,0 +1,77 @@ +import msgpack + +from libp2p.peer.id import id_b58_decode +from libp2p.kademlia.network import Server +from libp2p.kademlia.node import Node +from libp2p.kademlia.utils import digest +from libp2p.kademlia.crawling import ValueMultipleSpiderCrawl +from libp2p.discovery.advertiser_interface import IAdvertiser +from libp2p.discovery.discoverer_interface import IDiscoverer + +from libp2p.peer.peerinfo import PeerInfo +from libp2p.peer.peerdata import PeerData + + +class KademliaPeerRouter(IAdvertiser, IDiscoverer): + + def __init__(self, host, bootstrap_nodes=None): + self.host = host + self.peer_id = host.get_id() + self.bootstrap_nodes = bootstrap_nodes + self.node = Server() + + async def listen(self, port): + await self.node.listen(port) + if self.bootstrap_nodes: + await self.node.bootstrap(self.bootstrap_nodes) + + async def advertise(self, service): + await self.node.set(service, self._make_advertise_msg()) + + def _make_advertise_msg(self): + peer_data = PeerData() + peer_data.add_addrs(self.host.get_addrs()) + peer_info = PeerInfo(self.peer_id, peer_data) + + if len(peer_info.addrs) < 1: + raise RuntimeError("not know address for self") + + return encode_peer_info(peer_info) + + async def find_peers(self, service): + key = dht_key(service) + target = Node(key) + + nearest = self.node.protocol.router.find_neighbors(target) + if not nearest: + print("There are no known neighbors to get key %s", key) + return [] + spider = ValueMultipleSpiderCrawl(self.node.protocol, target, nearest, + self.node.ksize, self.node.alpha) + + values = await spider.find() + if values: + return list(map(decode_peer_info, values)) + return [] + + +def dht_key(service): + # TODO: should convert to Content Identification + return digest(service) + + +def encode_peer_info(peer_info): + return msgpack.dumps({ + 'peer_id': peer_info.peer_id.pretty(), + 'addrs': [str(ma) for ma in peer_info.addrs] + }) + + +def decode_peer_info(data): + info = msgpack.loads(data, raw=False) + + peer_id = id_b58_decode(info['peer_id']) + peer_data = PeerData() + peer_data.add_addrs(info['addrs']) + + return PeerInfo(peer_id, peer_data) diff --git a/libp2p/host/routed_host.py b/libp2p/host/routed_host.py new file mode 100644 index 0000000..3d1a19a --- /dev/null +++ b/libp2p/host/routed_host.py @@ -0,0 +1,61 @@ +from libp2p.peer.peerstore import PeerStoreError + +from .basic_host import BasicHost + + +class RoutedHost(BasicHost): + + # default options constructor + def __init__(self, host, router): + super().__init__(host.network) + self.host = host + self.router = router + + async def advertise(self, service): + await self.router.advertise(service) + + async def connect(self, peer_info): + """ + connect ensures there is a connection between this host and the peer with + given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal + peerstore. If there is not an active connection, connect will issue a + dial, and block until a connection is open, or an error is + returned. + + :param peer_info: peer_info of the host we want to connect to + :type peer_info: peer.peerinfo.PeerInfo + """ + # there is already a connection to this peer + if peer_info.peer_id in self.network.connections: + return + + # Check if we have some address for that peer + # if not, we use the router to get information about the peer + peer_info.addrs = await self._find_peer_addrs(peer_info.peer_id) + + # if addrs are given, save them + if peer_info.addrs: + self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) + + # try to connect + await super().connect(peer_info) + + def find_peers(self, service): + return self.router.find_peers(service) + + async def _find_peer_addrs(self, peer_id): + try: + addrs = self.peerstore.addrs(peer_id) + except PeerStoreError: + addrs = None + + if not addrs: + peers_info = await self.router.find_peers(peer_id.pretty()) + if not peers_info: + raise KeyError("no address found for this peer_id %s" % str(peer_id)) + peer_info = peers_info[0] # todo: handle multiple response + if peer_info.peer_id != peer_id: + raise RuntimeError('routing failure: provided addrs for different peer') + addrs = peer_info.addrs + + return addrs diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 6dabd10..ade597a 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -69,6 +69,46 @@ class SpiderCrawl: raise NotImplementedError +class ValueMultipleSpiderCrawl(SpiderCrawl): + # TODO: move this out of this module + def __init__(self, protocol, node, peers, ksize, alpha): + SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) + # keep track of the single nearest node without value - per + # section 2.3 so we can set the key there if found + self.nearest_without_value = NodeHeap(self.node, 1) + + async def find(self): + """ + Find either the closest nodes or the value requested. + """ + return await self._find(self.protocol.call_find_value) + + async def _nodes_found(self, responses): + """ + Handle the result of an iteration in _find. + """ + toremove = [] + found_values = [] + for peerid, response in responses.items(): + response = RPCFindResponse(response) + if not response.happened(): + toremove.append(peerid) + elif response.has_value(): + found_values.append(response.get_value()) + else: + peer = self.nearest.get_node(peerid) + self.nearest_without_value.push(peer) + self.nearest.push(response.get_node_list()) + self.nearest.remove(toremove) + + if found_values: + return found_values + if self.nearest.have_contacted_all(): + # not found! + return None + return await self.find() + + class ValueSpiderCrawl(SpiderCrawl): def __init__(self, protocol, node, peers, ksize, alpha): SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) diff --git a/tests/kademlia/__init__.py b/tests/kademlia/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/kademlia/test_libp2p_node.py b/tests/kademlia/test_libp2p_node.py new file mode 100644 index 0000000..bcdd7e4 --- /dev/null +++ b/tests/kademlia/test_libp2p_node.py @@ -0,0 +1,80 @@ +import multiaddr +import pytest +from libp2p import new_node +from libp2p.host.routed_host import RoutedHost +from libp2p.kademlia.network import Server +from libp2p.discovery.kademlia_router import KademliaPeerRouter +from libp2p.peer.peerinfo import info_from_p2p_addr + + +# @pytest.mark.asyncio +# async def test_connect_to_peer(): +# node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) +# node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + +# node_a = RoutedHost(node_a, KademliaPeerRouter(node_a)) +# await node_a.router.listen(5678) +# node_b = RoutedHost(node_b, KademliaPeerRouter(node_b, [('127.0.0.1', 5678)])) +# await node_b.router.listen(5679) + +# await node_a.advertise(node_a.get_id().pretty()) + +# async def stream_handler(stream): +# while True: +# read_string = (await stream.read()).decode() +# print("host B received:" + read_string) + +# response = "ack:" + read_string +# print("sending response:" + response) +# await stream.write(response.encode()) + +# node_b.set_stream_handler("/echo/1.0.0", stream_handler) + +# info = info_from_p2p_addr(node_b.get_addrs()[0]) +# info.addrs = [] # explicitly empty the addrs to force the router to look in the dht +# await node_a.connect(info) + +# stream = await node_a.new_stream(node_b.get_id(), ["/echo/1.0.0"]) + +# messages = ["hello" + str(x) for x in range(10)] +# for message in messages: +# await stream.write(message.encode()) + +# response = (await stream.read()).decode() + +# print("res: " + response) +# assert response == ("ack:" + message) + +# # Success, terminate pending tasks. +# return + +@pytest.mark.parametrize("nodes_nr", [2]) +# @pytest.mark.parametrize("nodes_nr", [(2**i) for i in range(1, 5)]) +@pytest.mark.asyncio +async def test_find_peers(nodes_nr): + nodes = await _make_routed_hosts(nodes_nr, 5678) + node_a = nodes[0] + + await node_a.advertise("test") + + for node in nodes[1:]: + peers = await node.find_peers("test") + assert len(peers) == 1 + assert peers[0].peer_id == node_a.get_id() + + +async def _make_routed_host(dht_port, bootstrap_nodes=None): + node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + routed_node = RoutedHost(node, KademliaPeerRouter(node, bootstrap_nodes)) + await routed_node.router.listen(dht_port) + return routed_node + + +async def _make_routed_hosts(n, dht_port): + hosts = [] + bootraps = [] + for _ in range(n): + hosts.append(await _make_routed_host(dht_port, bootraps)) + bootraps.append(('127.0.0.1', dht_port)) + dht_port += 1 + return hosts