epoll for tcp server

This commit is contained in:
notsecure 2014-05-20 20:25:45 +00:00
parent 248fd212ba
commit 2c8f6ffc3f
2 changed files with 253 additions and 52 deletions

View File

@ -775,7 +775,7 @@ static int confirm_TCP_connection(TCP_Server *TCP_server, TCP_Secure_Connection
kill_accepted(TCP_server, index);
}
return 0;
return index;
}
/* return 1 on success
@ -818,7 +818,10 @@ static sock_t new_listening_TCP_socket(int family, uint16_t port)
return ~0;
}
int ok = set_socket_nonblock(sock);
int ok = 1;
#ifndef TCP_SERVER_USE_EPOLL
ok = set_socket_nonblock(sock);
#endif
if (ok && family == AF_INET6) {
ok = set_socket_dualstack(sock);
@ -856,6 +859,16 @@ TCP_Server *new_TCP_server(uint8_t ipv6_enabled, uint16_t num_sockets, uint16_t
return NULL;
}
#ifdef TCP_SERVER_USE_EPOLL
temp->efd = epoll_create1(0);
if (temp->efd == -1) {
free(temp);
return NULL;
}
#endif
uint8_t family;
if (ipv6_enabled) {
@ -865,11 +878,24 @@ TCP_Server *new_TCP_server(uint8_t ipv6_enabled, uint16_t num_sockets, uint16_t
}
uint32_t i;
#ifdef TCP_SERVER_USE_EPOLL
struct epoll_event ev;
#endif
for (i = 0; i < num_sockets; ++i) {
sock_t sock = new_listening_TCP_socket(family, ports[i]);
if (sock_valid(sock)) {
#ifdef TCP_SERVER_USE_EPOLL
ev.events = EPOLLIN;
ev.data.u64 = sock | ((uint64_t)TCP_SOCKET_LISTENING << 32);
if (epoll_ctl(temp->efd, EPOLL_CTL_ADD, sock, &ev) == -1) {
continue;
}
#endif
temp->socks_listening[temp->num_listening_socks] = sock;
++temp->num_listening_socks;
}
@ -908,30 +934,89 @@ static void do_TCP_accept_new(TCP_Server *TCP_server)
}
}
static int do_incoming(TCP_Server *TCP_server, uint32_t i)
{
if (TCP_server->incomming_connection_queue[i].status != TCP_STATUS_CONNECTED)
return -1;
int ret = read_connection_handshake(&TCP_server->incomming_connection_queue[i], TCP_server->secret_key);
if (ret == -1) {
kill_TCP_connection(&TCP_server->incomming_connection_queue[i]);
} else if (ret == 1) {
int index_new = TCP_server->unconfirmed_connection_queue_index % MAX_INCOMMING_CONNECTIONS;
TCP_Secure_Connection *conn_old = &TCP_server->incomming_connection_queue[i];
TCP_Secure_Connection *conn_new = &TCP_server->unconfirmed_connection_queue[index_new];
if (conn_new->status != TCP_STATUS_NO_STATUS)
kill_TCP_connection(conn_new);
memcpy(conn_new, conn_old, sizeof(TCP_Secure_Connection));
memset(conn_old, 0, sizeof(TCP_Secure_Connection));
++TCP_server->unconfirmed_connection_queue_index;
return index_new;
}
return -1;
}
static int do_unconfirmed(TCP_Server *TCP_server, uint32_t i)
{
TCP_Secure_Connection *conn = &TCP_server->unconfirmed_connection_queue[i];
if (conn->status != TCP_STATUS_UNCONFIRMED)
return -1;
uint8_t packet[MAX_PACKET_SIZE];
int len = read_packet_TCP_secure_connection(conn->sock, &conn->next_packet_length, conn->shared_key, conn->recv_nonce,
packet, sizeof(packet));
if (len == 0) {
return -1;
} else if (len == -1) {
kill_TCP_connection(conn);
return -1;
} else {
int index_new;
if ((index_new = confirm_TCP_connection(TCP_server, conn, packet, len)) == -1) {
kill_TCP_connection(conn);
} else {
memset(conn, 0, sizeof(TCP_Secure_Connection));
}
return index_new;
}
}
static int do_confirmed_recv(TCP_Server *TCP_server, uint32_t i)
{
TCP_Secure_Connection *conn = &TCP_server->accepted_connection_array[i];
uint8_t packet[MAX_PACKET_SIZE];
int len;
while ((len = read_packet_TCP_secure_connection(conn->sock, &conn->next_packet_length, conn->shared_key,
conn->recv_nonce, packet, sizeof(packet)))) {
if (len == -1) {
kill_accepted(TCP_server, i);
break;
}
if (handle_TCP_packet(TCP_server, i, packet, len) == -1) {
kill_accepted(TCP_server, i);
break;
}
}
}
static void do_TCP_incomming(TCP_Server *TCP_server)
{
uint32_t i;
for (i = 0; i < MAX_INCOMMING_CONNECTIONS; ++i) {
if (TCP_server->incomming_connection_queue[i].status != TCP_STATUS_CONNECTED)
continue;
int ret = read_connection_handshake(&TCP_server->incomming_connection_queue[i], TCP_server->secret_key);
if (ret == -1) {
kill_TCP_connection(&TCP_server->incomming_connection_queue[i]);
} else if (ret == 1) {
TCP_Secure_Connection *conn_old = &TCP_server->incomming_connection_queue[i];
TCP_Secure_Connection *conn_new =
&TCP_server->unconfirmed_connection_queue[TCP_server->unconfirmed_connection_queue_index % MAX_INCOMMING_CONNECTIONS];
if (conn_new->status != TCP_STATUS_NO_STATUS)
kill_TCP_connection(conn_new);
memcpy(conn_new, conn_old, sizeof(TCP_Secure_Connection));
memset(conn_old, 0, sizeof(TCP_Secure_Connection));
++TCP_server->unconfirmed_connection_queue_index;
}
do_incoming(TCP_server, i);
}
}
@ -940,27 +1025,7 @@ static void do_TCP_unconfirmed(TCP_Server *TCP_server)
uint32_t i;
for (i = 0; i < MAX_INCOMMING_CONNECTIONS; ++i) {
TCP_Secure_Connection *conn = &TCP_server->unconfirmed_connection_queue[i];
if (conn->status != TCP_STATUS_UNCONFIRMED)
continue;
uint8_t packet[MAX_PACKET_SIZE];
int len = read_packet_TCP_secure_connection(conn->sock, &conn->next_packet_length, conn->shared_key, conn->recv_nonce,
packet, sizeof(packet));
if (len == 0) {
continue;
} else if (len == -1) {
kill_TCP_connection(conn);
continue;
} else {
if (confirm_TCP_connection(TCP_server, conn, packet, len) == -1) {
kill_TCP_connection(conn);
} else {
memset(conn, 0, sizeof(TCP_Secure_Connection));
}
}
do_unconfirmed(TCP_server, i);
}
}
@ -1002,31 +1067,148 @@ static void do_TCP_confirmed(TCP_Server *TCP_server)
}
send_pending_data(conn);
uint8_t packet[MAX_PACKET_SIZE];
int len;
while ((len = read_packet_TCP_secure_connection(conn->sock, &conn->next_packet_length, conn->shared_key,
conn->recv_nonce, packet, sizeof(packet)))) {
if (len == -1) {
kill_accepted(TCP_server, i);
break;
#ifndef TCP_SERVER_USE_EPOLL
do_confirmed_recv(TCP_server, i);
#endif
}
}
#ifdef TCP_SERVER_USE_EPOLL
static void do_TCP_epoll(TCP_Server *TCP_server)
{
#define MAX_EVENTS 16
struct epoll_event events[MAX_EVENTS];
int nfds;
while ((nfds = epoll_wait(TCP_server->efd, events, MAX_EVENTS, 0)) > 0) {
int n;
for (n = 0; n < nfds; ++n) {
sock_t sock = events[n].data.u64 & 0xFFFFFFFF;
int status = (events[n].data.u64 >> 32) & 0xFFFF, index = (events[n].data.u64 >> 48);
if ((events[n].events & EPOLLERR) || (events[n].events & EPOLLHUP)) {
switch (status) {
case TCP_SOCKET_LISTENING: {
//should never happen
break;
}
case TCP_SOCKET_INCOMING: {
kill_TCP_connection(&TCP_server->incomming_connection_queue[index]);
break;
}
case TCP_SOCKET_UNCONFIRMED: {
kill_TCP_connection(&TCP_server->unconfirmed_connection_queue[index]);
break;
}
case TCP_SOCKET_CONFIRMED: {
kill_accepted(TCP_server, index);
break;
}
}
continue;
}
if (handle_TCP_packet(TCP_server, i, packet, len) == -1) {
kill_accepted(TCP_server, i);
break;
if (!(events[n].events & EPOLLIN)) {
continue;
}
switch (status) {
case TCP_SOCKET_LISTENING: {
//socket is from socks_listening, accept connection
struct sockaddr_storage addr;
unsigned int addrlen = sizeof(addr);
sock_t sock_new;
sock_new = accept(sock, (struct sockaddr *)&addr, &addrlen);
struct sockaddr a = *(struct sockaddr *)&addr;
int index_new = TCP_server->incomming_connection_queue_index % MAX_INCOMMING_CONNECTIONS;
if (!accept_connection(TCP_server, sock_new)) {
break;
}
struct epoll_event ev = {
.events = EPOLLIN | EPOLLET,
.data.u64 = sock_new | ((uint64_t)TCP_SOCKET_INCOMING << 32) | ((uint64_t)index_new << 48)
};
if (epoll_ctl(TCP_server->efd, EPOLL_CTL_ADD, sock_new, &ev) == -1) {
kill_TCP_connection(&TCP_server->incomming_connection_queue[index_new]);
break;
}
break;
}
case TCP_SOCKET_INCOMING: {
int index_new;
if ((index_new = do_incoming(TCP_server, index)) != -1) {
events[n].events = EPOLLIN | EPOLLET;
events[n].data.u64 = sock | ((uint64_t)TCP_SOCKET_UNCONFIRMED << 32) | ((uint64_t)index_new << 48);
if (epoll_ctl(TCP_server->efd, EPOLL_CTL_MOD, sock, &events[n]) == -1) {
kill_TCP_connection(&TCP_server->unconfirmed_connection_queue[index_new]);
break;
}
}
break;
}
case TCP_SOCKET_UNCONFIRMED: {
int index_new;
if ((index_new = do_unconfirmed(TCP_server, index)) != -1) {
events[n].events = EPOLLIN | EPOLLET;
events[n].data.u64 = sock | ((uint64_t)TCP_SOCKET_CONFIRMED << 32) | ((uint64_t)index_new << 48);
if (epoll_ctl(TCP_server->efd, EPOLL_CTL_MOD, sock, &events[n]) == -1) {
//remove from confirmed connections
kill_accepted(TCP_server, index_new);
break;
}
}
break;
}
case TCP_SOCKET_CONFIRMED: {
do_confirmed_recv(TCP_server, index);
break;
}
}
}
}
#undef MAX_EVENTS
}
#endif
void do_TCP_server(TCP_Server *TCP_server)
{
unix_time_update();
#ifdef TCP_SERVER_USE_EPOLL
do_TCP_epoll(TCP_server);
#else
do_TCP_accept_new(TCP_server);
do_TCP_incomming(TCP_server);
do_TCP_unconfirmed(TCP_server);
#endif
do_TCP_confirmed(TCP_server);
}
@ -1044,6 +1226,10 @@ void kill_TCP_server(TCP_Server *TCP_server)
list_free(&TCP_server->accepted_key_list);
#ifdef TCP_SERVER_USE_EPOLL
close(TCP_server->efd);
#endif
free(TCP_server->socks_listening);
free(TCP_server);
}

View File

@ -27,6 +27,10 @@
#include "onion.h"
#include "list.h"
#ifdef TCP_SERVER_USE_EPOLL
#include "sys/epoll.h"
#endif
#if defined(_WIN32) || defined(__WIN32__) || defined(WIN32) || defined(__MACH__)
#define MSG_NOSIGNAL 0
#endif
@ -62,6 +66,13 @@
#define TCP_PING_FREQUENCY 30
#define TCP_PING_TIMEOUT 10
#ifdef TCP_SERVER_USE_EPOLL
#define TCP_SOCKET_LISTENING 0
#define TCP_SOCKET_INCOMING 1
#define TCP_SOCKET_UNCONFIRMED 2
#define TCP_SOCKET_CONFIRMED 3
#endif
enum {
TCP_STATUS_NO_STATUS,
TCP_STATUS_CONNECTED,
@ -96,6 +107,10 @@ typedef struct TCP_Secure_Connection {
typedef struct {
Onion *onion;
#ifdef TCP_SERVER_USE_EPOLL
int efd;
#endif
sock_t *socks_listening;
unsigned int num_listening_socks;