py-libp2p/libp2p/kademlia/crawling.py

182 lines
6.0 KiB
Python
Raw Normal View History

2018-10-14 10:32:27 -04:00
from collections import Counter
import logging
2019-01-09 21:38:56 +03:00
from .kademlia.node import Node, NodeHeap
from .kademlia.utils import gather_dict
2018-10-14 10:32:27 -04:00
log = logging.getLogger(__name__)
2019-01-09 21:38:56 +03:00
class SpiderCrawl:
2018-10-14 10:32:27 -04:00
"""
Crawl the network and look for given 160-bit keys.
"""
def __init__(self, protocol, node, peers, ksize, alpha):
"""
Create a new C{SpiderCrawl}er.
Args:
protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance.
node: A :class:`~kademlia.node.Node` representing the key we're
looking for
peers: A list of :class:`~kademlia.node.Node` instances that
provide the entry point for the network
ksize: The value for k based on the paper
alpha: The value for alpha based on the paper
"""
self.protocol = protocol
self.ksize = ksize
self.alpha = alpha
self.node = node
self.nearest = NodeHeap(self.node, self.ksize)
self.lastIDsCrawled = []
log.info("creating spider with peers: %s", peers)
self.nearest.push(peers)
async def _find(self, rpcmethod):
"""
Get either a value or list of nodes.
Args:
rpcmethod: The protocol's callfindValue or callFindNode.
The process:
1. calls find_* to current ALPHA nearest not already queried nodes,
adding results to current nearest list of k nodes.
2. current nearest list needs to keep track of who has been queried
already sort by nearest, keep KSIZE
3. if list is same as last time, next call should be to everyone not
yet queried
4. repeat, unless nearest list has all been queried, then ur done
"""
log.info("crawling network with nearest: %s", str(tuple(self.nearest)))
count = self.alpha
if self.nearest.getIDs() == self.lastIDsCrawled:
count = len(self.nearest)
self.lastIDsCrawled = self.nearest.getIDs()
ds = {}
for peer in self.nearest.getUncontacted()[:count]:
ds[peer.id] = rpcmethod(peer, self.node)
self.nearest.markContacted(peer)
found = await gather_dict(ds)
return await self._nodesFound(found)
async def _nodesFound(self, responses):
raise NotImplementedError
class ValueSpiderCrawl(SpiderCrawl):
def __init__(self, protocol, node, peers, ksize, alpha):
SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
# keep track of the single nearest node without value - per
# section 2.3 so we can set the key there if found
self.nearestWithoutValue = NodeHeap(self.node, 1)
async def find(self):
"""
Find either the closest nodes or the value requested.
"""
return await self._find(self.protocol.callFindValue)
async def _nodesFound(self, responses):
"""
Handle the result of an iteration in _find.
"""
toremove = []
foundValues = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
elif response.hasValue():
foundValues.append(response.getValue())
else:
peer = self.nearest.getNodeById(peerid)
self.nearestWithoutValue.push(peer)
self.nearest.push(response.getNodeList())
self.nearest.remove(toremove)
if len(foundValues) > 0:
return await self._handleFoundValues(foundValues)
if self.nearest.allBeenContacted():
# not found!
return None
return await self.find()
async def _handleFoundValues(self, values):
"""
We got some values! Exciting. But let's make sure
they're all the same or freak out a little bit. Also,
make sure we tell the nearest node that *didn't* have
the value to store it.
"""
valueCounts = Counter(values)
if len(valueCounts) != 1:
log.warning("Got multiple values for key %i: %s",
self.node.long_id, str(values))
value = valueCounts.most_common(1)[0][0]
peerToSaveTo = self.nearestWithoutValue.popleft()
if peerToSaveTo is not None:
await self.protocol.callStore(peerToSaveTo, self.node.id, value)
return value
class NodeSpiderCrawl(SpiderCrawl):
async def find(self):
"""
Find the closest nodes.
"""
return await self._find(self.protocol.callFindNode)
async def _nodesFound(self, responses):
"""
Handle the result of an iteration in _find.
"""
toremove = []
for peerid, response in responses.items():
response = RPCFindResponse(response)
if not response.happened():
toremove.append(peerid)
else:
self.nearest.push(response.getNodeList())
self.nearest.remove(toremove)
if self.nearest.allBeenContacted():
return list(self.nearest)
return await self.find()
2019-01-09 21:38:56 +03:00
class RPCFindResponse:
2018-10-14 10:32:27 -04:00
def __init__(self, response):
"""
A wrapper for the result of a RPC find.
Args:
response: This will be a tuple of (<response received>, <value>)
where <value> will be a list of tuples if not found or
a dictionary of {'value': v} where v is the value desired
"""
self.response = response
def happened(self):
"""
Did the other host actually respond?
"""
return self.response[0]
def hasValue(self):
return isinstance(self.response[1], dict)
def getValue(self):
return self.response[1]['value']
def getNodeList(self):
"""
Get the node list in the response. If there's no value, this should
be set.
"""
nodelist = self.response[1] or []
return [Node(*nodeple) for nodeple in nodelist]