Merge branch 'assoc-refresh-distant-data' of https://github.com/FullName/ProjectTox-Core into FullName-assoc-refresh-distant-data

This commit is contained in:
irungentoo 2013-12-12 21:51:17 -05:00
commit dfd46a040a
4 changed files with 130 additions and 1 deletions

View File

@ -799,7 +799,6 @@ static int getnodes(DHT *dht, IP_Port ip_port, uint8_t *public_key, uint8_t *cli
return sendpacket(dht->c->lossless_udp->net, ip_port, data, sizeof(data)); return sendpacket(dht->c->lossless_udp->net, ip_port, data, sizeof(data));
} }
/* Send a send nodes response. */ /* Send a send nodes response. */
/* because of BINARY compatibility, the Node_format MUST BE Node4_format, /* because of BINARY compatibility, the Node_format MUST BE Node4_format,
* IPv6 nodes are sent in a different message * IPv6 nodes are sent in a different message
@ -1356,6 +1355,11 @@ static void do_Close(DHT *dht)
} }
} }
void DHT_getnodes(DHT *dht, IP_Port *from_ipp, uint8_t *from_id, uint8_t *which_id)
{
getnodes(dht, *from_ipp, from_id, which_id, NULL);
}
void DHT_bootstrap(DHT *dht, IP_Port ip_port, uint8_t *public_key) void DHT_bootstrap(DHT *dht, IP_Port ip_port, uint8_t *public_key)
{ {
if (dht->assoc) { if (dht->assoc) {
@ -2104,6 +2108,10 @@ void do_DHT(DHT *dht)
do_NAT(dht); do_NAT(dht);
do_toping(dht->ping); do_toping(dht->ping);
do_hardening(dht); do_hardening(dht);
if (dht->assoc)
do_Assoc(dht->assoc, dht);
dht->last_run = unix_time(); dht->last_run = unix_time();
} }
void kill_DHT(DHT *dht) void kill_DHT(DHT *dht)

View File

@ -154,6 +154,8 @@ typedef struct {
} DHT; } DHT;
/*----------------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------------*/
void DHT_getnodes(DHT *dht, IP_Port *from_ipp, uint8_t *from_id, uint8_t *which_id);
/* Add a new friend to the friends list. /* Add a new friend to the friends list.
* client_id must be CLIENT_ID_SIZE bytes long. * client_id must be CLIENT_ID_SIZE bytes long.
* *

View File

@ -65,7 +65,9 @@ typedef struct Client_entry {
hash_t hash; hash_t hash;
/* shortcuts & rumors: timers and data */ /* shortcuts & rumors: timers and data */
uint64_t getnodes;
uint64_t used_at; uint64_t used_at;
uint64_t seen_at; uint64_t seen_at;
uint64_t heard_at; uint64_t heard_at;
@ -91,6 +93,7 @@ struct Assoc {
size_t candidates_bucket_count; size_t candidates_bucket_count;
size_t candidates_bucket_size; size_t candidates_bucket_size;
candidates_bucket *candidates; candidates_bucket *candidates;
uint64_t getnodes;
}; };
/*****************************************************************************/ /*****************************************************************************/
@ -834,6 +837,7 @@ Assoc *new_Assoc(size_t bits, size_t entries, uint8_t *public_id)
} }
assoc->candidates = lists; assoc->candidates = lists;
assoc->getnodes = unix_time();
id_copy(assoc->self_client_id, public_id); id_copy(assoc->self_client_id, public_id);
client_id_self_update(assoc); client_id_self_update(assoc);
@ -858,6 +862,114 @@ void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *id)
} }
} }
#ifdef LOGGING
static char *idpart2str(uint8_t *id, size_t len);
#endif
/* refresh buckets */
void do_Assoc(Assoc *assoc, DHT *dht)
{
if (is_timeout(assoc->getnodes, ASSOC_BUCKET_REFRESH)) {
assoc->getnodes = unix_time();
size_t candidate = (rand() % assoc->candidates_bucket_count) + assoc->candidates_bucket_count;
/* in that bucket or the buckets closest to it:
* find the best heard candidate
* find the best seen candidate
* send getnode() requests to both */
uint8_t *target_id = NULL;
Client_entry *heard = NULL, *seen = NULL;
size_t i, k, m, bckt;
for (i = 1; i < assoc->candidates_bucket_count; i++) {
if (i % 2)
k = - (i >> 1);
else
k = i >> 1;
bckt = (candidate + k) % assoc->candidates_bucket_count;
for (m = 0; m < assoc->candidates_bucket_size; m++)
if (assoc->candidates[bckt].list[m].hash) {
Client_entry *entry = &assoc->candidates[bckt].list[m];
if (!is_timeout(entry->getnodes, CANDIDATES_SEEN_TIMEOUT))
continue;
if (!target_id)
target_id = entry->client.client_id;
if (entry->seen_at) {
if (!seen)
if (!is_timeout(entry->seen_at, CANDIDATES_SEEN_TIMEOUT))
seen = entry;
}
if (entry->heard_at) {
if (!heard)
if (!is_timeout(entry->heard_at, CANDIDATES_HEARD_TIMEOUT))
heard = entry;
}
if (seen && heard)
break;
}
if (seen && heard)
break;
}
#ifdef LOGGING
size_t total = 0, written = sprintf(logbuffer, "assoc: [%u] => ",
(uint32_t)(candidate % assoc->candidates_bucket_count));
if (written > 0)
total += written;
#endif
if (seen) {
IPPTsPng *ippts = seen->seen_family == AF_INET ? &seen->client.assoc4 : &seen->client.assoc6;
#ifdef LOGGING
written = sprintf(logbuffer + total, " S[%s...] %s:%u", idpart2str(seen->client.client_id, 8),
ip_ntoa(&ippts->ip_port.ip), htons(ippts->ip_port.port));
if (written > 0)
total += written;
#endif
DHT_getnodes(dht, &ippts->ip_port, seen->client.client_id, target_id);
seen->getnodes = unix_time();
}
if (heard && (heard != seen)) {
IP_Port *ipp = heard->heard_family == AF_INET ? &heard->assoc_heard4 : &heard->assoc_heard6;
#ifdef LOGGING
written = sprintf(logbuffer + total, " H[%s...] %s:%u", idpart2str(heard->client.client_id, 8), ip_ntoa(&ipp->ip),
htons(ipp->port));
if (written > 0)
total += written;
#endif
DHT_getnodes(dht, ipp, heard->client.client_id, target_id);
heard->getnodes = unix_time();
}
#ifdef LOGGING
if (!heard && !seen)
sprintf(logbuffer + total, "no nodes to talk to??\n");
else
/* for arcane reasons, sprintf(str, "\n") doesn't function */
sprintf(logbuffer + total, "%s", "\n");
loglog(logbuffer);
#endif
}
}
/* destroy */ /* destroy */
void kill_Assoc(Assoc *assoc) void kill_Assoc(Assoc *assoc)
{ {

View File

@ -83,6 +83,13 @@ Assoc *new_Assoc(size_t bits, size_t entries, uint8_t *public_id);
/* public_id changed (loaded), update which entry isn't stored */ /* public_id changed (loaded), update which entry isn't stored */
void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *public_id); void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *public_id);
/* every 45s send out a getnodes() for a "random" bucket */
#define ASSOC_BUCKET_REFRESH 45
/* refresh bucket's data from time to time
* this must be called only from DHT */
void do_Assoc(Assoc *assoc, DHT *dht);
/* destroy */ /* destroy */
void kill_Assoc(Assoc *assoc); void kill_Assoc(Assoc *assoc);