Some thread safety related fixes to TCP connections in net_crypto.

Added a recursive mutex to fix possible thread issues when the A/V
thread sends data at the same time as the main thread.
This commit is contained in:
irungentoo 2014-08-16 22:09:29 -04:00
parent 162a900660
commit 9134048bb5
No known key found for this signature in database
GPG Key ID: 10349DC9BED89E98
2 changed files with 68 additions and 30 deletions

View File

@ -385,7 +385,7 @@ static Crypto_Connection *get_crypto_connection(const Net_Crypto *c, int crypt_c
* return -1 on failure.
* return 0 on success.
*/
static int send_packet_to(const Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint16_t length)
static int send_packet_to(Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint16_t length)
{
//TODO TCP, etc...
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -417,17 +417,35 @@ static int send_packet_to(const Net_Crypto *c, int crypt_connection_id, const ui
unsigned int r = crypt_connection_id;
for (i = 0; i < MAX_TCP_CONNECTIONS; ++i) {
pthread_mutex_lock(&c->tcp_mutex);
int ret = 0;
if (conn->status_tcp[(i + r) % MAX_TCP_CONNECTIONS] == STATUS_TCP_ONLINE) {/* friend is connected to this relay. */
if (send_data(c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS], conn->con_number_tcp[(i + r) % MAX_TCP_CONNECTIONS],
data, length) == 1)
return 0;
ret = send_data(c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS], conn->con_number_tcp[(i + r) % MAX_TCP_CONNECTIONS],
data, length);
}
pthread_mutex_unlock(&c->tcp_mutex);
if (ret == 1) {
return 0;
}
}
for (i = 0; i < MAX_TCP_CONNECTIONS; ++i) {
pthread_mutex_lock(&c->tcp_mutex);
int ret = 0;
if (conn->status_tcp[(i + r) % MAX_TCP_CONNECTIONS] == STATUS_TCP_INVISIBLE) {
if (send_oob_packet(c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS], conn->dht_public_key, data, length) == 1)
return 0;
ret = send_oob_packet(c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS], conn->dht_public_key, data, length);
}
pthread_mutex_unlock(&c->tcp_mutex);
if (ret == 1) {
return 0;
}
}
@ -706,7 +724,7 @@ static int handle_request_packet(Packets_Array *send_array, const uint8_t *data,
* return -1 on failure.
* return 0 on success.
*/
static int send_data_packet(const Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint16_t length)
static int send_data_packet(Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint16_t length)
{
if (length == 0 || length + (1 + sizeof(uint16_t) + crypto_box_MACBYTES) > MAX_CRYPTO_PACKET_SIZE)
return -1;
@ -738,7 +756,7 @@ static int send_data_packet(const Net_Crypto *c, int crypt_connection_id, const
* return -1 on failure.
* return 0 on success.
*/
static int send_data_packet_helper(const Net_Crypto *c, int crypt_connection_id, uint32_t buffer_start, uint32_t num,
static int send_data_packet_helper(Net_Crypto *c, int crypt_connection_id, uint32_t buffer_start, uint32_t num,
const uint8_t *data, uint32_t length)
{
if (length == 0 || length > MAX_CRYPTO_DATA_SIZE)
@ -759,7 +777,7 @@ static int send_data_packet_helper(const Net_Crypto *c, int crypt_connection_id,
/* return -1 if data could not be put in packet queue.
* return positive packet number if data was put into the queue.
*/
static int64_t send_lossless_packet(const Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint32_t length)
static int64_t send_lossless_packet(Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint32_t length)
{
if (length == 0 || length > MAX_CRYPTO_DATA_SIZE)
return -1;
@ -868,7 +886,7 @@ static int handle_data_packet(const Net_Crypto *c, int crypt_connection_id, uint
* return -1 on failure.
* return 0 on success.
*/
static int send_request_packet(const Net_Crypto *c, int crypt_connection_id)
static int send_request_packet(Net_Crypto *c, int crypt_connection_id)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -890,7 +908,7 @@ static int send_request_packet(const Net_Crypto *c, int crypt_connection_id)
* return -1 on failure.
* return number of packets sent on success.
*/
static int send_requested_packets(const Net_Crypto *c, int crypt_connection_id, uint16_t max_num)
static int send_requested_packets(Net_Crypto *c, int crypt_connection_id, uint16_t max_num)
{
if (max_num == 0)
return -1;
@ -990,7 +1008,7 @@ static int clear_temp_packet(const Net_Crypto *c, int crypt_connection_id)
* return -1 on failure.
* return 0 on success.
*/
static int send_temp_packet(const Net_Crypto *c, int crypt_connection_id)
static int send_temp_packet(Net_Crypto *c, int crypt_connection_id)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -1014,7 +1032,7 @@ static int send_temp_packet(const Net_Crypto *c, int crypt_connection_id)
* return -1 on failure.
* return 0 on success.
*/
static int create_send_handshake(const Net_Crypto *c, int crypt_connection_id, const uint8_t *cookie,
static int create_send_handshake(Net_Crypto *c, int crypt_connection_id, const uint8_t *cookie,
const uint8_t *dht_public_key)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -1040,7 +1058,7 @@ static int create_send_handshake(const Net_Crypto *c, int crypt_connection_id, c
* return -1 on failure.
* return 0 on success.
*/
static int send_kill_packet(const Net_Crypto *c, int crypt_connection_id)
static int send_kill_packet(Net_Crypto *c, int crypt_connection_id)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -1531,7 +1549,7 @@ int new_crypto_connection(Net_Crypto *c, const uint8_t *real_public_key)
* return -1 on failure.
* return 0 on success.
*/
static int disconnect_peer_tcp(const Net_Crypto *c, int crypt_connection_id)
static int disconnect_peer_tcp(Net_Crypto *c, int crypt_connection_id)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -1542,9 +1560,11 @@ static int disconnect_peer_tcp(const Net_Crypto *c, int crypt_connection_id)
for (i = 0; i < MAX_TCP_CONNECTIONS; ++i) {
if (conn->status_tcp[i] != STATUS_TCP_NULL) {
pthread_mutex_lock(&c->tcp_mutex);
send_disconnect_request(c->tcp_connections[i], conn->con_number_tcp[i]);
conn->status_tcp[i] = STATUS_TCP_NULL;
conn->con_number_tcp[i] = 0;
pthread_mutex_unlock(&c->tcp_mutex);
}
}
@ -1556,7 +1576,7 @@ static int disconnect_peer_tcp(const Net_Crypto *c, int crypt_connection_id)
* return -1 on failure.
* return 0 on success.
*/
static int connect_peer_tcp(const Net_Crypto *c, int crypt_connection_id)
static int connect_peer_tcp(Net_Crypto *c, int crypt_connection_id)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -1569,8 +1589,10 @@ static int connect_peer_tcp(const Net_Crypto *c, int crypt_connection_id)
if (c->tcp_connections[i] == NULL)
continue;
pthread_mutex_lock(&c->tcp_mutex);
//TODO check function return?
send_routing_request(c->tcp_connections[i], conn->dht_public_key);
pthread_mutex_unlock(&c->tcp_mutex);
}
return 0;
@ -1603,7 +1625,7 @@ uint64_t get_connection_dht_key(const Net_Crypto *c, int crypt_connection_id, ui
* return -1 on failure.
* return 0 on success.
*/
int set_connection_dht_public_key(const Net_Crypto *c, int crypt_connection_id, const uint8_t *dht_public_key,
int set_connection_dht_public_key(Net_Crypto *c, int crypt_connection_id, const uint8_t *dht_public_key,
uint64_t timestamp)
{
Crypto_Connection *conn = get_crypto_connection(c, crypt_connection_id);
@ -1936,13 +1958,17 @@ int add_tcp_relay(Net_Crypto *c, IP_Port ip_port, const uint8_t *public_key)
* return 0 on success.
* return -1 on failure.
*/
int send_tcp_onion_request(const Net_Crypto *c, const uint8_t *data, uint16_t length)
int send_tcp_onion_request(Net_Crypto *c, const uint8_t *data, uint16_t length)
{
unsigned int i, r = rand();
for (i = 0; i < MAX_TCP_CONNECTIONS; ++i) {
if (c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS]) {
if (send_onion_request(c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS], data, length) == 1)
pthread_mutex_lock(&c->tcp_mutex);
int ret = send_onion_request(c->tcp_connections[(i + r) % MAX_TCP_CONNECTIONS], data, length);
pthread_mutex_unlock(&c->tcp_mutex);
if (ret == 1)
return 0;
}
}
@ -2053,7 +2079,11 @@ static void do_tcp(Net_Crypto *c)
do_TCP_connection(c->tcp_connections_new[i]);
if (c->tcp_connections_new[i]->status == TCP_CLIENT_CONFIRMED) {
if (add_tcp_connected(c, c->tcp_connections_new[i]) == 0) {
pthread_mutex_lock(&c->tcp_mutex);
int ret = add_tcp_connected(c, c->tcp_connections_new[i]);
pthread_mutex_unlock(&c->tcp_mutex);
if (ret == 0) {
c->tcp_connections_new[i] = NULL;
} else {
kill_TCP_connection(c->tcp_connections_new[i]);
@ -2066,7 +2096,9 @@ static void do_tcp(Net_Crypto *c)
if (c->tcp_connections[i] == NULL)
continue;
pthread_mutex_lock(&c->tcp_mutex);
do_TCP_connection(c->tcp_connections[i]);
pthread_mutex_unlock(&c->tcp_mutex);
}
}
@ -2106,19 +2138,23 @@ static void clear_disconnected_tcp(Net_Crypto *c)
if (tcp_con->status != TCP_CLIENT_DISCONNECTED)
continue;
c->tcp_connections[i] = NULL;
/* Try reconnecting to relay on disconnect. */
add_tcp_relay(c, tcp_con->ip_port, tcp_con->public_key);
pthread_mutex_lock(&c->tcp_mutex);
c->tcp_connections[i] = NULL;
kill_TCP_connection(tcp_con);
for (j = 0; j < c->crypto_connections_length; ++j) {
Crypto_Connection *conn = get_crypto_connection(c, j);
if (conn == 0)
return;
continue;
clear_disconnected_tcp_peer(conn, i);
}
pthread_mutex_unlock(&c->tcp_mutex);
}
}
@ -2240,9 +2276,6 @@ static int udp_handle_packet(void *object, IP_Port source, const uint8_t *packet
return 0;
}
/* Value to set sending variable */
#define CONN_SENDING_VALUE 2
/* The dT for the average packet recieving rate calculations.
Also used as the */
#define PACKET_COUNTER_AVERAGE_INTERVAL 100
@ -2306,7 +2339,6 @@ static void send_crypto_packets(Net_Crypto *c)
/* conjestion control
calculate a new value of conn->packet_send_rate based on some data
notes: needs improvement but seems to work fine for packet loss <1%
*/
unsigned int pos = conn->last_sendqueue_counter % CONGESTION_QUEUE_ARRAY_SIZE;
@ -2409,7 +2441,7 @@ uint32_t crypto_num_free_sendqueue_slots(const Net_Crypto *c, int crypt_connecti
*
* The first byte of data must be in the CRYPTO_RESERVED_PACKETS to PACKET_ID_LOSSY_RANGE_START range.
*/
int64_t write_cryptpacket(const Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint32_t length)
int64_t write_cryptpacket(Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint32_t length)
{
if (length == 0)
return -1;
@ -2583,6 +2615,10 @@ Net_Crypto *new_net_crypto(DHT *dht, TCP_Proxy_Info *proxy_info)
bs_list_init(&temp->ip_port_list, sizeof(IP_Port), 8);
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&temp->tcp_mutex, &attr);
pthread_mutex_init(&temp->connections_mutex, NULL);
if (proxy_info) {
@ -2663,6 +2699,7 @@ void kill_net_crypto(Net_Crypto *c)
kill_TCP_connection(c->tcp_connections[i]);
}
pthread_mutex_destroy(&c->tcp_mutex);
pthread_mutex_destroy(&c->connections_mutex);
bs_list_free(&c->ip_port_list);

View File

@ -179,6 +179,7 @@ typedef struct {
Crypto_Connection *crypto_connections;
TCP_Client_Connection *tcp_connections_new[MAX_TCP_CONNECTIONS];
TCP_Client_Connection *tcp_connections[MAX_TCP_CONNECTIONS];
pthread_mutex_t tcp_mutex;
pthread_mutex_t connections_mutex;
unsigned int connection_use_counter;
@ -246,7 +247,7 @@ uint64_t get_connection_dht_key(const Net_Crypto *c, int crypt_connection_id, ui
* return -1 on failure.
* return 0 on success.
*/
int set_connection_dht_public_key(const Net_Crypto *c, int crypt_connection_id, const uint8_t *dht_public_key,
int set_connection_dht_public_key(Net_Crypto *c, int crypt_connection_id, const uint8_t *dht_public_key,
uint64_t timestamp);
/* Set the direct ip of the crypto connection.
@ -305,7 +306,7 @@ uint32_t crypto_num_free_sendqueue_slots(const Net_Crypto *c, int crypt_connecti
*
* The first byte of data must be in the CRYPTO_RESERVED_PACKETS to PACKET_ID_LOSSY_RANGE_START range.
*/
int64_t write_cryptpacket(const Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint32_t length);
int64_t write_cryptpacket(Net_Crypto *c, int crypt_connection_id, const uint8_t *data, uint32_t length);
/* return -1 on failure.
* return 0 on success.
@ -338,7 +339,7 @@ void tcp_onion_response_handler(Net_Crypto *c, int (*tcp_onion_callback)(void *o
* return 0 on success.
* return -1 on failure.
*/
int send_tcp_onion_request(const Net_Crypto *c, const uint8_t *data, uint16_t length);
int send_tcp_onion_request(Net_Crypto *c, const uint8_t *data, uint16_t length);
/* Copy a maximum of num TCP relays we are connected to to tcp_relays.
* NOTE that the family of the copied ip ports will be set to TCP_INET or TCP_INET6.