refactored routedhost into router passed to swarm

Alex Haynes 2019-04-24 22:11:54 -04:00
parent fc4fc74b87
commit 6c5bac53d7
13 changed files with 48 additions and 50 deletions
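
The net effect of the refactor, pieced together from the hunks below: initialize_default_kademlia becomes initialize_default_kademlia_router and returns a KadmeliaPeerRouter, the router is attached to the Swarm via add_router, and BasicHost is used directly now that RoutedHost is deleted. A minimal sketch of that wiring (illustrative only; it assumes these helpers live in libp2p/__init__.py as the first diff suggests, and that initialize_default_swarm can be called with all-default arguments):

    # Sketch of the post-refactor wiring; mirrors the new_node changes below.
    from libp2p import initialize_default_kademlia_router, initialize_default_swarm
    from libp2p.host.basic_host import BasicHost

    def build_host_with_kademlia_routing():
        router = initialize_default_kademlia_router()  # KademliaServer wrapped in KadmeliaPeerRouter
        swarm = initialize_default_swarm()             # assumed callable with defaults
        swarm.add_router(router)                       # Swarm.dial_peer now consults the router
        return BasicHost(swarm)                        # RoutedHost is gone; BasicHost is used directly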

View File

@@ -10,6 +10,7 @@ from .kademlia.routed_host import RoutedHost
 from .transport.upgrader import TransportUpgrader
 from .transport.tcp.tcp import TCP
 from .kademlia.network import KademliaServer
+from .routing.kademlia.kademlia_peer_router import KadmeliaPeerRouter


 async def cleanup_done_tasks():
@@ -31,7 +32,7 @@ def generate_id():
     # private_key = new_key.exportKey("PEM")
     return new_id


-def initialize_default_kademlia(
+def initialize_default_kademlia_router(
         ksize=20, alpha=3, id_opt=None, storage=None):
     """
     initialize swam when no swarm is passed in
@@ -46,8 +47,9 @@ def initialize_default_kademlia(
         id_opt = generate_id()

     node_id = id_opt.get_raw_id()
-    return KademliaServer(ksize=ksize, alpha=alpha,
+    server = KademliaServer(ksize=ksize, alpha=alpha,
                           node_id=node_id, storage=storage)
+    return KadmeliaPeerRouter(server)


 def initialize_default_swarm(
@@ -105,12 +107,11 @@ async def new_node(
         muxer_opt=muxer_opt, sec_opt=sec_opt,
         peerstore_opt=peerstore_opt)

+    swarm_opt.add_router(disc_opt)
+
     # TODO enable support for other host type
     # TODO routing unimplemented
-    if not disc_opt:
-        host = BasicHost(swarm_opt)
-    else:
-        host = RoutedHost(swarm_opt, disc_opt)
+    host = BasicHost(swarm_opt)

     # Kick off cleanup job
     asyncio.ensure_future(cleanup_done_tasks())

View File

@@ -11,21 +11,22 @@ from .host_interface import IHost
 class BasicHost(IHost):

     # default options constructor
-    def __init__(self, _network):
-        self.network = _network
-        self.peerstore = self.network.peerstore
+    def __init__(self, network, router=None):
+        self._network = network
+        self.peerstore = self._network.peerstore
+        self._router = router

     def get_id(self):
         """
         :return: peer_id of host
         """
-        return self.network.get_peer_id()
+        return self._network.get_peer_id()

     def get_network(self):
         """
         :return: network instance of host
         """
-        return self.network
+        return self._network

     def get_peerstore(self):
         """
@@ -45,7 +46,7 @@ class BasicHost(IHost):
         p2p_part = multiaddr.Multiaddr('/p2p/{}'.format(self.get_id().pretty()))
         addrs = []
-        for transport in self.network.listeners.values():
+        for transport in self._network.listeners.values():
             for addr in transport.get_addrs():
                 addrs.append(addr.encapsulate(p2p_part))
         return addrs
@@ -57,7 +58,7 @@ class BasicHost(IHost):
         :param stream_handler: a stream handler function
         :return: true if successful
         """
-        return self.network.set_stream_handler(protocol_id, stream_handler)
+        return self._network.set_stream_handler(protocol_id, stream_handler)

     # protocol_id can be a list of protocol_ids
     # stream will decide which protocol_id to run on
@@ -67,7 +68,7 @@ class BasicHost(IHost):
         :param protocol_id: protocol id that stream runs on
         :return: true if successful
         """
-        stream = await self.network.new_stream(peer_id, protocol_ids)
+        stream = await self._network.new_stream(peer_id, protocol_ids)
         return stream

     async def connect(self, peer_info):
@@ -84,7 +85,7 @@ class BasicHost(IHost):
         self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)

         # there is already a connection to this peer
-        if peer_info.peer_id in self.network.connections:
+        if peer_info.peer_id in self._network.connections:
             return

-        await self.network.dial_peer(peer_info.peer_id)
+        await self._network.dial_peer(peer_info.peer_id)

View File

@@ -119,7 +119,7 @@ class ValueSpiderCrawl(SpiderCrawl):
         value_counts = Counter(values)
         if len(value_counts) != 1:
             log.warning("Got multiple values for key %i: %s",
-                        self.node.long_id, str(values))
+                        self.node.xor_id, str(values))
         value = value_counts.most_common(1)[0][0]

         peer = self.nearest_without_value.popleft()

View File

@@ -16,7 +16,7 @@ class KadPeerInfo(PeerInfo):
         super(KadPeerInfo, self).__init__(peer_id, peer_data)

         self.peer_id = peer_id.get_raw_id()
-        self.long_id = int(digest(peer_id.get_raw_id()).hex(), 16)
+        self.xor_id = peer_id.get_xor_id()

         self.addrs = peer_data.get_addrs() if peer_data else None
@@ -34,7 +34,7 @@ class KadPeerInfo(PeerInfo):
         """
         Get the distance between this node and another.
         """
-        return self.long_id ^ node.long_id
+        return self.xor_id ^ node.xor_id

     def __iter__(self):
         """
@@ -43,7 +43,7 @@ class KadPeerInfo(PeerInfo):
         return iter([self.peer_id, self.ip, self.port])

     def __repr__(self):
-        return repr([self.long_id, self.ip, self.port])
+        return repr([self.xor_id, self.ip, self.port])

     def __str__(self):
         return "%s:%s" % (self.ip, str(self.port))

View File

@@ -67,7 +67,7 @@ class KademliaServer:
         listen = loop.create_datagram_endpoint(self._create_protocol,
                                                local_addr=(interface, port))
         log.info("Node %i listening on %s:%i",
-                 self.node.long_id, interface, port)
+                 self.node.xor_id, interface, port)
         self.transport, self.protocol = await listen
         # finally, schedule refreshing table
         self.refresh_table()

View File

@@ -1,21 +0,0 @@
-from libp2p.host.basic_host import BasicHost
-
-class RoutedHost(BasicHost):
-    def __init__(self, _network, _kad_network):
-        super(RoutedHost, self).__init__(_network)
-        self.kad_network = _kad_network
-
-    def get_kad_network(self):
-        return self.kad_network
-
-    def routed_listen(self, port, interface='0.0.0.0'):
-        return self.kad_network.listen(port, interface)
-
-    def routed_get(self, key):
-        return self.kad_network.get(key)
-
-    def routed_set(self, key, value):
-        return self.kad_network.set(key, value)
-
-    def routed_set_digest(self, dkey, value):
-        return self.kad_network.set_digest(dkey, value)

View File

@@ -33,7 +33,7 @@ class KBucket:
         one = KBucket(self.range[0], midpoint, self.ksize)
         two = KBucket(midpoint + 1, self.range[1], self.ksize)
         for node in self.nodes.values():
-            bucket = one if node.long_id <= midpoint else two
+            bucket = one if node.xor_id <= midpoint else two
             bucket.nodes[node.peer_id] = node
         return (one, two)
@@ -48,7 +48,7 @@ class KBucket:
         self.nodes[newnode.peer_id] = newnode

     def has_in_range(self, node):
-        return self.range[0] <= node.long_id <= self.range[1]
+        return self.range[0] <= node.xor_id <= self.range[1]

     def is_new_node(self, node):
         return node.peer_id not in self.nodes
@@ -175,7 +175,7 @@ class RoutingTable:
         Get the index of the bucket that the given node would fall into.
         """
         for index, bucket in enumerate(self.buckets):
-            if node.long_id < bucket.range[1]:
+            if node.xor_id < bucket.range[1]:
                 return index
         # we should never be here, but make linter happy
         return None

View File

@@ -20,6 +20,7 @@ class Swarm(INetwork):
         self.listeners = dict()
         self.stream_handlers = dict()
         self.transport = None
+        self.router = None

         # Protocol muxing
         self.multiselect = Multiselect()
@@ -57,8 +58,10 @@ class Swarm(INetwork):
         if not addrs:
             raise SwarmException("No known addresses to peer")

-        # TODO: define logic to choose which address to use, or try them all ?
-        multiaddr = addrs[0]
+        if not self.router:
+            multiaddr = addrs[0]
+        else:
+            multiaddr = self.router.find_peer(peer_id)

         if peer_id in self.connections:
             # If muxed connection already exists for peer_id,
@@ -187,6 +190,9 @@
         # TODO: Support more than one transport
         self.transport = transport

+    def add_router(self, router):
+        self.router = router
+
 def create_generic_protocol_handler(swarm):
     """
     Create a generic protocol handler from the given swarm. We use swarm
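
With Swarm.add_router in place, dial_peer resolves an address through any attached object exposing find_peer(peer_id) (the IPeerRouting shape touched later in this commit); without a router it falls back to addrs[0] from the peerstore. A hypothetical stand-in router, for illustration only and not part of this commit:

    # Toy router mapping peer_id -> multiaddr, standing in for KadmeliaPeerRouter.
    class StaticPeerRouter:
        def __init__(self, table):
            self.table = table

        def find_peer(self, peer_id):
            return self.table.get(peer_id)

    # swarm.add_router(StaticPeerRouter({peer_id: some_multiaddr}))
    # await swarm.dial_peer(peer_id)  # takes the router branch added above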

View File

@@ -1,5 +1,6 @@
 import base58
 import multihash
+import hashlib

 # MaxInlineKeyLength is the maximum length a key can be for it to be inlined in
 # the peer ID.
@@ -21,6 +22,9 @@ class ID:
     def pretty(self):
         return base58.b58encode(self._id_str).decode()

+    def get_xor_id(self):
+        return int(digest(self.get_raw_id()).hex(), 16)
+
     def __str__(self):
         pid = self.pretty()
         if len(pid) <= 10:
@@ -67,3 +71,8 @@ def id_from_public_key(key):

 def id_from_private_key(key):
     return id_from_public_key(key.publickey())
+
+def digest(string):
+    if not isinstance(string, bytes):
+        string = str(string).encode('utf8')
+    return hashlib.sha1(string).digest()
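
For reference, the new get_xor_id/digest pair makes a peer's Kademlia identifier the SHA-1 of its raw id read as an integer, which is what the xor_id comparisons in the routing diffs above operate on. A quick illustration with a made-up raw id:

    import hashlib

    raw_id = b"example-raw-peer-id"  # hypothetical raw id bytes
    xor_id = int(hashlib.sha1(raw_id).hexdigest(), 16)
    # ID(raw_id).get_xor_id() yields the same integer via digest() above,
    # and KadPeerInfo.distance(a, b) is a.xor_id ^ b.xor_id.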

View File

@@ -28,4 +28,4 @@
         Find specific Peer
         FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
         with relevant addresses.
-        """
+        """

View File

@@ -16,7 +16,9 @@ class KadmeliaPeerRouter(IPeerRouting):
         FindPeer searches for a peer with given peer_id, returns a peer.PeerInfo
         with relevant addresses.
         """
-        value = self.server.get(peer_id)
+        # switching peer_id to xor_id used by kademlia as node_id
+        xor_id = peer_id.get_xor_id()
+        value = self.server.get(xor_id)
         return decode_peerinfo(value)