From 0d8329b3a9b16cd6089810e61ce958fde00046b8 Mon Sep 17 00:00:00 2001 From: "Coren[m]" Date: Thu, 14 Nov 2013 19:05:36 +0100 Subject: [PATCH 1/2] Significantly trimmed down version of an ID<=>IP cache. Besides acknowledging timeouts, the module isn't trying to do anything fancy with the data besides storing and retrieving. --- auto_tests/Makefile.inc | 11 +- auto_tests/assoc_test.c | 68 ++++ toxcore/DHT.c | 81 ++++- toxcore/DHT.h | 33 +- toxcore/Makefile.inc | 2 + toxcore/Messenger.c | 9 +- toxcore/assoc.c | 739 ++++++++++++++++++++++++++++++++++++++++ toxcore/assoc.h | 73 ++++ toxcore/group_chats.c | 37 +- toxcore/group_chats.h | 8 +- toxcore/network.h | 2 +- toxcore/ping.c | 29 +- toxcore/ping.h | 4 - 13 files changed, 1054 insertions(+), 42 deletions(-) create mode 100644 auto_tests/assoc_test.c create mode 100644 toxcore/assoc.c create mode 100644 toxcore/assoc.h diff --git a/auto_tests/Makefile.inc b/auto_tests/Makefile.inc index 9c06bc78..1831fc50 100644 --- a/auto_tests/Makefile.inc +++ b/auto_tests/Makefile.inc @@ -1,8 +1,8 @@ if BUILD_TESTS -TESTS = messenger_autotest crypto_test network_test +TESTS = messenger_autotest crypto_test network_test assoc_test -check_PROGRAMS = messenger_autotest crypto_test network_test +check_PROGRAMS = messenger_autotest crypto_test network_test assoc_test AUTOTEST_CFLAGS = \ $(LIBSODIUM_CFLAGS) \ @@ -38,6 +38,13 @@ network_test_CFLAGS = $(AUTOTEST_CFLAGS) network_test_LDADD = $(AUTOTEST_LDADD) +assoc_test_SOURCES = ../auto_tests/assoc_test.c + +assoc_test_CFLAGS = $(AUTOTEST_CFLAGS) + +assoc_test_LDADD = $(AUTOTEST_LDADD) + + endif EXTRA_DIST += $(top_srcdir)/auto_tests/friends_test.c diff --git a/auto_tests/assoc_test.c b/auto_tests/assoc_test.c new file mode 100644 index 00000000..c0b8d065 --- /dev/null +++ b/auto_tests/assoc_test.c @@ -0,0 +1,68 @@ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#define AUTO_TEST +#include "../toxcore/DHT.h" +#include "../toxcore/assoc.h" +#include "../toxcore/util.h" + +#include +#include +#include + +#include + +START_TEST(test_X) +{ + /* TODO: real test */ + Assoc *assoc = new_Assoc(NULL); + + uint8_t id[CLIENT_ID_SIZE]; + IPPTs ippts_send; + IP_Port ipp_recv; + + Assoc_add_entry(assoc, id, &ippts_send, &ipp_recv); + + Assoc_close_entries close_entries; + memset(&close_entries, 0, sizeof(close_entries)); + + /* uint8_t found = */ Assoc_get_close_entries(assoc, &close_entries); +} +END_TEST + + +#define DEFTESTCASE(NAME) \ + TCase *tc_##NAME = tcase_create(#NAME); \ + tcase_add_test(tc_##NAME, test_##NAME); \ + suite_add_tcase(s, tc_##NAME); + +#define DEFTESTCASE_SLOW(NAME, TIMEOUT) \ + DEFTESTCASE(NAME) \ + tcase_set_timeout(tc_##NAME, TIMEOUT); + +Suite *Assoc_suite(void) +{ + Suite *s = suite_create("Assoc"); + + DEFTESTCASE(X); + + return s; +} + +int main(int argc, char *argv[]) +{ + Suite *Assoc = Assoc_suite(); + SRunner *test_runner = srunner_create(Assoc); + + srunner_set_fork_status(test_runner, CK_NOFORK); + + srunner_run_all(test_runner, CK_NORMAL); + + int number_failed = srunner_ntests_failed(test_runner); + + srunner_free(test_runner); + + return number_failed; +} diff --git a/toxcore/DHT.c b/toxcore/DHT.c index eb50cc43..9730771b 100644 --- a/toxcore/DHT.c +++ b/toxcore/DHT.c @@ -28,27 +28,20 @@ #endif #include "DHT.h" -#include "network.h" +#include "assoc.h" #include "ping.h" + +#include "network.h" +#include "LAN_discovery.h" #include "misc_tools.h" #include "util.h" -#include "LAN_discovery.h" - -/* The number of seconds for a non responsive node to become bad. */ -#define BAD_NODE_TIMEOUT 70 /* The max number of nodes to send with send nodes. */ #define MAX_SENT_NODES 8 -/* Ping timeout in seconds */ -#define PING_TIMEOUT 5 - /* The timeout after which a node is discarded completely. */ #define KILL_NODE_TIMEOUT 300 -/* Ping interval in seconds for each node in our lists. */ -#define PING_INTERVAL 60 - /* Ping interval in seconds for each random sending of a get nodes request. */ #define GET_NODE_INTERVAL 5 @@ -850,6 +843,14 @@ static int handle_sendnodes_core(void *object, IP_Port source, uint8_t *packet, /* store the address the *request* was sent to */ addto_lists(dht, dht->send_nodes[send_nodes_index - 1].ip_port, packet + 1); + if (dht->assoc) { + IPPTs ippts; + + ippts.ip_port = dht->send_nodes[send_nodes_index - 1].ip_port; + ippts.timestamp = dht->send_nodes[send_nodes_index - 1].timestamp; + Assoc_add_entry(dht->assoc, packet + 1, &ippts, &source); + } + *num_nodes_out = num_nodes; return 0; @@ -879,6 +880,13 @@ static int handle_sendnodes(void *object, IP_Port source, uint8_t *packet, uint3 send_ping_request(dht->ping, ipp, nodes4_list[i].client_id); returnedip_ports(dht, ipp, nodes4_list[i].client_id, packet + 1); + + if (dht->assoc) { + IPPTs ippts; + ippts.ip_port = ipp; + ippts.timestamp = 0; + Assoc_add_entry(dht->assoc, nodes4_list[i].client_id, &ippts, NULL); + } } return 0; @@ -902,6 +910,13 @@ static int handle_sendnodes_ipv6(void *object, IP_Port source, uint8_t *packet, if (ipport_isset(&nodes_list[i].ip_port)) { send_ping_request(dht->ping, nodes_list[i].ip_port, nodes_list[i].client_id); returnedip_ports(dht, nodes_list[i].ip_port, nodes_list[i].client_id, packet + 1); + + if (dht->assoc) { + IPPTs ippts; + ippts.ip_port = nodes_list[i].ip_port; + ippts.timestamp = 0; + Assoc_add_entry(dht->assoc, nodes_list[i].client_id, &ippts, NULL); + } } return 0; @@ -950,7 +965,38 @@ int DHT_addfriend(DHT *dht, uint8_t *client_id) dht->friends_list[dht->num_friends].nat.NATping_id = ((uint64_t)random_int() << 32) + random_int(); ++dht->num_friends; - get_bunchnodes(dht, dht->close_clientlist, LCLIENT_LIST, MAX_FRIEND_CLIENTS, client_id);/*TODO: make this better?*/ + + if (dht->assoc) { + /* get up to MAX_FRIEND_CLIENTS connectable nodes */ + DHT_Friend *friend = &dht->friends_list[dht->num_friends - 1]; + + Assoc_close_entries close_entries; + memset(&close_entries, 0, sizeof(close_entries)); + close_entries.wanted_id = client_id; + close_entries.count_good = MAX_FRIEND_CLIENTS / 2; + close_entries.count = MAX_FRIEND_CLIENTS; + close_entries.result = calloc(MAX_FRIEND_CLIENTS, sizeof(*close_entries.result)); + + uint8_t i, found = Assoc_get_close_entries(dht->assoc, &close_entries); + + for (i = 0; i < found; i++) + memcpy(&friend->client_list[i], close_entries.result[i], sizeof(*close_entries.result[i])); + + if (found) { + /* send getnodes to the "best" entry */ + Client_data *client = &friend->client_list[0]; + + if (ipport_isset(&client->assoc4.ip_port)) + getnodes(dht, client->assoc4.ip_port, client->client_id, friend->client_id); + + if (ipport_isset(&client->assoc6.ip_port)) + getnodes(dht, client->assoc6.ip_port, client->client_id, friend->client_id); + } + } + + /*TODO: make this better?*/ + get_bunchnodes(dht, dht->close_clientlist, LCLIENT_LIST, MAX_FRIEND_CLIENTS, client_id); + return 0; } @@ -1085,6 +1131,13 @@ static void do_Close(DHT *dht) void DHT_bootstrap(DHT *dht, IP_Port ip_port, uint8_t *public_key) { + if (dht->assoc) { + IPPTs ippts; + ippts.ip_port = ip_port; + ippts.timestamp = 0; + Assoc_add_entry(dht->assoc, public_key, &ippts, NULL); + } + getnodes(dht, ip_port, public_key, dht->c->self_public_key); } int DHT_bootstrap_from_address(DHT *dht, const char *address, uint8_t ipv6enabled, @@ -1556,6 +1609,9 @@ DHT *new_DHT(Net_Crypto *c) init_cryptopackets(dht); cryptopacket_registerhandler(c, CRYPTO_PACKET_NAT_PING, &handle_NATping, dht); + /* dhtassoc is not mandatory for now */ + dht->assoc = new_Assoc(dht); + return dht; } @@ -1570,6 +1626,7 @@ void do_DHT(DHT *dht) } void kill_DHT(DHT *dht) { + kill_Assoc(dht->assoc); kill_ping(dht->ping); free(dht->friends_list); free(dht); diff --git a/toxcore/DHT.h b/toxcore/DHT.h index 360773ff..dc8b0e38 100644 --- a/toxcore/DHT.h +++ b/toxcore/DHT.h @@ -43,6 +43,22 @@ /* Maximum newly announced nodes to ping per TIME_TOPING seconds. */ #define MAX_TOPING 16 +/* Ping timeout in seconds */ +#define PING_TIMEOUT 5 + +/* Ping interval in seconds for each node in our lists. */ +#define PING_INTERVAL 60 + +/* The number of seconds for a non responsive node to become bad. */ +#define PINGS_MISSED_NODE_GOES_BAD 3 +#define PING_ROUNDTRIP 2 +#define BAD_NODE_TIMEOUT (PING_INTERVAL + PINGS_MISSED_NODE_GOES_BAD * PING_INTERVAL + PING_ROUNDTRIP) + +typedef struct { + IP_Port ip_port; + uint64_t timestamp; +} IPPTs; + typedef struct { IP_Port ip_port; uint64_t timestamp; @@ -117,22 +133,25 @@ typedef struct { uint64_t timestamp; } pinged_t; +typedef struct PING PING; +typedef struct Assoc Assoc; + typedef struct { Net_Crypto *c; Client_data close_clientlist[LCLIENT_LIST]; - DHT_Friend *friends_list; - uint16_t num_friends; uint64_t close_lastgetnodes; + DHT_Friend *friends_list; + uint16_t num_friends; + pinged_t send_nodes[LSEND_NODES_ARRAY]; - void *ping; + PING *ping; + + Assoc *assoc; } DHT; /*----------------------------------------------------------------------------------*/ - -Client_data *DHT_get_close_list(DHT *dht); - /* Add a new friend to the friends list. * client_id must be CLIENT_ID_SIZE bytes long. * @@ -256,5 +275,5 @@ int DHT_isconnected(DHT *dht); void addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id); - #endif + diff --git a/toxcore/Makefile.inc b/toxcore/Makefile.inc index 116a3e29..8208c548 100644 --- a/toxcore/Makefile.inc +++ b/toxcore/Makefile.inc @@ -27,6 +27,8 @@ libtoxcore_la_SOURCES = ../toxcore/DHT.h \ ../toxcore/util.c \ ../toxcore/group_chats.h \ ../toxcore/group_chats.c \ + ../toxcore/assoc.h \ + ../toxcore/assoc.c \ ../toxcore/misc_tools.h libtoxcore_la_CFLAGS = -I$(top_srcdir) \ diff --git a/toxcore/Messenger.c b/toxcore/Messenger.c index 57cafc3f..49cffc98 100644 --- a/toxcore/Messenger.c +++ b/toxcore/Messenger.c @@ -26,9 +26,11 @@ #endif #include "Messenger.h" +#include "assoc.h" #include "network.h" #include "util.h" + #define MIN(a,b) (((a)<(b))?(a):(b)) @@ -784,7 +786,7 @@ int add_groupchat(Messenger *m) for (i = 0; i < m->numchats; ++i) { if (m->chats[i] == NULL) { - Group_Chat *newchat = new_groupchat(m->net); + Group_Chat *newchat = new_groupchat(m->net, m->dht->assoc); if (newchat == NULL) return -1; @@ -803,7 +805,7 @@ int add_groupchat(Messenger *m) if (temp == NULL) return -1; - temp[m->numchats] = new_groupchat(m->net); + temp[m->numchats] = new_groupchat(m->net, m->dht->assoc); if (temp[m->numchats] == NULL) return -1; @@ -2006,6 +2008,9 @@ static int messenger_load_state_callback(void *outer, uint8_t *data, uint32_t le if (length == crypto_box_PUBLICKEYBYTES + crypto_box_SECRETKEYBYTES + sizeof(uint32_t)) { set_nospam(&(m->fr), *(uint32_t *)data); load_keys(m->net_crypto, &data[sizeof(uint32_t)]); + + if (m->dht->assoc) + Assoc_self_client_id_changed(m->dht->assoc); } else return -1; /* critical */ diff --git a/toxcore/assoc.c b/toxcore/assoc.c new file mode 100644 index 00000000..7368819d --- /dev/null +++ b/toxcore/assoc.c @@ -0,0 +1,739 @@ + +#include "DHT.h" +#include "assoc.h" +#include "ping.h" + +#include "LAN_discovery.h" + +#include "util.h" + +/* + * BASIC OVERVIEW: + * + * Hash: The client_id is hashed with a local hash function. + * Hashes are used in multiple places for searching. + * Bucket: The first n bits of the client_id are used to + * select a bucket. This speeds up sorting, but the more + * important reason is to enforce a spread in the space of + * client_ids available. + * + * + * Candidates: + * + * Candidates are kept in buckets of hash tables. The hash + * function is calculated from the client_id. Up to + * HASH_COLLIDE_COUNT alternative positions are tried if + * the inital position is already used by a different entry. + * The collision function is multiplicative, not additive. + * + * A new candidate can bump an existing candidate, if it is + * more "desirable": Seen beats Heard. + */ + +/* candidates: number of bucket bits/buckets + * if this is raised dramatically, DISTANCE_INDEX_DISTANCE_BITS + * might have to be adjusted */ +#define CANDIDATES_BUCKET_BITS 8 +#define CANDIDATES_BUCKET_COUNT (1 << CANDIDATES_BUCKET_BITS) + +/* candidates: number of candidates to keep PER BUCKET (should be a prime + * for hash reasons, other primes e.g. 251, 509, 1021, 2039, 4093, 8191) + * total number of candidates is therefore less than (BUCKET_COUNT * TO_KEEP), + * given that a hash table is usually filling decently to around 50%, the + * total long-term number of entries will be around 0.5 * 256 * 251 ~= 32k + * + * if this is raised dramatically, DISTANCE_INDEX_DISTANCE_BITS + * might have to be adjusted */ +#define CANDIDATES_TO_KEEP 251 + +/* candidates: alternative places for the same hash value */ +#define HASH_COLLIDE_COUNT 5 + +/* candidates: bump entries: timeout values for seen/heard to be considered of value */ +#define CANDIDATES_SEEN_TIMEOUT 1800 +#define CANDIDATES_HEARD_TIMEOUT 600 + +/* distance/index: index size & access mask */ +#define DISTANCE_INDEX_INDEX_BITS (64 - DISTANCE_INDEX_DISTANCE_BITS) +#define DISTANCE_INDEX_INDEX_MASK ((1 << DISTANCE_INDEX_INDEX_BITS) - 1) + +/* types to stay consistent */ +#if (CANDIDATES_BUCKET_BITS <= 16) +typedef uint16_t bucket_t; +#else +typedef uint32_t bucket_t; +#endif +typedef uint16_t usecnt_t; +typedef uint32_t hash_t; + +/* abbreviations ... */ +typedef Assoc_distance_relative_callback dist_rel_cb; +typedef Assoc_distance_absolute_callback dist_abs_cb; + +/* + * Client_data wrapped with additional data + */ +typedef struct Client_entry { + hash_t hash; + + /* shortcuts & rumors: timers and data */ + uint64_t seen_at; + uint64_t heard_at; + + uint16_t seen_family; + uint16_t heard_family; + + IP_Port assoc_heard4; + IP_Port assoc_heard6; + + Client_data client; +} Client_entry; + +typedef struct candidates_bucket { + Client_entry list[CANDIDATES_TO_KEEP]; /* hashed list (with holes) */ +} candidates_bucket; + +typedef struct Assoc { + DHT *dht; /* for ping/getnodes */ + hash_t self_hash; /* hash of self_client_id */ + uint8_t *self_client_id; /* don't store entries for this */ + + /* association centralization: clients not in use */ + candidates_bucket candidates[CANDIDATES_BUCKET_COUNT]; +} Assoc; + +/*****************************************************************************/ +/* HELPER FUNCTIONS */ +/*****************************************************************************/ + +/* the complete distance would be CLIENT_ID_SIZE long... + * returns DISTANCE_INDEX_DISTANCE_BITS valid bits */ +static uint64_t id_distance(Assoc *assoc, void *callback_data, uint8_t *id_ref, uint8_t *id_test) +{ + /* with BIG_ENDIAN, this would be a one-liner... */ + uint64_t retval = 0; + + uint8_t pos = 0, bits = DISTANCE_INDEX_DISTANCE_BITS; + + while (bits > 8) { + uint8_t distance = abs((int8_t)id_ref[pos] ^ (int8_t)id_test[pos]); + retval = (retval << 8) | distance; + bits -= 8; + pos++; + } + + return (retval << bits) | ((id_ref[pos] ^ id_test[pos]) >> (8 - bits)); +} + +/* qsort() callback for a sorting by id_distance() values */ +static int dist_index_comp(const void *a, const void *b) +{ + const uint64_t *_a = a; + const uint64_t *_b = b; + + if (*_a < *_b) + return -1; + + if (*_a > *_b) + return 1; + + return 0; +} + +/* get actual entry to a distance_index */ +static Client_entry *dist_index_entry(Assoc *assoc, uint64_t dist_ind) +{ + if ((dist_ind & DISTANCE_INDEX_INDEX_MASK) == DISTANCE_INDEX_INDEX_MASK) + return NULL; + + size_t offset = CANDIDATES_BUCKET_COUNT * CANDIDATES_TO_KEEP; + uint32_t index = dist_ind & DISTANCE_INDEX_INDEX_MASK; + + if (index < offset) { + bucket_t b_id = index / CANDIDATES_TO_KEEP; + candidates_bucket *cnd_bckt = &assoc->candidates[b_id]; + size_t b_ix = index % CANDIDATES_TO_KEEP; + Client_entry *entry = &cnd_bckt->list[b_ix]; + + if (entry->hash) + return entry; + } + + return NULL; +} + +/* get actual entry's client_id to a distance_index */ +static uint8_t *dist_index_id(Assoc *assoc, uint64_t dist_ind) +{ + Client_entry *entry = dist_index_entry(assoc, dist_ind); + + if (entry) + return entry->client.client_id; + + return NULL; +} + +/* sorts first .. last, i.e. last is included */ +static void dist_index_bubble(Assoc *assoc, uint64_t *dist_list, size_t first, size_t last, uint8_t *id, + void *custom_data, Assoc_distance_relative_callback dist_rel_func) +{ + size_t i, k; + + for (i = first; i <= last; i++) { + uint8_t *id1 = dist_index_id(assoc, dist_list[i]); + + for (k = i + 1; k <= last; k++) { + uint8_t *id2 = dist_index_id(assoc, dist_list[k]); + + if (id1 && id2) + if (dist_rel_func(assoc, custom_data, id, id1, id2) == 2) { + uint64_t swap = dist_list[i]; + dist_list[i] = dist_list[k]; + dist_list[k] = swap; + } + } + } +} + +/* TODO: Check that there isn't a function like this elsewhere hidden. + * E.g. the one which creates a handshake_id isn't usable for this, it must + * always map the same ID to the same hash. + * + * Result is NOT MAPPED to CANDIDATES_TO_KEEP range, i.e. map before using + * it for list access. */ +static hash_t id_hash(uint8_t *id) +{ + uint32_t i, res = 0x19a64e82; + + for (i = 0; i < CLIENT_ID_SIZE; i++) + res = ((res << 1) ^ (res >> 30)) ^ id[i]; + + /* can't have zero as hash, a) marks an unused spot, + * and b) slots for collision are multiplied, for + * the latter reason also remap 1 .. 7 */ + if ((res % CANDIDATES_TO_KEEP) < 8) + res = res + (CANDIDATES_TO_KEEP >> 2); + + return res; +} + +/* up to HASH_COLLIDE_COUNT calls to different spots, + * result IS mapped to CANDIDATES_TO_KEEP range */ +static hash_t hash_collide(hash_t hash) +{ + uint64_t hash64 = hash % CANDIDATES_TO_KEEP; + hash64 = (hash64 * 101) % CANDIDATES_TO_KEEP; + + hash_t retval = hash64; + + /* this should never happen when CANDIDATES_TO_KEEP is prime and hash not a multiple + * (id_hash() checks for a multiple and returns a different hash in that case) + * + * ( 1 .. (prime - 1) is a group over multiplication and every number has its inverse + * in the group, so no multiplication should ever end on zero as long neither + * of the two factors was zero-equivalent ) + * + * BUT: because the usage of the word "never" invokes Murphy's law, catch it */ + if (!retval) + retval = 1; + + return retval; +} + +/* returns the "seen" assoc related to the ipp */ +static IPPTsPng *entry_assoc(Client_entry *cl_entry, IP_Port *ipp) +{ + if (!cl_entry) + return NULL; + + if (ipp->ip.family == AF_INET) + return &cl_entry->client.assoc4; + + if (ipp->ip.family == AF_INET6) + return &cl_entry->client.assoc6; + + return NULL; +} + +/* returns the "heard" assoc related to the ipp */ +static IP_Port *entry_heard_get(Client_entry *entry, IP_Port *ipp) +{ + if (ipp->ip.family == AF_INET) + return &entry->assoc_heard4; + else if (ipp->ip.family == AF_INET6) + return &entry->assoc_heard6; + else + return NULL; +} + +/* store a "heard" entry + * overwrites empty entry, does NOT overwrite non-LAN ip with + * LAN ip + * + * returns 1 if the entry did change */ +static int entry_heard_store(Client_entry *entry, IPPTs *ippts) +{ + if (!entry || !ippts) + return 0; + + if (!ipport_isset(&ippts->ip_port)) + return 0; + + IP_Port *heard, *ipp = &ippts->ip_port; + + if (ipp->ip.family == AF_INET) + heard = &entry->assoc_heard4; + else if (ipp->ip.family == AF_INET6) + heard = &entry->assoc_heard6; + else + return 0; + + if (ipport_equal(ipp, heard)) + return 0; + + if (!ipport_isset(heard)) { + *heard = *ipp; + entry->heard_at = ippts->timestamp; + entry->heard_family = ipp->ip.family; + return 1; + } + + /* don't destroy a good address with a crappy one + * (unless we're very timed out) */ + uint8_t LAN_ipp = LAN_ip(ipp->ip) == 0; + uint8_t LAN_entry = LAN_ip(heard->ip) == 0; + + if (LAN_ipp && !LAN_entry && !is_timeout(entry->heard_at, CANDIDATES_HEARD_TIMEOUT)) + return 0; + + *heard = *ipp; + entry->heard_at = ippts->timestamp; + entry->heard_family = ipp->ip.family; + + return 1; +} + +/* maps Assoc callback signature to id_closest() */ +static int assoc_id_closest(Assoc *assoc, void *callback_data, uint8_t *client_id, uint8_t *client_id1, + uint8_t *client_id2) +{ + return id_closest(client_id, client_id1, client_id2); +} + +static bucket_t id_bucket(uint8_t *id, uint8_t bits) +{ + /* return the first "bits" bits of id */ + bucket_t retval = 0; + + uint8_t pos = 0; + + while (bits > 8) { + retval = (retval << 8) | id[pos++]; + bits -= 8; + } + + return (retval << bits) | (id[pos] >> (8 - bits)); +} + +/*****************************************************************************/ +/* CANDIDATES FUNCTIONS */ +/*****************************************************************************/ + + +static bucket_t candidates_id_bucket(uint8_t *id) +{ + return id_bucket(id, CANDIDATES_BUCKET_BITS); +} + +static uint8_t candidates_search(Assoc *assoc, uint8_t *id, hash_t hash, Client_entry **entryptr) +{ + bucket_t bucket = candidates_id_bucket(id); + candidates_bucket *cnd_bckt = &assoc->candidates[bucket]; + size_t coll, pos = hash % CANDIDATES_TO_KEEP; + + for (coll = 0; coll < HASH_COLLIDE_COUNT; pos = hash_collide(pos) , coll++) { + Client_entry *entry = &cnd_bckt->list[pos]; + + if (entry->hash == hash) + if (id_equal(entry->client.client_id, id)) { + *entryptr = entry; + return 1; + } + } + + *entryptr = NULL; + return 0; +} + +static void candidates_update_assoc(Assoc *assoc, Client_entry *entry, IPPTs *ippts_send, IP_Port *ipp_recv) +{ + if (!assoc || !entry || !ippts_send) + return; + + IPPTsPng *ipptsp = entry_assoc(entry, &ippts_send->ip_port); + + if (!ipptsp) + return; + + /* do NOT do anything related to wanted, that's handled outside, + * just update the assoc (in the most sensible way) + */ + if (ipp_recv) { + ipptsp->ip_port = ippts_send->ip_port; + ipptsp->timestamp = ippts_send->timestamp; + ipptsp->ret_ip_port = *ipp_recv; + ipptsp->ret_timestamp = unix_time(); + + entry->seen_at = unix_time(); + entry->seen_family = ippts_send->ip_port.ip.family; + + return; + } + + entry_heard_store(entry, ippts_send); +} + +static uint8_t candidates_create_internal(Assoc *assoc, hash_t hash, uint8_t *id, uint8_t seen, + bucket_t *bucketptr, size_t *posptr) +{ + if (!assoc || !id || !bucketptr || !posptr) + return 0; + + bucket_t bucket = candidates_id_bucket(id); + candidates_bucket *cnd_bckt = &assoc->candidates[bucket]; + + size_t coll, pos = hash % CANDIDATES_TO_KEEP, check; + size_t pos_check[6]; + + memset(pos_check, 0, sizeof(pos_check)); + + for (coll = 0; coll < HASH_COLLIDE_COUNT; pos = hash_collide(pos) , coll++) { + Client_entry *entry = &cnd_bckt->list[pos]; + + /* unset */ + if (!entry->hash) { + *bucketptr = bucket; + *posptr = pos; + + return 1; + } + + /* 0. bad + * 1. seen bad, heard good + * 2. seen good */ + if (!is_timeout(entry->seen_at, CANDIDATES_SEEN_TIMEOUT)) + check = 2; + else if (!is_timeout(entry->heard_at, CANDIDATES_HEARD_TIMEOUT)) + check = 1; + else + check = 0; + + if (!pos_check[check]) + pos_check[check] = pos + 1; + } + + /* seen can bump heard&bad, heard can bump only bad */ + size_t i, pos_max = seen ? 2 : 1; + + for (i = 0; i < pos_max; i++) + if (pos_check[i]) { + *bucketptr = bucket; + *posptr = pos_check[i] - 1; + + return 1; + } + + return 0; +} + +static void candidates_create_new(Assoc *assoc, hash_t hash, uint8_t *id, + IPPTs *ippts_send, IP_Port *ipp_recv) +{ + if (!assoc || !id || !ippts_send) + return; + + bucket_t bucket; + size_t pos; + + if (!candidates_create_internal(assoc, hash, id, ipp_recv != NULL, &bucket, &pos)) + return; + + candidates_bucket *cnd_bckt = &assoc->candidates[bucket]; + Client_entry *entry = &cnd_bckt->list[pos]; + memset(entry, 0, sizeof(*entry)); + + IPPTsPng *ipptsp = entry_assoc(entry, &ippts_send->ip_port); + + if (!ipptsp) + return; + + entry->hash = hash; + id_copy(entry->client.client_id, id); + + if (ipp_recv && !ipport_isset(ipp_recv)) + ipp_recv = NULL; + + if (ipp_recv) { + entry->seen_at = unix_time(); + entry->seen_family = ippts_send->ip_port.ip.family; + + ipptsp->ip_port = ippts_send->ip_port; + ipptsp->timestamp = ippts_send->timestamp; + ipptsp->ret_ip_port = *ipp_recv; + ipptsp->ret_timestamp = unix_time(); + } else { + IP_Port *heard = entry_heard_get(entry, &ippts_send->ip_port); + + if (heard) { + entry->heard_at = ippts_send->timestamp; + entry->heard_family = ippts_send->ip_port.ip.family; + + *heard = ippts_send->ip_port; + } + } +} + +/*****************************************************************************/ + +static void client_id_self_update(Assoc *assoc) +{ + if (assoc->self_hash || !assoc->self_client_id) + return; + + if (!assoc->self_hash) { + size_t i, sum = 0; + + for (i = 0; i < crypto_box_PUBLICKEYBYTES; i++) + sum |= assoc->self_client_id[i]; + + if (!sum) + return; + + assoc->self_hash = id_hash(assoc->self_client_id); + } + +#ifdef LOGGING + loglog("assoc: id is now set, purging cache of self-references...\n"); +#endif + + /* if we already added some (or loaded some) entries, + * look and remove if we find a match + */ + bucket_t b_id = candidates_id_bucket(assoc->self_client_id); + candidates_bucket *cnd_bckt = &assoc->candidates[b_id]; + size_t i, pos = assoc->self_hash % CANDIDATES_TO_KEEP; + + for (i = 0; i < HASH_COLLIDE_COUNT; pos = hash_collide(pos), i++) { + Client_entry *entry = &cnd_bckt->list[pos]; + + if (entry->hash == assoc->self_hash) + if (id_equal(entry->client.client_id, assoc->self_client_id)) + entry->hash = 0; + } +} + +/*****************************************************************************/ +/* TRIGGER FUNCTIONS */ +/*****************************************************************************/ + +/* Central entry point for new associations: add a new candidate to the cache + * seen should be 0 (zero), if the candidate was announced by someone else, + * seen should be 1 (one), if there is confirmed connectivity (a definite response) + */ +void Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_recv) +{ + if (!assoc || !id || !ippts_send) + return; + + if (!assoc->self_hash) { + client_id_self_update(assoc); + + if (!assoc->self_hash) + return; + } + + if (!ipport_isset(&ippts_send->ip_port)) + return; + + if (ipp_recv && !ipport_isset(ipp_recv)) + ipp_recv = NULL; + + hash_t hash = id_hash(id); + + if (hash == assoc->self_hash) + if (id_equal(id, assoc->self_client_id)) + return; + + /* if it's new: + * callback, if there's desire, add to clients, else to candidates + * + * if it's "old": + * if it's client: refresh + * if it's candidate: + * if !ipp_recv, refresh + * if ipp_recv: callback, if there's desire, move to candidates + */ + Client_entry *cnd_entry; + + if (!candidates_search(assoc, id, hash, &cnd_entry)) + candidates_create_new(assoc, hash, id, ippts_send, ipp_recv); + else + candidates_update_assoc(assoc, cnd_entry, ippts_send, ipp_recv); +} + +/*****************************************************************************/ +/* MAIN USE */ +/*****************************************************************************/ + +uint8_t Assoc_get_close_entries(Assoc *assoc, Assoc_close_entries *state) +{ + if (!assoc || !state || !state->wanted_id || !state->result) + return 0; + + if (!assoc->self_hash) { + client_id_self_update(assoc); + + if (!assoc->self_hash) + return 0; + } + + if (!state->distance_relative_func) + state->distance_relative_func = assoc_id_closest; + + if (!state->distance_absolute_func) + state->distance_absolute_func = id_distance; + + size_t clients_offset = CANDIDATES_BUCKET_COUNT * CANDIDATES_TO_KEEP; + size_t dist_list_len = clients_offset; + uint64_t dist_list[dist_list_len]; + memset(dist_list, ~0, dist_list_len * sizeof(dist_list[0])); + bucket_t b; + size_t i; + + for (b = 0; b < CANDIDATES_BUCKET_COUNT; b++) { + candidates_bucket *cnd_bckt = &assoc->candidates[b]; + + for (i = 0; i < CANDIDATES_TO_KEEP; i++) { + Client_entry *entry = &cnd_bckt->list[i]; + + if (entry->hash) { + uint64_t dist = state->distance_absolute_func(assoc, state->custom_data, state->wanted_id, entry->client.client_id); + uint32_t index = b * CANDIDATES_TO_KEEP + i; + dist_list[index] = (dist << DISTANCE_INDEX_INDEX_BITS) | index; + } + } + } + + qsort(dist_list, dist_list_len, sizeof(dist_list[0]), dist_index_comp); + + /* ok, ok, it's not *perfectly* sorted, because we used an absolute distance + * go over the result and see if we need to "smoothen things out" + * because those should be only very few and short streaks, the worst regularly + * used sorting function aka bubble sort is used */ + uint64_t dist_prev = ~0; + size_t ind_prev = ~0, ind_curr; + size_t len = 1; + + for (ind_curr = 0; ind_curr < dist_list_len; ind_curr++) { + /* sorted increasingly, so an invalid entry marks the end */ + if ((dist_list[ind_curr] & DISTANCE_INDEX_INDEX_MASK) == DISTANCE_INDEX_INDEX_MASK) + break; + + uint64_t dist_curr = dist_list[ind_curr] >> DISTANCE_INDEX_INDEX_BITS; + + if (dist_prev == dist_curr) + len++; + else { + if (len > 1) + dist_index_bubble(assoc, dist_list, ind_prev, ind_curr - 1, state->wanted_id, state->custom_data, + state->distance_relative_func); + + dist_prev = dist_curr; + ind_prev = ind_curr; + len = 1; + } + } + + if (len > 1) + dist_index_bubble(assoc, dist_list, ind_prev, ind_curr - 1, state->wanted_id, state->custom_data, + state->distance_relative_func); + + /* ok, now dist_list is a strictly ascending sorted list of nodes + * a) extract CLOSE_QUOTA_USED clients, not timed out + * b) extract (1 - QUOTA) (better!) clients & candidates, not timed out + * c) save candidates which would be better, if contact can be established */ + size_t client_quota_good = 0, pos = 0; + size_t client_quota_max = state->count_good; + + ssize_t taken_last = - 1; + + for (i = 0; (i < dist_list_len) && (pos < state->count); i++) { + Client_entry *entry = dist_index_entry(assoc, dist_list[i]); + + if (entry && entry->hash) { + if (client_quota_good >= client_quota_max) { + state->result[pos++] = &entry->client; + taken_last = i; + } else if (!is_timeout(entry->seen_at, BAD_NODE_TIMEOUT)) { + state->result[pos++] = &entry->client; + client_quota_good++; + taken_last = i; + } + } + } + + /* if we had not enough valid entries the list might still not be filled. + * + * start again from last taken client, but leave out any requirement + */ + if (pos < state->count) { + for (i = taken_last + 1; (i < dist_list_len) && (pos < state->count); i++) { + Client_entry *entry = dist_index_entry(assoc, dist_list[i]); + + if (entry && entry->hash) + state->result[pos++] = &entry->client; + } + } + + return pos; +} + +/*****************************************************************************/ +/* GLOBAL STRUCTURE FUNCTIONS */ +/*****************************************************************************/ + +/* create */ +Assoc *new_Assoc(DHT *dht) +{ + Assoc *assoc = calloc(1, sizeof(*assoc)); + + if (!assoc) + return NULL; + + /* dht MAY be NULL! (e.g. testing) */ + if (dht) { + assoc->dht = dht; + assoc->self_client_id = dht->c->self_public_key; + } else { + assoc->self_client_id = malloc(CLIENT_ID_SIZE); + assoc->self_client_id[0] = 42; + } + + return assoc; +} + +/* own client_id, assocs for this have to be ignored */ +void Assoc_self_client_id_changed(Assoc *assoc) +{ + if (assoc) { + assoc->self_hash = 0; + client_id_self_update(assoc); + } +} + +/* destroy */ +void kill_Assoc(Assoc *assoc) +{ + /* nothing dynamic left in trim */ + free(assoc); +} diff --git a/toxcore/assoc.h b/toxcore/assoc.h new file mode 100644 index 00000000..e0a899be --- /dev/null +++ b/toxcore/assoc.h @@ -0,0 +1,73 @@ + +#ifndef __ASSOC_H__ +#define __ASSOC_H__ + +/* used by rendezvous */ +#define ASSOC_AVAILABLE + +/* For the legalese parts, see tox.h. */ + +/* + * Module to store currently unused ID <=> IP associations + * for a potential future use + */ + +typedef struct IP_Port IP_Port; +typedef struct Assoc Assoc; + +/*****************************************************************************/ + +/* custom distance handler, if it's not ID-distance based + * return values exactly like id_closest() */ +typedef int (*Assoc_distance_relative_callback)(Assoc *assoc, void *callback_data, uint8_t *client_id, + uint8_t *client_id1, uint8_t *client_id2); + +#define DISTANCE_INDEX_DISTANCE_BITS 44 + +/* absolute distance: can be same for different client_id_check values + * return value should have DISTANCE_INDEX_DISTANCE_BITS valid bits */ +typedef uint64_t (*Assoc_distance_absolute_callback)(Assoc *assoc, void *callback_data, + uint8_t *client_id_ref, uint8_t *client_id_check); + +/*****************************************************************************/ + +/* Central entry point for new associations: add a new candidate to the cache */ +void Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_recv); + +/*****************************************************************************/ + +typedef struct Assoc_close_entries { + uint8_t *wanted_id; /* the target client_id */ + void *custom_data; /* given to distance functions */ + + Assoc_distance_relative_callback distance_relative_func; + Assoc_distance_absolute_callback distance_absolute_func; + + uint8_t count_good; /* that many should be "good" w.r.t. timeout */ + uint8_t count; /* allocated number of close_indices */ + Client_data **result; +} Assoc_close_entries; + +/* find up to close_count nodes to put into close_nodes_used of ID_Nodes + * the distance functions can be NULL, then standard distance functions will be used + * the caller is responsible for allocating close_indices of sufficient size + * + * returns 0 on error + * returns the number of found nodes and the list of indices usable by Assoc_client() + * the caller is assumed to be registered from Assoc_register_callback() + * if they aren't, they should copy the Client_data and call Assoc_client_drop() + */ +uint8_t Assoc_get_close_entries(Assoc *assoc, Assoc_close_entries *close_entries); + +/*****************************************************************************/ + +/* create */ +Assoc *new_Assoc(DHT *dht); + +/* avoid storing own ID/assoc */ +void Assoc_self_client_id_changed(Assoc *assoc); + +/* destroy */ +void kill_Assoc(Assoc *assoc); + +#endif /* !__ASSOC_H__ */ diff --git a/toxcore/group_chats.c b/toxcore/group_chats.c index 5376713c..ed867365 100644 --- a/toxcore/group_chats.c +++ b/toxcore/group_chats.c @@ -27,6 +27,8 @@ #endif #include "group_chats.h" +#include "assoc.h" +#include "LAN_discovery.h" #include "util.h" #define GROUPCHAT_MAXDATA_LENGTH (MAX_DATA_SIZE - (1 + crypto_box_PUBLICKEYBYTES * 2 + crypto_box_NONCEBYTES)) @@ -306,6 +308,16 @@ static int send_getnodes(Group_Chat *chat, IP_Port ip_port, int peernum) chat->group[peernum].last_pinged = unix_time(); chat->group[peernum].pingid = contents.pingid; + chat->group[peernum].ping_via = ip_port; + + + if (chat->assoc) { + IPPTs ippts; + ippts.timestamp = unix_time(); + ippts.ip_port = ip_port; + + Assoc_add_entry(chat->assoc, chat->group[peernum].client_id, &ippts, NULL); + } return send_groupchatpacket(chat, ip_port, chat->group[peernum].client_id, (uint8_t *)&contents, sizeof(contents), CRYPTO_PACKET_GROUP_CHAT_GET_NODES); @@ -373,6 +385,9 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 uint16_t numnodes = (len - sizeof(contents.pingid)) / sizeof(groupchat_nodes); uint32_t i; + IPPTs ippts_send; + ippts_send.timestamp = unix_time(); + for (i = 0; i < numnodes; ++i) { if (peer_okping(chat, contents.nodes[i].client_id) > 0) { int peern = peer_in_chat(chat, contents.nodes[i].client_id); @@ -385,10 +400,26 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 continue; send_getnodes(chat, contents.nodes[i].ip_port, peern); + + if (chat->assoc) { + ippts_send.ip_port = contents.nodes[i].ip_port; + Assoc_add_entry(chat->assoc, contents.nodes[i].client_id, &ippts_send, NULL); + } } } add_closepeer(chat, chat->group[peernum].client_id, source); + + if (chat->assoc) { + ippts_send.ip_port = chat->group[peernum].ping_via; + ippts_send.timestamp = chat->group[peernum].last_pinged; + + IP_Port ipp_recv; + ipp_recv = source; + + Assoc_add_entry(chat->assoc, contents.nodes[i].client_id, &ippts_send, &ipp_recv); + } + return 0; } @@ -583,7 +614,8 @@ void callback_groupmessage(Group_Chat *chat, void (*function)(Group_Chat *chat, chat->group_message_userdata = userdata; } -Group_Chat *new_groupchat(Networking_Core *net) + +Group_Chat *new_groupchat(Networking_Core *net, Assoc *assoc) { unix_time_update(); @@ -593,6 +625,9 @@ Group_Chat *new_groupchat(Networking_Core *net) Group_Chat *chat = calloc(1, sizeof(Group_Chat)); chat->net = net; crypto_box_keypair(chat->self_public_key, chat->self_secret_key); + + chat->assoc = assoc; + return chat; } diff --git a/toxcore/group_chats.h b/toxcore/group_chats.h index 74e2f2d7..90440d5b 100644 --- a/toxcore/group_chats.h +++ b/toxcore/group_chats.h @@ -27,12 +27,15 @@ #include "net_crypto.h" +typedef struct Assoc Assoc; + #define MAX_NICK_BYTES 128 typedef struct { uint8_t client_id[crypto_box_PUBLICKEYBYTES]; uint64_t pingid; uint64_t last_pinged; + IP_Port ping_via; uint64_t last_recv; uint64_t last_recv_msgping; @@ -46,7 +49,6 @@ typedef struct { uint8_t client_id[crypto_box_PUBLICKEYBYTES]; IP_Port ip_port; uint64_t last_recv; - } Group_Close; #define GROUP_CLOSE_CONNECTIONS 6 @@ -69,6 +71,8 @@ typedef struct Group_Chat { uint8_t nick[MAX_NICK_BYTES]; uint16_t nick_len; uint64_t last_sent_nick; + + Assoc *assoc; } Group_Chat; #define GROUP_CHAT_PING 0 @@ -120,7 +124,7 @@ uint32_t group_newpeer(Group_Chat *chat, uint8_t *client_id); * * Returns a NULL pointer if fail. */ -Group_Chat *new_groupchat(Networking_Core *net); +Group_Chat *new_groupchat(Networking_Core *net, Assoc *assoc); /* Kill a group chat diff --git a/toxcore/network.h b/toxcore/network.h index 8b9b8b2f..87cb4794 100644 --- a/toxcore/network.h +++ b/toxcore/network.h @@ -129,7 +129,7 @@ typedef union { uint8_t uint8[8]; } IP4_Port; -typedef struct { +typedef struct IP_Port { IP ip; uint16_t port; } IP_Port; diff --git a/toxcore/ping.c b/toxcore/ping.c index bd754c53..141a3fb4 100644 --- a/toxcore/ping.c +++ b/toxcore/ping.c @@ -27,19 +27,24 @@ #include "config.h" #endif -#include #include -#include "net_crypto.h" #include "DHT.h" +#include "assoc.h" +#include "ping.h" + +#include "network.h" +#include "util.h" #define PING_NUM_MAX 384 -#define PING_TIMEOUT 5 // 5s + +/* 5 seconds */ +#define PING_TIMEOUT 5 /* Ping newly announced nodes to ping per TIME_TOPING seconds*/ #define TIME_TOPING 5 -typedef struct { +typedef struct PING { Net_Crypto *c; pinged_t pings[PING_NUM_MAX]; @@ -50,13 +55,7 @@ typedef struct { uint64_t last_toping; } PING; -#define __PING_C__ - -#include "network.h" -#include "util.h" -#include "ping.h" - -static bool is_ping_timeout(uint64_t time) +static int is_ping_timeout(uint64_t time) { return is_timeout(time, PING_TIMEOUT); } @@ -260,6 +259,14 @@ static int handle_ping_response(void *_dht, IP_Port source, uint8_t *packet, uin /* Associate client_id with the ip the request was sent to */ addto_lists(dht, ping->pings[ping_index - 1].ip_port, packet + 1); + + if (dht->assoc) { + IPPTs ippts; + ippts.ip_port = ping->pings[ping_index - 1].ip_port; + ippts.timestamp = ping->pings[ping_index - 1].timestamp; + Assoc_add_entry(dht->assoc, packet + 1, &ippts, &source); + } + return 0; } diff --git a/toxcore/ping.h b/toxcore/ping.h index 32742401..361fd3ef 100644 --- a/toxcore/ping.h +++ b/toxcore/ping.h @@ -24,11 +24,7 @@ #ifndef __PING_H__ #define __PING_H__ -#include - -#ifndef __PING_C__ typedef struct PING PING; -#endif /* Add nodes to the toping list. * All nodes in this list are pinged every TIME_TOPING seconds From b132c92b3ae3aeee293a4e815336cac76c7cfe69 Mon Sep 17 00:00:00 2001 From: "Coren[m]" Date: Sun, 17 Nov 2013 01:04:49 +0100 Subject: [PATCH 2/2] Assoc's array is now allocated dynamically and per default much smaller (320 entries). id_hash() was not at all working as expected for very small bucket size (when (size / 4) was zero). Simplified to be trivially correct. Also added a used flag on adding an entry, which is set by callers if they have that association in active use. Those get priority over unused entries on collision. Fleshed out test to be at least elementary useful. Each group chat now uses an own, small assoc (80 entries). --- auto_tests/assoc_test.c | 41 ++++- toxcore/DHT.c | 65 +++++--- toxcore/DHT.h | 2 +- toxcore/Messenger.c | 18 ++- toxcore/assoc.c | 335 +++++++++++++++++++++++++++++----------- toxcore/assoc.h | 26 +++- toxcore/group_chats.c | 14 +- toxcore/group_chats.h | 4 +- toxcore/ping.c | 5 +- 9 files changed, 362 insertions(+), 148 deletions(-) diff --git a/auto_tests/assoc_test.c b/auto_tests/assoc_test.c index c0b8d065..79a0e778 100644 --- a/auto_tests/assoc_test.c +++ b/auto_tests/assoc_test.c @@ -14,21 +14,46 @@ #include -START_TEST(test_X) +START_TEST(test_basics) { /* TODO: real test */ - Assoc *assoc = new_Assoc(NULL); - uint8_t id[CLIENT_ID_SIZE]; - IPPTs ippts_send; - IP_Port ipp_recv; + Assoc *assoc = new_Assoc_default(id); + ck_assert_msg(assoc != NULL, "failed to create default assoc"); - Assoc_add_entry(assoc, id, &ippts_send, &ipp_recv); + kill_Assoc(assoc); + assoc = new_Assoc(17, 4, id); /* results in an assoc of 16/3 */ + ck_assert_msg(assoc != NULL, "failed to create customized assoc"); + + IP_Port ipp; + ipp.ip.family = AF_INET; + ipp.ip.ip4.uint8[0] = 1; + ipp.port = htons(12345); + + IPPTs ippts_send; + ippts_send.ip_port = ipp; + ippts_send.timestamp = unix_time(); + IP_Port ipp_recv = ipp; + + uint8_t res = Assoc_add_entry(assoc, id, &ippts_send, &ipp_recv, 0); + ck_assert_msg(res == 0, "stored self as entry: expected %u, got %u", 0, res); + + id[0]++; + + res = Assoc_add_entry(assoc, id, &ippts_send, &ipp_recv, 0); + ck_assert_msg(res == 1, "failed to store entry: expected %u, got %u", 1, res); Assoc_close_entries close_entries; memset(&close_entries, 0, sizeof(close_entries)); + close_entries.count = 4; + close_entries.count_good = 2; + close_entries.wanted_id = id; - /* uint8_t found = */ Assoc_get_close_entries(assoc, &close_entries); + Client_data *entries[close_entries.count]; + close_entries.result = entries; + + uint8_t found = Assoc_get_close_entries(assoc, &close_entries); + ck_assert_msg(found == 1, "get_close_entries(): expected %u, got %u", 1, found); } END_TEST @@ -46,7 +71,7 @@ Suite *Assoc_suite(void) { Suite *s = suite_create("Assoc"); - DEFTESTCASE(X); + DEFTESTCASE(basics); return s; } diff --git a/toxcore/DHT.c b/toxcore/DHT.c index 9730771b..f5d93fc4 100644 --- a/toxcore/DHT.c +++ b/toxcore/DHT.c @@ -425,7 +425,9 @@ static void sort_list(Client_data *list, uint32_t length, uint8_t *comp_client_i list[i] = pairs[i].c2; } -/* Replace the first good node that is further to the comp_client_id than that of the client_id in the list */ +/* Replace the first good node that is further to the comp_client_id than that of the client_id in the list + * + * returns 0 when the item was stored, 1 otherwise */ static int replace_good( Client_data *list, uint32_t length, uint8_t *client_id, @@ -478,10 +480,12 @@ static int replace_good( Client_data *list, /* Attempt to add client with ip_port and client_id to the friends client list * and close_clientlist. + * + * returns 1+ if the item is used in any list, 0 else */ -void addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id) +int addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id) { - uint32_t i; + uint32_t i, used = 0; /* convert IPv4-in-IPv6 to IPv4 */ if ((ip_port.ip.family == AF_INET6) && IN6_IS_ADDR_V4MAPPED(&ip_port.ip.ip6.in6_addr)) { @@ -495,10 +499,13 @@ void addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id) if (!client_or_ip_port_in_list(dht->close_clientlist, LCLIENT_LIST, client_id, ip_port)) { if (replace_bad(dht->close_clientlist, LCLIENT_LIST, client_id, ip_port)) { /* If we can't replace bad nodes we try replacing good ones. */ - replace_good(dht->close_clientlist, LCLIENT_LIST, client_id, ip_port, - dht->c->self_public_key); - } - } + if (!replace_good(dht->close_clientlist, LCLIENT_LIST, client_id, ip_port, + dht->c->self_public_key)) + used++; + } else + used++; + } else + used++; for (i = 0; i < dht->num_friends; ++i) { if (!client_or_ip_port_in_list(dht->friends_list[i].client_list, @@ -507,17 +514,22 @@ void addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id) if (replace_bad(dht->friends_list[i].client_list, MAX_FRIEND_CLIENTS, client_id, ip_port)) { /* If we can't replace bad nodes we try replacing good ones. */ - replace_good(dht->friends_list[i].client_list, MAX_FRIEND_CLIENTS, - client_id, ip_port, dht->friends_list[i].client_id); - } - } + if (!replace_good(dht->friends_list[i].client_list, MAX_FRIEND_CLIENTS, + client_id, ip_port, dht->friends_list[i].client_id)) + used++; + } else + used++; + } else + used++; } + + return used; } /* If client_id is a friend or us, update ret_ip_port * nodeclient_id is the id of the node that sent us this info. */ -static void returnedip_ports(DHT *dht, IP_Port ip_port, uint8_t *client_id, uint8_t *nodeclient_id) +static int returnedip_ports(DHT *dht, IP_Port ip_port, uint8_t *client_id, uint8_t *nodeclient_id) { uint32_t i, j; uint64_t temp_time = unix_time(); @@ -539,10 +551,9 @@ static void returnedip_ports(DHT *dht, IP_Port ip_port, uint8_t *client_id, uint dht->close_clientlist[i].assoc6.ret_timestamp = temp_time; } - return; + return 1; } } - } else { for (i = 0; i < dht->num_friends; ++i) { if (id_equal(client_id, dht->friends_list[i].client_id)) { @@ -556,13 +567,14 @@ static void returnedip_ports(DHT *dht, IP_Port ip_port, uint8_t *client_id, uint dht->friends_list[i].client_list[j].assoc6.ret_timestamp = temp_time; } - return; + return 1; } } } } - } + + return 0; } /* checks if ip/port or ping_id are already in the list to get nodes @@ -841,14 +853,15 @@ static int handle_sendnodes_core(void *object, IP_Port source, uint8_t *packet, return 1; /* store the address the *request* was sent to */ - addto_lists(dht, dht->send_nodes[send_nodes_index - 1].ip_port, packet + 1); + int used = addto_lists(dht, dht->send_nodes[send_nodes_index - 1].ip_port, packet + 1); if (dht->assoc) { IPPTs ippts; ippts.ip_port = dht->send_nodes[send_nodes_index - 1].ip_port; ippts.timestamp = dht->send_nodes[send_nodes_index - 1].timestamp; - Assoc_add_entry(dht->assoc, packet + 1, &ippts, &source); + + Assoc_add_entry(dht->assoc, packet + 1, &ippts, &source, used ? 1 : 0); } *num_nodes_out = num_nodes; @@ -879,13 +892,14 @@ static int handle_sendnodes(void *object, IP_Port source, uint8_t *packet, uint3 ipp.port = nodes4_list[i].ip_port.port; send_ping_request(dht->ping, ipp, nodes4_list[i].client_id); - returnedip_ports(dht, ipp, nodes4_list[i].client_id, packet + 1); + int used = returnedip_ports(dht, ipp, nodes4_list[i].client_id, packet + 1); if (dht->assoc) { IPPTs ippts; ippts.ip_port = ipp; ippts.timestamp = 0; - Assoc_add_entry(dht->assoc, nodes4_list[i].client_id, &ippts, NULL); + + Assoc_add_entry(dht->assoc, nodes4_list[i].client_id, &ippts, NULL, used ? 1 : 0); } } @@ -909,13 +923,14 @@ static int handle_sendnodes_ipv6(void *object, IP_Port source, uint8_t *packet, for (i = 0; i < num_nodes; i++) if (ipport_isset(&nodes_list[i].ip_port)) { send_ping_request(dht->ping, nodes_list[i].ip_port, nodes_list[i].client_id); - returnedip_ports(dht, nodes_list[i].ip_port, nodes_list[i].client_id, packet + 1); + int used = returnedip_ports(dht, nodes_list[i].ip_port, nodes_list[i].client_id, packet + 1); if (dht->assoc) { IPPTs ippts; ippts.ip_port = nodes_list[i].ip_port; ippts.timestamp = 0; - Assoc_add_entry(dht->assoc, nodes_list[i].client_id, &ippts, NULL); + + Assoc_add_entry(dht->assoc, nodes_list[i].client_id, &ippts, NULL, used ? 1 : 0); } } @@ -1135,7 +1150,8 @@ void DHT_bootstrap(DHT *dht, IP_Port ip_port, uint8_t *public_key) IPPTs ippts; ippts.ip_port = ip_port; ippts.timestamp = 0; - Assoc_add_entry(dht->assoc, public_key, &ippts, NULL); + + Assoc_add_entry(dht->assoc, public_key, &ippts, NULL, 0); } getnodes(dht, ip_port, public_key, dht->c->self_public_key); @@ -1609,8 +1625,7 @@ DHT *new_DHT(Net_Crypto *c) init_cryptopackets(dht); cryptopacket_registerhandler(c, CRYPTO_PACKET_NAT_PING, &handle_NATping, dht); - /* dhtassoc is not mandatory for now */ - dht->assoc = new_Assoc(dht); + dht->assoc = new_Assoc_default(dht->c->self_public_key); return dht; } diff --git a/toxcore/DHT.h b/toxcore/DHT.h index dc8b0e38..2f6334ee 100644 --- a/toxcore/DHT.h +++ b/toxcore/DHT.h @@ -273,7 +273,7 @@ int DHT_load_new(DHT *dht, uint8_t *data, uint32_t size); */ int DHT_isconnected(DHT *dht); -void addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id); +int addto_lists(DHT *dht, IP_Port ip_port, uint8_t *client_id); #endif diff --git a/toxcore/Messenger.c b/toxcore/Messenger.c index 49cffc98..904c3ecc 100644 --- a/toxcore/Messenger.c +++ b/toxcore/Messenger.c @@ -786,7 +786,7 @@ int add_groupchat(Messenger *m) for (i = 0; i < m->numchats; ++i) { if (m->chats[i] == NULL) { - Group_Chat *newchat = new_groupchat(m->net, m->dht->assoc); + Group_Chat *newchat = new_groupchat(m->net); if (newchat == NULL) return -1; @@ -805,7 +805,7 @@ int add_groupchat(Messenger *m) if (temp == NULL) return -1; - temp[m->numchats] = new_groupchat(m->net, m->dht->assoc); + temp[m->numchats] = new_groupchat(m->net); if (temp[m->numchats] == NULL) return -1; @@ -1820,6 +1820,18 @@ void do_messenger(Messenger *m) #ifdef LOGGING if (unix_time() > lastdump + DUMPING_CLIENTS_FRIENDS_EVERY_N_SECONDS) { + loglog(" = = = = = = = = \n"); + Assoc_status(m->dht->assoc); + + if (m->numchats > 0) { + size_t c; + + for (c = 0; c < m->numchats; c++) { + loglog("---------------- \n"); + Assoc_status(m->chats[c]->assoc); + } + } + loglog(" = = = = = = = = \n"); lastdump = unix_time(); @@ -2010,7 +2022,7 @@ static int messenger_load_state_callback(void *outer, uint8_t *data, uint32_t le load_keys(m->net_crypto, &data[sizeof(uint32_t)]); if (m->dht->assoc) - Assoc_self_client_id_changed(m->dht->assoc); + Assoc_self_client_id_changed(m->dht->assoc, m->net_crypto->self_public_key); } else return -1; /* critical */ diff --git a/toxcore/assoc.c b/toxcore/assoc.c index 7368819d..fa44d0e3 100644 --- a/toxcore/assoc.c +++ b/toxcore/assoc.c @@ -5,6 +5,7 @@ #include "LAN_discovery.h" +#include #include "util.h" /* @@ -30,25 +31,12 @@ * more "desirable": Seen beats Heard. */ -/* candidates: number of bucket bits/buckets - * if this is raised dramatically, DISTANCE_INDEX_DISTANCE_BITS - * might have to be adjusted */ -#define CANDIDATES_BUCKET_BITS 8 -#define CANDIDATES_BUCKET_COUNT (1 << CANDIDATES_BUCKET_BITS) - -/* candidates: number of candidates to keep PER BUCKET (should be a prime - * for hash reasons, other primes e.g. 251, 509, 1021, 2039, 4093, 8191) - * total number of candidates is therefore less than (BUCKET_COUNT * TO_KEEP), - * given that a hash table is usually filling decently to around 50%, the - * total long-term number of entries will be around 0.5 * 256 * 251 ~= 32k - * - * if this is raised dramatically, DISTANCE_INDEX_DISTANCE_BITS - * might have to be adjusted */ -#define CANDIDATES_TO_KEEP 251 - /* candidates: alternative places for the same hash value */ #define HASH_COLLIDE_COUNT 5 +/* bucket size shall be co-prime to this */ +#define HASH_COLLIDE_PRIME 101 + /* candidates: bump entries: timeout values for seen/heard to be considered of value */ #define CANDIDATES_SEEN_TIMEOUT 1800 #define CANDIDATES_HEARD_TIMEOUT 600 @@ -58,13 +46,9 @@ #define DISTANCE_INDEX_INDEX_MASK ((1 << DISTANCE_INDEX_INDEX_BITS) - 1) /* types to stay consistent */ -#if (CANDIDATES_BUCKET_BITS <= 16) typedef uint16_t bucket_t; -#else -typedef uint32_t bucket_t; -#endif -typedef uint16_t usecnt_t; typedef uint32_t hash_t; +typedef uint16_t usecnt_t; /* abbreviations ... */ typedef Assoc_distance_relative_callback dist_rel_cb; @@ -77,6 +61,7 @@ typedef struct Client_entry { hash_t hash; /* shortcuts & rumors: timers and data */ + uint64_t used_at; uint64_t seen_at; uint64_t heard_at; @@ -90,16 +75,17 @@ typedef struct Client_entry { } Client_entry; typedef struct candidates_bucket { - Client_entry list[CANDIDATES_TO_KEEP]; /* hashed list (with holes) */ + Client_entry *list; /* hashed list */ } candidates_bucket; - typedef struct Assoc { - DHT *dht; /* for ping/getnodes */ - hash_t self_hash; /* hash of self_client_id */ - uint8_t *self_client_id; /* don't store entries for this */ + hash_t self_hash; /* hash of self_client_id */ + uint8_t self_client_id[CLIENT_ID_SIZE]; /* don't store entries for this */ /* association centralization: clients not in use */ - candidates_bucket candidates[CANDIDATES_BUCKET_COUNT]; + size_t candidates_bucket_bits; + size_t candidates_bucket_count; + size_t candidates_bucket_size; + candidates_bucket *candidates; } Assoc; /*****************************************************************************/ @@ -146,13 +132,13 @@ static Client_entry *dist_index_entry(Assoc *assoc, uint64_t dist_ind) if ((dist_ind & DISTANCE_INDEX_INDEX_MASK) == DISTANCE_INDEX_INDEX_MASK) return NULL; - size_t offset = CANDIDATES_BUCKET_COUNT * CANDIDATES_TO_KEEP; + size_t total = assoc->candidates_bucket_count * assoc->candidates_bucket_size; uint32_t index = dist_ind & DISTANCE_INDEX_INDEX_MASK; - if (index < offset) { - bucket_t b_id = index / CANDIDATES_TO_KEEP; + if (index < total) { + bucket_t b_id = index / assoc->candidates_bucket_size; candidates_bucket *cnd_bckt = &assoc->candidates[b_id]; - size_t b_ix = index % CANDIDATES_TO_KEEP; + size_t b_ix = index % assoc->candidates_bucket_size; Client_entry *entry = &cnd_bckt->list[b_ix]; if (entry->hash) @@ -201,28 +187,27 @@ static void dist_index_bubble(Assoc *assoc, uint64_t *dist_list, size_t first, s * * Result is NOT MAPPED to CANDIDATES_TO_KEEP range, i.e. map before using * it for list access. */ -static hash_t id_hash(uint8_t *id) +static hash_t id_hash(Assoc *assoc, uint8_t *id) { uint32_t i, res = 0x19a64e82; for (i = 0; i < CLIENT_ID_SIZE; i++) - res = ((res << 1) ^ (res >> 30)) ^ id[i]; + res = ((res << 1) ^ id[i]) + (res >> 31); /* can't have zero as hash, a) marks an unused spot, - * and b) slots for collision are multiplied, for - * the latter reason also remap 1 .. 7 */ - if ((res % CANDIDATES_TO_KEEP) < 8) - res = res + (CANDIDATES_TO_KEEP >> 2); + * b) collision function is multiplicative */ + if (!(res % assoc->candidates_bucket_size)) + res++; return res; } /* up to HASH_COLLIDE_COUNT calls to different spots, * result IS mapped to CANDIDATES_TO_KEEP range */ -static hash_t hash_collide(hash_t hash) +static hash_t hash_collide(Assoc *assoc, hash_t hash) { - uint64_t hash64 = hash % CANDIDATES_TO_KEEP; - hash64 = (hash64 * 101) % CANDIDATES_TO_KEEP; + uint64_t hash64 = hash % assoc->candidates_bucket_size; + hash64 = (hash64 * HASH_COLLIDE_PRIME) % assoc->candidates_bucket_size; hash_t retval = hash64; @@ -234,8 +219,14 @@ static hash_t hash_collide(hash_t hash) * of the two factors was zero-equivalent ) * * BUT: because the usage of the word "never" invokes Murphy's law, catch it */ - if (!retval) + if (!retval) { +#ifdef DEBUG + fprintf(stderr, "assoc::hash_collide: hash %u, bucket size %u => %u!", hash, (uint)assoc->candidates_bucket_size, + retval); + assert(retval != 0); +#endif retval = 1; + } return retval; } @@ -340,18 +331,18 @@ static bucket_t id_bucket(uint8_t *id, uint8_t bits) /*****************************************************************************/ -static bucket_t candidates_id_bucket(uint8_t *id) +static bucket_t candidates_id_bucket(Assoc *assoc, uint8_t *id) { - return id_bucket(id, CANDIDATES_BUCKET_BITS); + return id_bucket(id, assoc->candidates_bucket_bits); } static uint8_t candidates_search(Assoc *assoc, uint8_t *id, hash_t hash, Client_entry **entryptr) { - bucket_t bucket = candidates_id_bucket(id); + bucket_t bucket = candidates_id_bucket(assoc, id); candidates_bucket *cnd_bckt = &assoc->candidates[bucket]; - size_t coll, pos = hash % CANDIDATES_TO_KEEP; + size_t coll, pos = hash % assoc->candidates_bucket_size; - for (coll = 0; coll < HASH_COLLIDE_COUNT; pos = hash_collide(pos) , coll++) { + for (coll = 0; coll < HASH_COLLIDE_COUNT; pos = hash_collide(assoc, pos) , coll++) { Client_entry *entry = &cnd_bckt->list[pos]; if (entry->hash == hash) @@ -365,7 +356,8 @@ static uint8_t candidates_search(Assoc *assoc, uint8_t *id, hash_t hash, Client_ return 0; } -static void candidates_update_assoc(Assoc *assoc, Client_entry *entry, IPPTs *ippts_send, IP_Port *ipp_recv) +static void candidates_update_assoc(Assoc *assoc, Client_entry *entry, uint8_t used, IPPTs *ippts_send, + IP_Port *ipp_recv) { if (!assoc || !entry || !ippts_send) return; @@ -375,6 +367,9 @@ static void candidates_update_assoc(Assoc *assoc, Client_entry *entry, IPPTs *ip if (!ipptsp) return; + if (used) + entry->used_at = unix_time(); + /* do NOT do anything related to wanted, that's handled outside, * just update the assoc (in the most sensible way) */ @@ -394,20 +389,20 @@ static void candidates_update_assoc(Assoc *assoc, Client_entry *entry, IPPTs *ip } static uint8_t candidates_create_internal(Assoc *assoc, hash_t hash, uint8_t *id, uint8_t seen, - bucket_t *bucketptr, size_t *posptr) + uint8_t used, bucket_t *bucketptr, size_t *posptr) { if (!assoc || !id || !bucketptr || !posptr) return 0; - bucket_t bucket = candidates_id_bucket(id); + bucket_t bucket = candidates_id_bucket(assoc, id); candidates_bucket *cnd_bckt = &assoc->candidates[bucket]; - size_t coll, pos = hash % CANDIDATES_TO_KEEP, check; + size_t coll, pos = hash % assoc->candidates_bucket_size, check; size_t pos_check[6]; memset(pos_check, 0, sizeof(pos_check)); - for (coll = 0; coll < HASH_COLLIDE_COUNT; pos = hash_collide(pos) , coll++) { + for (coll = 0; coll < HASH_COLLIDE_COUNT; pos = hash_collide(assoc, pos) , coll++) { Client_entry *entry = &cnd_bckt->list[pos]; /* unset */ @@ -420,7 +415,11 @@ static uint8_t candidates_create_internal(Assoc *assoc, hash_t hash, uint8_t *id /* 0. bad * 1. seen bad, heard good - * 2. seen good */ + * 2. seen good + * 3. used */ + if (!is_timeout(entry->used_at, BAD_NODE_TIMEOUT)) + check = 3; + if (!is_timeout(entry->seen_at, CANDIDATES_SEEN_TIMEOUT)) check = 2; else if (!is_timeout(entry->heard_at, CANDIDATES_HEARD_TIMEOUT)) @@ -432,8 +431,8 @@ static uint8_t candidates_create_internal(Assoc *assoc, hash_t hash, uint8_t *id pos_check[check] = pos + 1; } - /* seen can bump heard&bad, heard can bump only bad */ - size_t i, pos_max = seen ? 2 : 1; + /* used > seen > heard > bad */ + size_t i, pos_max = used ? 3 : (seen ? 2 : 1); for (i = 0; i < pos_max; i++) if (pos_check[i]) { @@ -446,30 +445,32 @@ static uint8_t candidates_create_internal(Assoc *assoc, hash_t hash, uint8_t *id return 0; } -static void candidates_create_new(Assoc *assoc, hash_t hash, uint8_t *id, - IPPTs *ippts_send, IP_Port *ipp_recv) +static uint8_t candidates_create_new(Assoc *assoc, hash_t hash, uint8_t *id, uint8_t used, + IPPTs *ippts_send, IP_Port *ipp_recv) { if (!assoc || !id || !ippts_send) - return; + return 0; bucket_t bucket; size_t pos; - if (!candidates_create_internal(assoc, hash, id, ipp_recv != NULL, &bucket, &pos)) - return; + if (!candidates_create_internal(assoc, hash, id, ipp_recv != NULL, used, &bucket, &pos)) + return 0; candidates_bucket *cnd_bckt = &assoc->candidates[bucket]; Client_entry *entry = &cnd_bckt->list[pos]; memset(entry, 0, sizeof(*entry)); - IPPTsPng *ipptsp = entry_assoc(entry, &ippts_send->ip_port); if (!ipptsp) - return; + return 0; entry->hash = hash; id_copy(entry->client.client_id, id); + if (used) + entry->used_at = unix_time(); + if (ipp_recv && !ipport_isset(ipp_recv)) ipp_recv = NULL; @@ -491,6 +492,8 @@ static void candidates_create_new(Assoc *assoc, hash_t hash, uint8_t *id, *heard = ippts_send->ip_port; } } + + return 1; } /*****************************************************************************/ @@ -509,7 +512,7 @@ static void client_id_self_update(Assoc *assoc) if (!sum) return; - assoc->self_hash = id_hash(assoc->self_client_id); + assoc->self_hash = id_hash(assoc, assoc->self_client_id); } #ifdef LOGGING @@ -519,11 +522,11 @@ static void client_id_self_update(Assoc *assoc) /* if we already added some (or loaded some) entries, * look and remove if we find a match */ - bucket_t b_id = candidates_id_bucket(assoc->self_client_id); + bucket_t b_id = candidates_id_bucket(assoc, assoc->self_client_id); candidates_bucket *cnd_bckt = &assoc->candidates[b_id]; - size_t i, pos = assoc->self_hash % CANDIDATES_TO_KEEP; + size_t i, pos = assoc->self_hash % assoc->candidates_bucket_size; - for (i = 0; i < HASH_COLLIDE_COUNT; pos = hash_collide(pos), i++) { + for (i = 0; i < HASH_COLLIDE_COUNT; pos = hash_collide(assoc, pos), i++) { Client_entry *entry = &cnd_bckt->list[pos]; if (entry->hash == assoc->self_hash) @@ -540,29 +543,29 @@ static void client_id_self_update(Assoc *assoc) * seen should be 0 (zero), if the candidate was announced by someone else, * seen should be 1 (one), if there is confirmed connectivity (a definite response) */ -void Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_recv) +uint8_t Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_recv, uint8_t used) { if (!assoc || !id || !ippts_send) - return; + return 0; if (!assoc->self_hash) { client_id_self_update(assoc); if (!assoc->self_hash) - return; + return 0; } if (!ipport_isset(&ippts_send->ip_port)) - return; + return 0; if (ipp_recv && !ipport_isset(ipp_recv)) ipp_recv = NULL; - hash_t hash = id_hash(id); + hash_t hash = id_hash(assoc, id); if (hash == assoc->self_hash) if (id_equal(id, assoc->self_client_id)) - return; + return 0; /* if it's new: * callback, if there's desire, add to clients, else to candidates @@ -575,10 +578,15 @@ void Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_ */ Client_entry *cnd_entry; - if (!candidates_search(assoc, id, hash, &cnd_entry)) - candidates_create_new(assoc, hash, id, ippts_send, ipp_recv); - else - candidates_update_assoc(assoc, cnd_entry, ippts_send, ipp_recv); + if (!candidates_search(assoc, id, hash, &cnd_entry)) { + if (candidates_create_new(assoc, hash, id, used, ippts_send, ipp_recv)) + return 1; + else + return 0; + } else { + candidates_update_assoc(assoc, cnd_entry, used, ippts_send, ipp_recv); + return 2; + } } /*****************************************************************************/ @@ -603,22 +611,21 @@ uint8_t Assoc_get_close_entries(Assoc *assoc, Assoc_close_entries *state) if (!state->distance_absolute_func) state->distance_absolute_func = id_distance; - size_t clients_offset = CANDIDATES_BUCKET_COUNT * CANDIDATES_TO_KEEP; - size_t dist_list_len = clients_offset; + size_t dist_list_len = assoc->candidates_bucket_count * assoc->candidates_bucket_size; uint64_t dist_list[dist_list_len]; memset(dist_list, ~0, dist_list_len * sizeof(dist_list[0])); bucket_t b; size_t i; - for (b = 0; b < CANDIDATES_BUCKET_COUNT; b++) { + for (b = 0; b < assoc->candidates_bucket_count; b++) { candidates_bucket *cnd_bckt = &assoc->candidates[b]; - for (i = 0; i < CANDIDATES_TO_KEEP; i++) { + for (i = 0; i < assoc->candidates_bucket_size; i++) { Client_entry *entry = &cnd_bckt->list[i]; if (entry->hash) { uint64_t dist = state->distance_absolute_func(assoc, state->custom_data, state->wanted_id, entry->client.client_id); - uint32_t index = b * CANDIDATES_TO_KEEP + i; + uint32_t index = b * assoc->candidates_bucket_size + i; dist_list[index] = (dist << DISTANCE_INDEX_INDEX_BITS) | index; } } @@ -702,31 +709,115 @@ uint8_t Assoc_get_close_entries(Assoc *assoc, Assoc_close_entries *state) /* GLOBAL STRUCTURE FUNCTIONS */ /*****************************************************************************/ -/* create */ -Assoc *new_Assoc(DHT *dht) +static uint8_t odd_min9_is_prime(size_t value) { + size_t i = 3; + + while (i * i <= value) { + if (!(value % i)) + return 0; + + i += 2; + } + + return 1; +} + +static size_t prime_upto_min9(size_t limit) +{ + /* primes besides 2 are odd */ + if (!(limit % 2)) + limit--; + + while (!odd_min9_is_prime(limit)) + limit -= 2; + + return limit; +} + +/* create */ +Assoc *new_Assoc(size_t bits, size_t entries, uint8_t *public_id) +{ + if (!public_id) + return NULL; + Assoc *assoc = calloc(1, sizeof(*assoc)); if (!assoc) return NULL; - /* dht MAY be NULL! (e.g. testing) */ - if (dht) { - assoc->dht = dht; - assoc->self_client_id = dht->c->self_public_key; - } else { - assoc->self_client_id = malloc(CLIENT_ID_SIZE); - assoc->self_client_id[0] = 42; + /* + * bits must be in [ 2 .. 15 ] + * entries must be a prime + */ + if (bits < 2) + bits = 2; + else if (bits > 15) + bits = 15; + + assoc->candidates_bucket_bits = bits; + assoc->candidates_bucket_count = 1U << bits; + + if (entries <= 8) { + if (entries <= 4) + entries = 3; + else if (!(entries % 2)) /* 6, 8 => 5, 7 */ + entries--; + } else if (entries > ((1 << 17) - 1)) /* 130k+ */ + entries = (1 << 17) - 1; + else { + /* 9+: test and find a prime less or equal */ + size_t entries_test = prime_upto_min9(entries); + + if (entries_test == HASH_COLLIDE_PRIME) /* disallowed */ + entries_test = prime_upto_min9(entries_test - 1); + + if (entries_test != entries) { +#ifdef LOGGING + sprintf(logbuffer, "new_Assoc(): trimmed %i to %i.\n", (int)entries, (int)entries_test); + loglog(logbuffer); +#endif + entries = (size_t)entries_test; + } } + assoc->candidates_bucket_size = entries; + + /* allocation: preferably few blobs */ + size_t bckt, cix; + Client_entry *clients = malloc(sizeof(*clients) * assoc->candidates_bucket_count * assoc->candidates_bucket_size); + candidates_bucket *lists = malloc(sizeof(*lists) * assoc->candidates_bucket_count); + + for (bckt = 0; bckt < assoc->candidates_bucket_count; bckt++) { + candidates_bucket *list = &lists[bckt]; + + list->list = &clients[bckt * assoc->candidates_bucket_size]; + + for (cix = 0; cix < assoc->candidates_bucket_size; cix++) + list->list[cix].hash = 0; + } + + assoc->candidates = lists; + + id_copy(assoc->self_client_id, public_id); + client_id_self_update(assoc); + return assoc; } -/* own client_id, assocs for this have to be ignored */ -void Assoc_self_client_id_changed(Assoc *assoc) +Assoc *new_Assoc_default(uint8_t *public_id) { - if (assoc) { + /* original 8, 251 averages to ~32k entries... probably the whole DHT :D + * 320 entries is fine, hopefully */ + return new_Assoc(6, 5, public_id); +} + +/* own client_id, assocs for this have to be ignored */ +void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *id) +{ + if (assoc && id) { assoc->self_hash = 0; + id_copy(assoc->self_client_id, id); client_id_self_update(assoc); } } @@ -734,6 +825,62 @@ void Assoc_self_client_id_changed(Assoc *assoc) /* destroy */ void kill_Assoc(Assoc *assoc) { - /* nothing dynamic left in trim */ - free(assoc); + if (assoc) { + free(assoc->candidates->list); + free(assoc->candidates); + free(assoc); + } } + +#ifdef LOGGING + +static char buffer[CLIENT_ID_SIZE * 2 + 1]; +static char *idpart2str(uint8_t *id, size_t len) +{ + if (len > CLIENT_ID_SIZE) + len = CLIENT_ID_SIZE; + + size_t i; + + for (i = 0; i < len; i++) + sprintf(buffer + i * 2, "%02hhx", id[i]); + + buffer[len * 2] = 0; + return buffer; +} + +void Assoc_status(Assoc *assoc) +{ + if (!assoc) { + loglog("Assoc status: no assoc\n"); + return; + } + + size_t bid, cid, total = 0; + + for (bid = 0; bid < assoc->candidates_bucket_count; bid++) { + candidates_bucket *bucket = &assoc->candidates[bid]; + + for (cid = 0; cid < assoc->candidates_bucket_size; cid++) { + Client_entry *entry = &bucket->list[cid]; + + if (entry->hash) { + sprintf(logbuffer, "[%3i:%3i] %x => [%s...] %i, %i, %i\n", + (int)bid, (int)cid, entry->hash, idpart2str(entry->client.client_id, 8), + entry->used_at ? (int)(unix_time() - entry->used_at) : 0, + entry->seen_at ? (int)(unix_time() - entry->seen_at) : 0, + entry->heard_at ? (int)(unix_time() - entry->heard_at) : 0); + loglog(logbuffer); + total++; + } + } + } + + if (total) { + sprintf(logbuffer, "Total: %i entries, table usage %i%%.\n", (int)total, + (int)(total * 100 / (assoc->candidates_bucket_count * assoc->candidates_bucket_size))); + loglog(logbuffer); + } +} + +#endif diff --git a/toxcore/assoc.h b/toxcore/assoc.h index e0a899be..ee9e5e21 100644 --- a/toxcore/assoc.h +++ b/toxcore/assoc.h @@ -31,8 +31,9 @@ typedef uint64_t (*Assoc_distance_absolute_callback)(Assoc *assoc, void *callbac /*****************************************************************************/ -/* Central entry point for new associations: add a new candidate to the cache */ -void Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_recv); +/* Central entry point for new associations: add a new candidate to the cache + * returns 1 if entry is stored, 2 if existing entry was updated, 0 else */ +uint8_t Assoc_add_entry(Assoc *assoc, uint8_t *id, IPPTs *ippts_send, IP_Port *ipp_recv, uint8_t used); /*****************************************************************************/ @@ -61,13 +62,26 @@ uint8_t Assoc_get_close_entries(Assoc *assoc, Assoc_close_entries *close_entries /*****************************************************************************/ -/* create */ -Assoc *new_Assoc(DHT *dht); +/* create: default sizes (6, 5 => 320 entries) */ +Assoc *new_Assoc_default(uint8_t *public_id); -/* avoid storing own ID/assoc */ -void Assoc_self_client_id_changed(Assoc *assoc); +/* create: customized sizes + * total is (2^bits) * entries + * bits should be between 2 and 15 (else it's trimmed) + * entries will be reduced to the closest prime smaller or equal + * + * preferably bits should be large and entries small to ensure spread + * in the search space (e. g. 5, 5 is preferable to 2, 41) */ +Assoc *new_Assoc(size_t bits, size_t entries, uint8_t *public_id); + +/* public_id changed (loaded), update which entry isn't stored */ +void Assoc_self_client_id_changed(Assoc *assoc, uint8_t *public_id); /* destroy */ void kill_Assoc(Assoc *assoc); +#ifdef LOGGING +void Assoc_status(Assoc *assoc); +#endif + #endif /* !__ASSOC_H__ */ diff --git a/toxcore/group_chats.c b/toxcore/group_chats.c index ed867365..3fa90c42 100644 --- a/toxcore/group_chats.c +++ b/toxcore/group_chats.c @@ -310,13 +310,12 @@ static int send_getnodes(Group_Chat *chat, IP_Port ip_port, int peernum) chat->group[peernum].pingid = contents.pingid; chat->group[peernum].ping_via = ip_port; - if (chat->assoc) { IPPTs ippts; ippts.timestamp = unix_time(); ippts.ip_port = ip_port; - Assoc_add_entry(chat->assoc, chat->group[peernum].client_id, &ippts, NULL); + Assoc_add_entry(chat->assoc, chat->group[peernum].client_id, &ippts, NULL, 1); } return send_groupchatpacket(chat, ip_port, chat->group[peernum].client_id, (uint8_t *)&contents, sizeof(contents), @@ -403,12 +402,12 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 if (chat->assoc) { ippts_send.ip_port = contents.nodes[i].ip_port; - Assoc_add_entry(chat->assoc, contents.nodes[i].client_id, &ippts_send, NULL); + Assoc_add_entry(chat->assoc, contents.nodes[i].client_id, &ippts_send, NULL, 0); } } } - add_closepeer(chat, chat->group[peernum].client_id, source); + int ok = add_closepeer(chat, chat->group[peernum].client_id, source); if (chat->assoc) { ippts_send.ip_port = chat->group[peernum].ping_via; @@ -417,7 +416,7 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 IP_Port ipp_recv; ipp_recv = source; - Assoc_add_entry(chat->assoc, contents.nodes[i].client_id, &ippts_send, &ipp_recv); + Assoc_add_entry(chat->assoc, contents.nodes[i].client_id, &ippts_send, &ipp_recv, ok == 0 ? 1 : 0); } return 0; @@ -615,7 +614,7 @@ void callback_groupmessage(Group_Chat *chat, void (*function)(Group_Chat *chat, } -Group_Chat *new_groupchat(Networking_Core *net, Assoc *assoc) +Group_Chat *new_groupchat(Networking_Core *net) { unix_time_update(); @@ -626,7 +625,8 @@ Group_Chat *new_groupchat(Networking_Core *net, Assoc *assoc) chat->net = net; crypto_box_keypair(chat->self_public_key, chat->self_secret_key); - chat->assoc = assoc; + /* (2^4) * 5 = 80 entries seems to be a moderate size */ + chat->assoc = new_Assoc(4, 5, chat->self_public_key); return chat; } diff --git a/toxcore/group_chats.h b/toxcore/group_chats.h index 90440d5b..16e4e722 100644 --- a/toxcore/group_chats.h +++ b/toxcore/group_chats.h @@ -106,7 +106,7 @@ uint32_t group_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length); /* * Set our nick for this group. - * + * * returns -1 on failure, 0 on success. */ int set_nick(Group_Chat *chat, uint8_t *nick, uint16_t nick_len); @@ -124,7 +124,7 @@ uint32_t group_newpeer(Group_Chat *chat, uint8_t *client_id); * * Returns a NULL pointer if fail. */ -Group_Chat *new_groupchat(Networking_Core *net, Assoc *assoc); +Group_Chat *new_groupchat(Networking_Core *net); /* Kill a group chat diff --git a/toxcore/ping.c b/toxcore/ping.c index 141a3fb4..9228649e 100644 --- a/toxcore/ping.c +++ b/toxcore/ping.c @@ -258,13 +258,14 @@ static int handle_ping_response(void *_dht, IP_Port source, uint8_t *packet, uin return 1; /* Associate client_id with the ip the request was sent to */ - addto_lists(dht, ping->pings[ping_index - 1].ip_port, packet + 1); + int used = addto_lists(dht, ping->pings[ping_index - 1].ip_port, packet + 1); if (dht->assoc) { IPPTs ippts; ippts.ip_port = ping->pings[ping_index - 1].ip_port; ippts.timestamp = ping->pings[ping_index - 1].timestamp; - Assoc_add_entry(dht->assoc, packet + 1, &ippts, &source); + + Assoc_add_entry(dht->assoc, packet + 1, &ippts, &source, used > 0 ? 1 : 0); } return 0;