diff --git a/testing/Lossless_UDP_testclient.c b/testing/Lossless_UDP_testclient.c index 3c52c6d6..52c48a97 100644 --- a/testing/Lossless_UDP_testclient.c +++ b/testing/Lossless_UDP_testclient.c @@ -164,7 +164,7 @@ int main(int argc, char *argv[]) exit(0); } - uint8_t buffer[512]; + uint8_t buffer[MAX_DATA_SIZE]; int read; FILE *file = fopen(argv[argvoffset + 3], "rb"); @@ -216,10 +216,10 @@ int main(int argc, char *argv[]) } timer = current_time(); - + unsigned long long bytes_sent = 0; /*read first part of file */ - read = fread(buffer, 1, 512, file); + read = fread(buffer, 1, MAX_DATA_SIZE, file); while (1) { /* printconnection(connection); */ @@ -228,26 +228,32 @@ int main(int argc, char *argv[]) if (is_connected(ludp, connection) == 3) { - if (write_packet(ludp, connection, buffer, read)) { + while (write_packet(ludp, connection, buffer, read)) { + bytes_sent += read; /* printf("Wrote data.\n"); */ - read = fread(buffer, 1, 512, file); + read = fread(buffer, 1, MAX_DATA_SIZE, file); } /* printf("%u\n", sendqueue(connection)); */ if (sendqueue(ludp, connection) == 0) { if (read == 0) { - printf("Sent file successfully in: %llu us\n", (unsigned long long)(current_time() - timer)); + unsigned long long us = (unsigned long long)(current_time() - timer); + printf("Sent file successfully in: %llu us = %llu seconds. Average speed: %llu KB/s\n", us, us / 1000000UL, + bytes_sent / (us / 1024UL)); + //printf("Total bytes sent: %llu B, Total data sent: %llu B, overhead: %llu B\n", total_bytes_sent, bytes_sent, total_bytes_sent-bytes_sent); break; } } } else { - printf("Connecting Lost after: %llu us\n", (unsigned long long)(current_time() - timer)); + printf("%u Client Connecting Lost after: %llu us\n", is_connected(ludp, connection), + (unsigned long long)(current_time() - timer)); return 0; } - /* c_sleep(1); */ } + c_sleep(25); + return 0; } diff --git a/testing/Lossless_UDP_testserver.c b/testing/Lossless_UDP_testserver.c index 9d061c0c..8deace82 100644 --- a/testing/Lossless_UDP_testserver.c +++ b/testing/Lossless_UDP_testserver.c @@ -160,7 +160,7 @@ int main(int argc, char *argv[]) exit(0); } - uint8_t buffer[512]; + uint8_t buffer[MAX_DATA_SIZE]; int read; FILE *file = fopen(argv[argvoffset + 1], "wb"); @@ -204,26 +204,32 @@ int main(int argc, char *argv[]) while (1) { //printconnection(0); networking_poll(ludp->net); - do_lossless_udp(ludp); if (is_connected(ludp, connection) >= 2) { - kill_connection_in(ludp, connection, 3000000); - read = read_packet(ludp, connection, buffer); + confirm_connection(ludp, connection); - if (read != 0) { - // printf("Received data.\n"); - if (!fwrite(buffer, read, 1, file)) - printf("file write error\n"); + while (1) { + read = read_packet(ludp, connection, buffer); + + if (read != 0) { + // printf("Received data.\n"); + if (!fwrite(buffer, read, 1, file)) + printf("file write error\n"); + } else { + break; + } } } + do_lossless_udp(ludp); + if (is_connected(ludp, connection) == 4) { - printf("Connecting Lost after: %llu us\n", (unsigned long long)(current_time() - timer)); + printf("Server Connecting Lost after: %llu us\n", (unsigned long long)(current_time() - timer)); fclose(file); return 1; } - c_sleep(1); + c_sleep(25); } return 0; diff --git a/toxcore/Lossless_UDP.c b/toxcore/Lossless_UDP.c index 81c2a2a5..6b2c83a5 100644 --- a/toxcore/Lossless_UDP.c +++ b/toxcore/Lossless_UDP.c @@ -52,6 +52,43 @@ int getconnection_id(Lossless_UDP *ludp, IP_Port ip_port) return -1; } +/* Resize a queue + * return length of queue on success. + * return ~0 on failure. + */ +uint32_t resize_queue(Data **buffer, uint32_t length, uint32_t new_length, uint32_t min_packetnum, + uint32_t max_packetnum) +{ + if (MAX_QUEUE_NUM < new_length) + new_length = MAX_QUEUE_NUM; + + if (max_packetnum - min_packetnum > new_length) + return ~0; + + if (length == new_length) + return new_length; + + Data *temp = calloc(1, sizeof(Data) * new_length); + + if (temp == NULL) + return ~0; + + if (*buffer == NULL) { + *buffer = temp; + return new_length; + } + + uint32_t i; + + for (i = min_packetnum; i != max_packetnum; ++i) + memcpy(temp + (i % new_length), *buffer + (i % length), sizeof(Data)); + + free(*buffer); + *buffer = temp; + return new_length; +} + + /* * Generate a handshake_id which depends on the ip_port. @@ -170,7 +207,8 @@ int new_connection(Lossless_UDP *ludp, IP_Port ip_port) memset(connection, 0, sizeof(Connection)); uint32_t handshake_id1 = handshake_id(ludp, ip_port); - uint64_t timeout = CONNEXION_TIMEOUT + rand() % CONNEXION_TIMEOUT; + /* add randomness to timeout to prevent connections getting stuck in a loop. */ + uint8_t timeout = CONNEXION_TIMEOUT + rand() % CONNEXION_TIMEOUT; *connection = (Connection) { .ip_port = ip_port, @@ -186,9 +224,18 @@ int new_connection(Lossless_UDP *ludp, IP_Port ip_port) .last_sent = current_time(), .killat = ~0, .send_counter = 0, - /* add randomness to timeout to prevent connections getting stuck in a loop. */ - .timeout = timeout + .timeout = timeout, + .confirmed = 1 }; + connection->sendbuffer_length = resize_queue(&connection->sendbuffer, 0, DEFAULT_QUEUE_NUM, 0, 0); + connection->recvbuffer_length = resize_queue(&connection->recvbuffer, 0, DEFAULT_QUEUE_NUM, 0, 0); + + if (connection->sendbuffer_length == (uint32_t)~0 || connection->recvbuffer_length == (uint32_t)~0) { + free(connection->sendbuffer); + free(connection->recvbuffer); + memset(connection, 0, sizeof(Connection)); + return -1; + } return connection_id; } @@ -221,8 +268,8 @@ static int new_inconnection(Lossless_UDP *ludp, IP_Port ip_port) Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); memset(connection, 0, sizeof(Connection)); - - uint64_t timeout = CONNEXION_TIMEOUT + rand() % CONNEXION_TIMEOUT; + /* Add randomness to timeout to prevent connections getting stuck in a loop. */ + uint8_t timeout = CONNEXION_TIMEOUT + rand() % CONNEXION_TIMEOUT; *connection = (Connection) { .ip_port = ip_port, @@ -234,12 +281,21 @@ static int new_inconnection(Lossless_UDP *ludp, IP_Port ip_port) .last_sent = current_time(), .send_counter = 127, - /* Add randomness to timeout to prevent connections getting stuck in a loop. */ .timeout = timeout, /* If this connection isn't handled within the timeout kill it. */ - .killat = current_time() + 1000000UL * timeout + .killat = current_time() + 1000000UL * timeout, + .confirmed = 0 }; + connection->sendbuffer_length = resize_queue(&connection->sendbuffer, 0, DEFAULT_QUEUE_NUM, 0, 0); + connection->recvbuffer_length = resize_queue(&connection->recvbuffer, 0, DEFAULT_QUEUE_NUM, 0, 0); + + if (connection->sendbuffer_length == (uint32_t)~0 || connection->recvbuffer_length == (uint32_t)~0) { + free(connection->sendbuffer); + free(connection->recvbuffer); + memset(connection, 0, sizeof(Connection)); + return -1; + } return connection_id; } @@ -286,6 +342,8 @@ int kill_connection(Lossless_UDP *ludp, int connection_id) if (connection->status > 0) { connection->status = 0; change_handshake(ludp, connection->ip_port); + free(connection->sendbuffer); + free(connection->recvbuffer); memset(connection, 0, sizeof(Connection)); free_connections(ludp); return 0; @@ -332,6 +390,27 @@ int is_connected(Lossless_UDP *ludp, int connection_id) return 0; } +/* Confirm an incoming connection. + * Also disables the auto kill timeout on incomming connections. + * + * return 0 on success + * return -1 on failure. + */ +int confirm_connection(Lossless_UDP *ludp, int connection_id) +{ + if ((unsigned int)connection_id >= ludp->connections.len) + return -1; + + Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); + + if (connection->status == 0) + return -1; + + connection->killat = ~0; + connection->confirmed = 1; + return 0; +} + /* return the ip_port of the corresponding connection. */ IP_Port connection_ip(Lossless_UDP *ludp, int connection_id) { @@ -383,7 +462,7 @@ char id_packet(Lossless_UDP *ludp, int connection_id) Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); if (connection->status != 0) - return connection->recvbuffer[connection->successful_read % MAX_QUEUE_NUM].data[0]; + return connection->recvbuffer[connection->successful_read % connection->recvbuffer_length].data[0]; return -1; } @@ -401,7 +480,7 @@ int read_packet(Lossless_UDP *ludp, int connection_id, uint8_t *data) if (connection->status == 0) return 0; - uint16_t index = connection->successful_read % MAX_QUEUE_NUM; + uint16_t index = connection->successful_read % connection->recvbuffer_length; uint16_t size = connection->recvbuffer[index].size; memcpy(data, connection->recvbuffer[index].data, size); ++connection->successful_read; @@ -418,15 +497,27 @@ int write_packet(Lossless_UDP *ludp, int connection_id, uint8_t *data, uint32_t if ((unsigned int)connection_id >= ludp->connections.len) return 0; - if (length > MAX_DATA_SIZE || length == 0 || sendqueue(ludp, connection_id) >= BUFFER_PACKET_NUM) + Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); + + if (length > MAX_DATA_SIZE || length == 0 || sendqueue(ludp, connection_id) >= MAX_QUEUE_NUM) return 0; - Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); + if (sendqueue(ludp, connection_id) >= connection->sendbuffer_length) { + uint32_t newlen = connection->sendbuffer_length = resize_queue(&connection->sendbuffer, connection->sendbuffer_length, + connection->sendbuffer_length * 2, connection->successful_sent, connection->sendbuff_packetnum); + + if (newlen == (uint32_t)~0) + return 0; + + connection->sendbuffer_length = newlen; + return write_packet(ludp, connection_id, data, length); + } + if (connection->status == 0) return 0; - uint32_t index = connection->sendbuff_packetnum % MAX_QUEUE_NUM; + uint32_t index = connection->sendbuff_packetnum % connection->sendbuffer_length; memcpy(connection->sendbuffer[index].data, data, length); connection->sendbuffer[index].size = length; connection->sendbuff_packetnum++; @@ -434,26 +525,32 @@ int write_packet(Lossless_UDP *ludp, int connection_id, uint8_t *data, uint32_t } /* Put the packet numbers the we are missing in requested and return the number. */ -uint32_t missing_packets(Lossless_UDP *ludp, int connection_id, uint32_t *requested) +static uint32_t missing_packets(Lossless_UDP *ludp, int connection_id, uint32_t *requested) { + if ((unsigned int)connection_id >= ludp->connections.len) + return 0; + + Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); + /* Don't request packets if the buffer is full. */ - if (recvqueue(ludp, connection_id) >= (BUFFER_PACKET_NUM - 1)) + if (recvqueue(ludp, connection_id) >= (connection->recvbuffer_length - 1)) return 0; uint32_t number = 0; uint32_t i; uint32_t temp; - Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); - for (i = connection->recv_packetnum; i != connection->osent_packetnum; i++) { - if (connection->recvbuffer[i % MAX_QUEUE_NUM].size == 0) { + if (connection->recvbuffer[i % connection->recvbuffer_length].size == 0) { temp = htonl(i); memcpy(requested + number, &temp, 4); ++number; } + + if (number >= MAX_REQUESTED_PACKETS) + return number; } if (number == 0) @@ -485,7 +582,7 @@ static int send_handshake(Lossless_UDP *ludp, IP_Port ip_port, uint32_t handshak static int send_SYNC(Lossless_UDP *ludp, int connection_id) { Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); - uint8_t packet[(BUFFER_PACKET_NUM * 4 + 4 + 4 + 2)]; + uint8_t packet[(MAX_REQUESTED_PACKETS * 4 + 4 + 4 + 2)]; uint16_t index = 0; IP_Port ip_port = connection->ip_port; @@ -493,7 +590,7 @@ static int send_SYNC(Lossless_UDP *ludp, int connection_id) uint32_t recv_packetnum = htonl(connection->recv_packetnum); uint32_t sent_packetnum = htonl(connection->sent_packetnum); - uint32_t requested[BUFFER_PACKET_NUM]; + uint32_t requested[MAX_REQUESTED_PACKETS]; uint32_t number = missing_packets(ludp, connection_id, requested); packet[0] = NET_PACKET_SYNC; @@ -514,7 +611,7 @@ static int send_data_packet(Lossless_UDP *ludp, int connection_id, uint32_t pack { Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); - uint32_t index = packet_num % MAX_QUEUE_NUM; + uint32_t index = packet_num % connection->sendbuffer_length; uint32_t temp; uint8_t packet[1 + 4 + MAX_DATA_SIZE]; packet[0] = NET_PACKET_DATA; @@ -529,7 +626,7 @@ static int send_DATA(Lossless_UDP *ludp, int connection_id) { Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); int ret; - uint32_t buffer[BUFFER_PACKET_NUM]; + uint32_t buffer[MAX_REQUESTED_PACKETS]; if (connection->num_req_paquets > 0) { ret = send_data_packet(ludp, connection_id, connection->req_packets[0]); @@ -609,7 +706,7 @@ static int SYNC_valid(uint32_t length) if (length < 4 + 4 + 2) return 0; - if (length > (BUFFER_PACKET_NUM * 4 + 4 + 4 + 2) || + if (length > (MAX_REQUESTED_PACKETS * 4 + 4 + 4 + 2) || ((length - 4 - 4 - 2) % 4) != 0) return 0; @@ -656,6 +753,35 @@ static int handle_SYNC2(Lossless_UDP *ludp, int connection_id, uint8_t counter, return 1; } + +/* + * Automatically adjusts send rates of data packets for optimal transmission. + * + * TODO: Impove this. + */ +static void adjust_datasendspeed(Connection *connection, uint32_t req_packets) +{ + /* if there are no packets in send buffer */ + if (connection->sendbuff_packetnum - connection->successful_sent == 0) { + connection->data_rate -= connection->data_rate / 8; + + if (connection->data_rate < DATA_SYNC_RATE) + connection->data_rate = DATA_SYNC_RATE; + + return; + } + + if (req_packets <= (connection->data_rate / connection->SYNC_rate) / 20 || req_packets <= 1) { + connection->data_rate += connection->data_rate / 8; + + if (connection->data_rate > connection->sendbuffer_length * connection->SYNC_rate) + connection->data_rate = connection->sendbuffer_length * connection->SYNC_rate; + } else { + connection->data_rate -= connection->data_rate / 8; + } +} + + /* case 3 in handle_SYNC: */ static int handle_SYNC3(Lossless_UDP *ludp, int connection_id, uint8_t counter, uint32_t recv_packetnum, uint32_t sent_packetnum, @@ -672,13 +798,14 @@ static int handle_SYNC3(Lossless_UDP *ludp, int connection_id, uint8_t counter, uint32_t comp_2 = (sent_packetnum - connection->osent_packetnum); /* Packet valid. */ - if (comp_1 <= BUFFER_PACKET_NUM && - comp_2 <= BUFFER_PACKET_NUM && + if (comp_1 <= connection->sendbuffer_length && + comp_2 <= MAX_QUEUE_NUM && comp_counter == 1) { connection->orecv_packetnum = recv_packetnum; connection->osent_packetnum = sent_packetnum; connection->successful_sent = recv_packetnum; connection->last_recvSYNC = current_time(); + connection->recv_counter = counter; ++connection->send_counter; @@ -689,6 +816,7 @@ static int handle_SYNC3(Lossless_UDP *ludp, int connection_id, uint8_t counter, } connection->num_req_paquets = number; + adjust_datasendspeed(connection, number); return 0; } @@ -705,8 +833,8 @@ static int handle_SYNC(void *object, IP_Port source, uint8_t *packet, uint32_t l uint8_t counter; uint32_t temp; uint32_t recv_packetnum, sent_packetnum; - uint32_t req_packets[BUFFER_PACKET_NUM]; uint16_t number = (length - 4 - 4 - 2) / 4; + uint32_t req_packets[number]; memcpy(&counter, packet + 1, 1); memcpy(&temp, packet + 2, 4); @@ -749,17 +877,35 @@ static int add_recv(Lossless_UDP *ludp, int connection_id, uint32_t data_num, ui Connection *connection = &tox_array_get(&ludp->connections, connection_id, Connection); uint32_t i; - uint32_t maxnum = connection->successful_read + BUFFER_PACKET_NUM; + uint32_t test = data_num - connection->recv_packetnum; + + if (test > MAX_QUEUE_NUM) + return 0; + + if (test > connection->recvbuffer_length) { + if (connection->confirmed == 0) + return 0; + + uint32_t len = resize_queue(&connection->recvbuffer, connection->recvbuffer_length, test * 2, + connection->successful_read, connection->successful_read + connection->recvbuffer_length); + + if (len == (uint32_t)~0) + return 0; + + connection->recvbuffer_length = len; + } + + uint32_t maxnum = connection->successful_read + connection->recvbuffer_length; uint32_t sent_packet = data_num - connection->osent_packetnum; for (i = connection->recv_packetnum; i != maxnum; ++i) { if (i == data_num) { - memcpy(connection->recvbuffer[i % MAX_QUEUE_NUM].data, data, size); + memcpy(connection->recvbuffer[data_num % connection->recvbuffer_length].data, data, size); - connection->recvbuffer[i % MAX_QUEUE_NUM].size = size; + connection->recvbuffer[data_num % connection->recvbuffer_length].size = size; connection->last_recvdata = current_time(); - if (sent_packet < BUFFER_PACKET_NUM) + if (sent_packet < connection->recvbuffer_length) connection->osent_packetnum = data_num; break; @@ -767,7 +913,7 @@ static int add_recv(Lossless_UDP *ludp, int connection_id, uint32_t data_num, ui } for (i = connection->recv_packetnum; i != maxnum; ++i) { - if (connection->recvbuffer[i % MAX_QUEUE_NUM].size != 0) + if (connection->recvbuffer[i % connection->recvbuffer_length].size != 0) connection->recv_packetnum = i; else break; @@ -880,7 +1026,7 @@ static void do_data(Lossless_UDP *ludp) } } -#define MAX_SYNC_RATE 10 +#define MAX_SYNC_RATE 20 /* * Automatically adjusts send rates of packets for optimal transmission. @@ -897,12 +1043,12 @@ static void adjust_rates(Lossless_UDP *ludp) if (tmp->status == 3) { if (sendqueue(ludp, tmp_i) != 0) { - tmp->data_rate = (BUFFER_PACKET_NUM - tmp->num_req_paquets) * MAX_SYNC_RATE; tmp->SYNC_rate = MAX_SYNC_RATE; - } else if (tmp->last_recvdata + 1000000UL > temp_time) + } else if (tmp->last_recvdata + 200000UL > temp_time) { /* 200 ms */ tmp->SYNC_rate = MAX_SYNC_RATE; - else + } else { tmp->SYNC_rate = SYNC_RATE; + } } } } @@ -918,6 +1064,11 @@ void do_lossless_udp(Lossless_UDP *ludp) void kill_lossless_udp(Lossless_UDP *ludp) { + uint32_t i; + + for (i = 0; i < ludp->connections.len; ++i) + kill_connection(ludp, i); + tox_array_delete(&ludp->connections); free(ludp); } diff --git a/toxcore/Lossless_UDP.h b/toxcore/Lossless_UDP.h index f0ce0e87..aa6e344f 100644 --- a/toxcore/Lossless_UDP.h +++ b/toxcore/Lossless_UDP.h @@ -32,10 +32,11 @@ #define MAX_DATA_SIZE 1024 /* Maximum data packets in sent and receive queues. */ -#define MAX_QUEUE_NUM 16 +#define MAX_QUEUE_NUM 1024 +#define DEFAULT_QUEUE_NUM 4 /* Maximum number of data packets in the buffer. */ -#define BUFFER_PACKET_NUM (16-1) +#define MAX_REQUESTED_PACKETS 256 /* Timeout per connection is randomly set between CONNEXION_TIMEOUT and 2*CONNEXION_TIMEOUT. */ #define CONNEXION_TIMEOUT 5 @@ -71,7 +72,7 @@ typedef struct { uint8_t inbound; uint16_t SYNC_rate; /* Current SYNC packet send rate packets per second. */ - uint16_t data_rate; /* Current data packet send rate packets per second. */ + uint32_t data_rate; /* Current data packet send rate packets per second. */ uint64_t last_SYNC; /* Time our last SYNC packet was sent. */ uint64_t last_sent; /* Time our last data or handshake packet was sent. */ @@ -79,9 +80,10 @@ typedef struct { uint64_t last_recvdata; /* Time we last received a DATA packet from the other. */ uint64_t killat; /* Time to kill the connection. */ - Data sendbuffer[MAX_QUEUE_NUM]; /* packet send buffer. */ - Data recvbuffer[MAX_QUEUE_NUM]; /* packet receive buffer. */ - + Data *sendbuffer; /* packet send buffer. */ + uint32_t sendbuffer_length; + Data *recvbuffer; /* packet receive buffer. */ + uint32_t recvbuffer_length; uint32_t handshake_id1; uint32_t handshake_id2; @@ -107,7 +109,7 @@ typedef struct { uint32_t successful_read; /* List of currently requested packet numbers(by the other person). */ - uint32_t req_packets[BUFFER_PACKET_NUM]; + uint32_t req_packets[MAX_REQUESTED_PACKETS]; /* Total number of currently requested packets(by the other person). */ uint16_t num_req_paquets; @@ -115,6 +117,9 @@ typedef struct { uint8_t recv_counter; uint8_t send_counter; uint8_t timeout; /* connection timeout in seconds. */ + + /* is the connection confirmed or not 1 if yes, 0 if no */ + uint8_t confirmed; } Connection; typedef struct { @@ -168,6 +173,14 @@ int kill_connection(Lossless_UDP *ludp, int connection_id); */ int kill_connection_in(Lossless_UDP *ludp, int connection_id, uint32_t seconds); +/* Confirm an incoming connection. + * Also disables the auto kill timeout on incomming connections. + * + * return 0 on success + * return -1 on failure. + */ +int confirm_connection(Lossless_UDP *ludp, int connection_id); + /* returns the ip_port of the corresponding connection. * return 0 if there is no such connection. */ diff --git a/toxcore/net_crypto.c b/toxcore/net_crypto.c index a2e42557..ca23957d 100644 --- a/toxcore/net_crypto.c +++ b/toxcore/net_crypto.c @@ -749,8 +749,8 @@ static void receive_crypto(Net_Crypto *c) c->crypto_connections[i].shared_key); c->crypto_connections[i].status = CONN_ESTABLISHED; - /* Connection is accepted so we disable the auto kill by setting it to about 1 month from now. */ - kill_connection_in(c->lossless_udp, c->crypto_connections[i].number, 3000000); + /* Connection is accepted. */ + confirm_connection(c->lossless_udp, c->crypto_connections[i].number); } else { /* This should not happen, kill the connection if it does. */ crypto_kill(c, i);