diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 38a285a..2772bed 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -6,10 +6,10 @@ from .peer.peerstore import PeerStore from .peer.id import id_from_public_key from .network.swarm import Swarm from .host.basic_host import BasicHost -from .kademlia.routed_host import RoutedHost from .transport.upgrader import TransportUpgrader from .transport.tcp.tcp import TCP from .kademlia.network import KademliaServer +from .routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter async def cleanup_done_tasks(): @@ -31,7 +31,7 @@ def generate_id(): # private_key = new_key.exportKey("PEM") return new_id -def initialize_default_kademlia( +def initialize_default_kademlia_router( ksize=20, alpha=3, id_opt=None, storage=None): """ initialize swam when no swarm is passed in @@ -46,13 +46,14 @@ def initialize_default_kademlia( id_opt = generate_id() node_id = id_opt.get_raw_id() - return KademliaServer(ksize=ksize, alpha=alpha, - node_id=node_id, storage=storage) + server = KademliaServer(ksize=ksize, alpha=alpha, + node_id=node_id, storage=storage) + return KadmeliaPeerRouter(server) def initialize_default_swarm( - id_opt=None, transport_opt=None, - muxer_opt=None, sec_opt=None, peerstore_opt=None): + id_opt=None, transport_opt=None, muxer_opt=None, + sec_opt=None, peerstore_opt=None, disc_opt=None): """ initialize swarm when no swarm is passed in :param id_opt: optional id for host @@ -78,7 +79,8 @@ def initialize_default_swarm( upgrader = TransportUpgrader(sec, muxer) peerstore = peerstore_opt or PeerStore() - swarm_opt = Swarm(id_opt, peerstore, upgrader, transport) + swarm_opt = Swarm(id_opt, peerstore,\ + upgrader, transport, disc_opt) return swarm_opt @@ -105,14 +107,11 @@ async def new_node( swarm_opt = initialize_default_swarm( id_opt=id_opt, transport_opt=transport_opt, muxer_opt=muxer_opt, sec_opt=sec_opt, - peerstore_opt=peerstore_opt) + peerstore_opt=peerstore_opt, disc_opt=disc_opt) # TODO enable support for other host type # TODO routing unimplemented - if not disc_opt: - host = BasicHost(swarm_opt) - else: - host = RoutedHost(swarm_opt, disc_opt) + host = BasicHost(swarm_opt) # Kick off cleanup job asyncio.ensure_future(cleanup_done_tasks()) diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 0c3bf0e..40292df 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -11,21 +11,22 @@ from .host_interface import IHost class BasicHost(IHost): # default options constructor - def __init__(self, _network): - self.network = _network - self.peerstore = self.network.peerstore + def __init__(self, network, router=None): + self._network = network + self._router = router + self.peerstore = self._network.peerstore def get_id(self): """ :return: peer_id of host """ - return self.network.get_peer_id() + return self._network.get_peer_id() def get_network(self): """ :return: network instance of host """ - return self.network + return self._network def get_peerstore(self): """ @@ -45,7 +46,7 @@ class BasicHost(IHost): p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty())) addrs = [] - for transport in self.network.listeners.values(): + for transport in self._network.listeners.values(): for addr in transport.get_addrs(): addrs.append(addr.encapsulate(p2p_part)) return addrs @@ -57,7 +58,7 @@ class BasicHost(IHost): :param stream_handler: a stream handler function :return: true if successful """ - return self.network.set_stream_handler(protocol_id, stream_handler) + return self._network.set_stream_handler(protocol_id, stream_handler) # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on @@ -67,7 +68,7 @@ class BasicHost(IHost): :param protocol_id: protocol id that stream runs on :return: true if successful """ - stream = await self.network.new_stream(peer_id, protocol_ids) + stream = await self._network.new_stream(peer_id, protocol_ids) return stream async def connect(self, peer_info): @@ -84,7 +85,7 @@ class BasicHost(IHost): self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10) # there is already a connection to this peer - if peer_info.peer_id in self.network.connections: + if peer_info.peer_id in self._network.connections: return - await self.network.dial_peer(peer_info.peer_id) + await self._network.dial_peer(peer_info.peer_id) diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 6a3b5fe..3fdfbc6 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -119,7 +119,7 @@ class ValueSpiderCrawl(SpiderCrawl): value_counts = Counter(values) if len(value_counts) != 1: log.warning("Got multiple values for key %i: %s", - self.node.long_id, str(values)) + self.node.xor_id, str(values)) value = value_counts.most_common(1)[0][0] peer = self.nearest_without_value.popleft() diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index fb40e3b..a5ef5db 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -15,8 +15,9 @@ class KadPeerInfo(PeerInfo): def __init__(self, peer_id, peer_data=None): super(KadPeerInfo, self).__init__(peer_id, peer_data) + self.peer_id_obj = peer_id self.peer_id = peer_id.get_raw_id() - self.long_id = int(digest(peer_id.get_raw_id()).hex(), 16) + self.xor_id = peer_id.get_xor_id() self.addrs = peer_data.get_addrs() if peer_data else None @@ -34,7 +35,7 @@ class KadPeerInfo(PeerInfo): """ Get the distance between this node and another. """ - return self.long_id ^ node.long_id + return self.xor_id ^ node.xor_id def __iter__(self): """ @@ -43,11 +44,15 @@ class KadPeerInfo(PeerInfo): return iter([self.peer_id, self.ip, self.port]) def __repr__(self): - return repr([self.long_id, self.ip, self.port]) + return repr([self.xor_id, self.ip, self.port, self.peer_id]) def __str__(self): return "%s:%s" % (self.ip, str(self.port)) + def encode(self): + return str(self.peer_id) + "\n" + \ + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port)) + class KadPeerHeap: """ A heap of peers ordered by distance to a given node. diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index 63a6c31..05b9ab4 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -67,7 +67,7 @@ class KademliaServer: listen = loop.create_datagram_endpoint(self._create_protocol, local_addr=(interface, port)) log.info("Node %i listening on %s:%i", - self.node.long_id, interface, port) + self.node.xor_id, interface, port) self.transport, self.protocol = await listen # finally, schedule refreshing table self.refresh_table() diff --git a/libp2p/kademlia/routed_host.py b/libp2p/kademlia/routed_host.py deleted file mode 100644 index 45fc0c6..0000000 --- a/libp2p/kademlia/routed_host.py +++ /dev/null @@ -1,21 +0,0 @@ -from libp2p.host.basic_host import BasicHost - -class RoutedHost(BasicHost): - def __init__(self, _network, _kad_network): - super(RoutedHost, self).__init__(_network) - self.kad_network = _kad_network - - def get_kad_network(self): - return self.kad_network - - def routed_listen(self, port, interface='0.0.0.0'): - return self.kad_network.listen(port, interface) - - def routed_get(self, key): - return self.kad_network.get(key) - - def routed_set(self, key, value): - return self.kad_network.set(key, value) - - def routed_set_digest(self, dkey, value): - return self.kad_network.set_digest(dkey, value) diff --git a/libp2p/kademlia/routing.py b/libp2p/kademlia/routing.py index 7d97494..b84717e 100644 --- a/libp2p/kademlia/routing.py +++ b/libp2p/kademlia/routing.py @@ -33,7 +33,7 @@ class KBucket: one = KBucket(self.range[0], midpoint, self.ksize) two = KBucket(midpoint + 1, self.range[1], self.ksize) for node in self.nodes.values(): - bucket = one if node.long_id <= midpoint else two + bucket = one if node.xor_id <= midpoint else two bucket.nodes[node.peer_id] = node return (one, two) @@ -48,7 +48,7 @@ class KBucket: self.nodes[newnode.peer_id] = newnode def has_in_range(self, node): - return self.range[0] <= node.long_id <= self.range[1] + return self.range[0] <= node.xor_id <= self.range[1] def is_new_node(self, node): return node.peer_id not in self.nodes @@ -175,7 +175,7 @@ class RoutingTable: Get the index of the bucket that the given node would fall into. """ for index, bucket in enumerate(self.buckets): - if node.long_id < bucket.range[1]: + if node.xor_id < bucket.range[1]: return index # we should never be here, but make linter happy return None diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index ff62c52..9b37b50 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -10,13 +10,14 @@ from .stream.net_stream import NetStream from .connection.raw_connection import RawConnection class Swarm(INetwork): - # pylint: disable=too-many-instance-attributes, cell-var-from-loop + # pylint: disable=too-many-instance-attributes,cell-var-from-loop,too-many-arguments - def __init__(self, peer_id, peerstore, upgrader, transport): + def __init__(self, peer_id, peerstore, upgrader, transport, router): self.self_id = peer_id self.peerstore = peerstore self.upgrader = upgrader self.transport = transport + self.router = router self.connections = dict() self.listeners = dict() self.stream_handlers = dict() @@ -57,8 +58,10 @@ class Swarm(INetwork): if not addrs: raise SwarmException("No known addresses to peer") - # TODO: define logic to choose which address to use, or try them all ? - multiaddr = addrs[0] + if not self.router: + multiaddr = addrs[0] + else: + multiaddr = self.router.find_peer(peer_id) if peer_id in self.connections: # If muxed connection already exists for peer_id, @@ -183,6 +186,9 @@ class Swarm(INetwork): return True return False + def add_router(self, router): + self.router = router + def create_generic_protocol_handler(swarm): """ Create a generic protocol handler from the given swarm. We use swarm diff --git a/libp2p/peer/id.py b/libp2p/peer/id.py index 92b4db3..47d7cf5 100644 --- a/libp2p/peer/id.py +++ b/libp2p/peer/id.py @@ -1,3 +1,4 @@ +import hashlib import base58 import multihash @@ -21,6 +22,9 @@ class ID: def pretty(self): return base58.b58encode(self._id_str).decode() + def get_xor_id(self): + return int(digest(self.get_raw_id()).hex(), 16) + def __str__(self): pid = self.pretty() if len(pid) <= 10: @@ -67,3 +71,8 @@ def id_from_public_key(key): def id_from_private_key(key): return id_from_public_key(key.publickey()) + +def digest(string): + if not isinstance(string, bytes): + string = str(string).encode('utf8') + return hashlib.sha1(string).digest() diff --git a/libp2p/routing/interfaces.py b/libp2p/routing/interfaces.py index 1f29d48..b519a09 100644 --- a/libp2p/routing/interfaces.py +++ b/libp2p/routing/interfaces.py @@ -28,4 +28,4 @@ class IPeerRouting(ABC): Find specific Peer FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo with relevant addresses. - """ + """ \ No newline at end of file diff --git a/libp2p/routing/kadmelia/__init__.py b/libp2p/routing/kademlia/__init__.py similarity index 100% rename from libp2p/routing/kadmelia/__init__.py rename to libp2p/routing/kademlia/__init__.py diff --git a/libp2p/routing/kadmelia/kadmelia_content_router.py b/libp2p/routing/kademlia/kademlia_content_router.py similarity index 100% rename from libp2p/routing/kadmelia/kadmelia_content_router.py rename to libp2p/routing/kademlia/kademlia_content_router.py diff --git a/libp2p/routing/kademlia/kademlia_peer_router.py b/libp2p/routing/kademlia/kademlia_peer_router.py new file mode 100644 index 0000000..45c43c8 --- /dev/null +++ b/libp2p/routing/kademlia/kademlia_peer_router.py @@ -0,0 +1,35 @@ +import ast + +from libp2p.routing.interfaces import IPeerRouting +from libp2p.kademlia.kad_peerinfo import create_kad_peerinfo + + +class KadmeliaPeerRouter(IPeerRouting): + # pylint: disable=too-few-public-methods + + def __init__(self, dht_server): + self.server = dht_server + + async def find_peer(self, peer_id): + """ + Find a specific peer + :param peer_id: peer to search for + :return: KadPeerInfo of specified peer + """ + # switching peer_id to xor_id used by kademlia as node_id + xor_id = peer_id.get_xor_id() + value = await self.server.get(xor_id) + return decode_peerinfo(value) + +def decode_peerinfo(encoded): + if isinstance(encoded, bytes): + encoded = encoded.decode() + try: + lines = ast.literal_eval(encoded) + except SyntaxError: + return None + ip = lines[1] # pylint: disable=invalid-name + port = lines[2] + peer_id = lines[3] + peer_info = create_kad_peerinfo(peer_id, ip, port) + return peer_info diff --git a/libp2p/routing/kadmelia/kadmelia_peer_router.py b/libp2p/routing/kadmelia/kadmelia_peer_router.py deleted file mode 100644 index 27be67c..0000000 --- a/libp2p/routing/kadmelia/kadmelia_peer_router.py +++ /dev/null @@ -1,31 +0,0 @@ -from libp2p.routing.interfaces import IPeerRouting -from libp2p.kademlia.utils import digest -from libp2p.peer.peerinfo import PeerInfo -from libp2p.peer.peerdata import PeerData - - -class KadmeliaPeerRouter(IPeerRouting): - # pylint: disable=too-few-public-methods - - def __init__(self, dht_server): - self.server = dht_server - - def find_peer(self, peer_id): - """ - Find specific Peer - FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo - with relevant addresses. - """ - value = self.server.get(peer_id) - return decode_peerinfo(value) - - -def decode_peerinfo(encoded): - if isinstance(encoded, bytes): - encoded = encoded.decode() - lines = encoded.splitlines() - peer_id = lines[0] - addrs = lines[1:] - peer_data = PeerData() - peer_data.add_addrs(addrs) - return PeerInfo(peer_id, addrs) diff --git a/tests/routing/__init__.py b/tests/routing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/routing/test_kad_peer_router.py b/tests/routing/test_kad_peer_router.py new file mode 100644 index 0000000..7f639fb --- /dev/null +++ b/tests/routing/test_kad_peer_router.py @@ -0,0 +1,73 @@ +import pytest + +from libp2p.kademlia.network import KademliaServer +from libp2p.routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter + +@pytest.mark.asyncio +async def test_simple_two_nodes(): + node_a = KademliaServer() + await node_a.listen(5678) + + node_b = KademliaServer() + await node_b.listen(5679) + + node_a_value = await node_b.bootstrap([("127.0.0.1", 5678)]) + node_a_kad_peerinfo = node_a_value[0] + await node_a.set(node_a_kad_peerinfo.xor_id, + repr(node_a_kad_peerinfo)) + + router = KadmeliaPeerRouter(node_b) + returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj) + print(repr(returned_info)) + print(repr(node_a_kad_peerinfo)) + assert repr(returned_info) == repr(node_a_kad_peerinfo) + +@pytest.mark.asyncio +async def test_simple_three_nodes(): + node_a = KademliaServer() + await node_a.listen(5701) + + node_b = KademliaServer() + await node_b.listen(5702) + + node_c = KademliaServer() + await node_c.listen(5703) + + node_a_value = await node_b.bootstrap([("127.0.0.1", 5701)]) + node_a_kad_peerinfo = node_a_value[0] + + await node_c.bootstrap([("127.0.0.1", 5702)]) + await node_a.set(node_a_kad_peerinfo.xor_id, + repr(node_a_kad_peerinfo)) + + router = KadmeliaPeerRouter(node_c) + returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj) + assert str(returned_info) == str(node_a_kad_peerinfo) + +@pytest.mark.asyncio +async def test_simple_four_nodes(): + node_a = KademliaServer() + await node_a.listen(5801) + + node_b = KademliaServer() + await node_b.listen(5802) + + node_c = KademliaServer() + await node_c.listen(5803) + + node_d = KademliaServer() + await node_d.listen(5804) + + node_a_value = await node_b.bootstrap([("127.0.0.1", 5801)]) + node_a_kad_peerinfo = node_a_value[0] + + await node_c.bootstrap([("127.0.0.1", 5802)]) + + await node_d.bootstrap([("127.0.0.1", 5803)]) + + await node_b.set(node_a_kad_peerinfo.xor_id, + repr(node_a_kad_peerinfo)) + + router = KadmeliaPeerRouter(node_d) + returned_info = await router.find_peer(node_a_kad_peerinfo.peer_id_obj) + assert str(returned_info) == str(node_a_kad_peerinfo)