do_Assoc(): keep the data of the buckets somewhat current

This commit is contained in:
Coren[m] 2013-12-04 00:58:57 +01:00
parent 632e692577
commit ad9d20c08b
No known key found for this signature in database
GPG Key ID: AFA6943800F5DC6D
4 changed files with 130 additions and 0 deletions

View File

@ -663,6 +663,11 @@ static int getnodes(DHT *dht, IP_Port ip_port, uint8_t *public_key, uint8_t *cli
return sendpacket(dht->c->lossless_udp->net, ip_port, data, sizeof(data));
}
void DHT_getnodes(DHT *dht, IP_Port *from_ipp, uint8_t *from_id, uint8_t *which_id)
{
getnodes(dht, *from_ipp, from_id, which_id);
}
/* Send a send nodes response. */
/* because of BINARY compatibility, the Node_format MUST BE Node4_format,
* IPv6 nodes are sent in a different message */
@ -1666,6 +1671,10 @@ void do_DHT(DHT *dht)
do_DHT_friends(dht);
do_NAT(dht);
do_toping(dht->ping);
if (dht->assoc)
do_Assoc(dht->assoc, dht);
dht->last_run = unix_time();
}
void kill_DHT(DHT *dht)

View File

@ -151,6 +151,8 @@ typedef struct {
} DHT;
/*----------------------------------------------------------------------------------*/
void DHT_getnodes(DHT *dht, IP_Port *from_ipp, uint8_t *from_id, uint8_t *which_id);
/* Add a new friend to the friends list.
* client_id must be CLIENT_ID_SIZE bytes long.
*

View File

@ -65,7 +65,9 @@ typedef struct Client_entry {
hash_t hash;
/* shortcuts & rumors: timers and data */
uint64_t getnodes;
uint64_t used_at;
uint64_t seen_at;
uint64_t heard_at;
@ -91,6 +93,7 @@ struct Assoc {
size_t candidates_bucket_count;
size_t candidates_bucket_size;
candidates_bucket *candidates;
uint64_t getnodes;
};
/*****************************************************************************/
@ -803,6 +806,7 @@ Assoc *new_Assoc(size_t bits, size_t entries, uint8_t *public_id)
}
assoc->candidates = lists;
assoc->getnodes = unix_time();
id_copy(assoc->self_client_id, public_id);
client_id_self_update(assoc);
@ -827,6 +831,114 @@ void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *id)
}
}
#ifdef LOGGING
static char *idpart2str(uint8_t *id, size_t len);
#endif
/* refresh buckets */
void do_Assoc(Assoc *assoc, DHT *dht)
{
if (is_timeout(assoc->getnodes, ASSOC_BUCKET_REFRESH)) {
assoc->getnodes = unix_time();
size_t candidate = (rand() % assoc->candidates_bucket_count) + assoc->candidates_bucket_count;
/* in that bucket or the buckets closest to it:
* find the best heard candidate
* find the best seen candidate
* send getnode() requests to both */
uint8_t *target_id = NULL;
Client_entry *heard = NULL, *seen = NULL;
size_t i, k, m, bckt;
for (i = 1; i < assoc->candidates_bucket_count; i++) {
if (i % 2)
k = - (i >> 1);
else
k = i >> 1;
bckt = (candidate + k) % assoc->candidates_bucket_count;
for (m = 0; m < assoc->candidates_bucket_size; m++)
if (assoc->candidates[bckt].list[m].hash) {
Client_entry *entry = &assoc->candidates[bckt].list[m];
if (!is_timeout(entry->getnodes, CANDIDATES_SEEN_TIMEOUT))
continue;
if (!target_id)
target_id = entry->client.client_id;
if (entry->seen_at) {
if (!seen)
if (!is_timeout(entry->seen_at, CANDIDATES_SEEN_TIMEOUT))
seen = entry;
}
if (entry->heard_at) {
if (!heard)
if (!is_timeout(entry->heard_at, CANDIDATES_HEARD_TIMEOUT))
heard = entry;
}
if (seen && heard)
break;
}
if (seen && heard)
break;
}
#ifdef LOGGING
size_t total = 0, written = sprintf(logbuffer, "assoc: [%u] => ",
(uint32_t)(candidate % assoc->candidates_bucket_count));
if (written > 0)
total += written;
#endif
if (seen) {
IPPTsPng *ippts = seen->seen_family == AF_INET ? &seen->client.assoc4 : &seen->client.assoc6;
#ifdef LOGGING
written = sprintf(logbuffer + total, " S[%s...] %s:%u", idpart2str(seen->client.client_id, 8),
ip_ntoa(&ippts->ip_port.ip), htons(ippts->ip_port.port));
if (written > 0)
total += written;
#endif
DHT_getnodes(dht, &ippts->ip_port, seen->client.client_id, target_id);
seen->getnodes = unix_time();
}
if (heard && (heard != seen)) {
IP_Port *ipp = heard->heard_family == AF_INET ? &heard->assoc_heard4 : &heard->assoc_heard6;
#ifdef LOGGING
written = sprintf(logbuffer + total, " H[%s...] %s:%u", idpart2str(heard->client.client_id, 8), ip_ntoa(&ipp->ip),
htons(ipp->port));
if (written > 0)
total += written;
#endif
DHT_getnodes(dht, ipp, heard->client.client_id, target_id);
heard->getnodes = unix_time();
}
#ifdef LOGGING
if (!heard && !seen)
sprintf(logbuffer + total, "no nodes to talk to??\n");
else
/* for arcane reasons, sprintf(str, "\n") doesn't function */
sprintf(logbuffer + total, "%s", "\n");
loglog(logbuffer);
#endif
}
}
/* destroy */
void kill_Assoc(Assoc *assoc)
{

View File

@ -76,6 +76,13 @@ Assoc *new_Assoc(size_t bits, size_t entries, uint8_t *public_id);
/* public_id changed (loaded), update which entry isn't stored */
void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *public_id);
/* every 45s send out a getnodes() for a "random" bucket */
#define ASSOC_BUCKET_REFRESH 45
/* refresh bucket's data from time to time
* this must be called only from DHT */
void do_Assoc(Assoc *assoc, DHT *dht);
/* destroy */
void kill_Assoc(Assoc *assoc);