Base of group chats seems to be working now.

This commit is contained in:
irungentoo 2013-09-05 17:00:41 -04:00
parent c59975dd7e
commit cc8a536cb0
3 changed files with 169 additions and 21 deletions

View File

@ -111,8 +111,10 @@ static int peer_okping(Group_Chat *chat, uint8_t *client_id)
uint64_t temp_time = unix_time();
for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) {
if (chat->close[i].last_recv < temp_time + BAD_NODE_TIMEOUT)
if (chat->close[i].last_recv + BAD_NODE_TIMEOUT < temp_time) {
++j;
continue;
}
/* Equal */
if (memcmp(chat->close[i].client_id, client_id, crypto_box_PUBLICKEYBYTES) == 0)
@ -147,7 +149,7 @@ static int add_closepeer(Group_Chat *chat, uint8_t *client_id, IP_Port ip_port)
}
for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) { /* Try replacing bad nodes first */
if (chat->close[i].last_recv < temp_time + BAD_NODE_TIMEOUT) {
if (chat->close[i].last_recv + BAD_NODE_TIMEOUT < temp_time) {
memcpy(chat->close[i].client_id, client_id, crypto_box_PUBLICKEYBYTES);
chat->close[i].ip_port = ip_port;
chat->close[i].last_recv = temp_time;
@ -224,6 +226,7 @@ static int addpeer(Group_Chat *chat, uint8_t *client_id)
Group_Peer *temp;
temp = realloc(chat->group, sizeof(Group_Peer) * (chat->numpeers + 1));
memset(&(temp[chat->numpeers]), 0, sizeof(Group_Peer));
if (temp == NULL)
return -1;
@ -301,14 +304,11 @@ static int send_sendnodes(Group_Chat *chat, IP_Port ip_port, int peernum, uint64
for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) {
if (chat->close[i].last_recv + BAD_NODE_TIMEOUT > temp_time) {
memcpy(contents.nodes[j].client_id, chat->close[i].client_id, crypto_box_PUBLICKEYBYTES);
contents.nodes[j].ip_port = ip_port;
contents.nodes[j].ip_port = chat->close[i].ip_port;
++j;
}
}
if (j == 0)
return -1;
return send_groupchatpacket(chat, ip_port, chat->group[peernum].client_id, (uint8_t *)&contents,
sizeof(contents.pingid) + sizeof(groupchat_nodes) * j, 49);
}
@ -324,6 +324,10 @@ static int handle_getnodes(Group_Chat *chat, IP_Port source, int peernum, uint8_
getnodes_data contents;
memcpy(&contents, data, sizeof(contents));
send_sendnodes(chat, source, peernum, contents.pingid);
if (peer_okping(chat, chat->group[peernum].client_id) > 0)
send_getnodes(chat, source, peernum);
return 0;
}
@ -332,7 +336,7 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8
if (peernum < 0 || peernum >= chat->numpeers)
return 1;
if (len > sizeof(sendnodes_data) || len < (sizeof(uint64_t) + sizeof(groupchat_nodes)))
if (len > sizeof(sendnodes_data) || len < sizeof(uint64_t))
return 1;
if ((len - sizeof(uint64_t)) % sizeof(groupchat_nodes) != 0)
@ -354,6 +358,10 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8
if (peer_okping(chat, contents.nodes[i].client_id) > 0) {
int peern = peer_in_chat(chat, contents.nodes[i].client_id);
if (peern == -1) { /*NOTE: This is just for testing and will be removed later.*/
peern = addpeer(chat, contents.nodes[i].client_id);
}
if (peern == -1)
continue;
@ -364,17 +372,37 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8
add_closepeer(chat, chat->group[peernum].client_id, source);
return 0;
}
#define GROUP_DATA_MIN_SIZE (crypto_box_PUBLICKEYBYTES + sizeof(uint32_t) + 1)
static int handle_data(Group_Chat *chat, uint8_t *data, uint32_t len)
{
if (len < 2)
if (len < GROUP_DATA_MIN_SIZE)
return 1;
//TODO:
int peernum = peer_in_chat(chat, data);
if (peernum == -1) { /*NOTE: This is just for testing and will be removed later.*/
peernum = addpeer(chat, data);
}
if (peernum == -1)
return 1;
uint32_t message_num;
memcpy(&message_num, data + crypto_box_PUBLICKEYBYTES, sizeof(uint32_t));
message_num = ntohl(message_num);
if (message_num - chat->group[peernum].last_message_number > 64 ||
message_num == chat->group[peernum].last_message_number)
return 1;
chat->group[peernum].last_message_number = message_num;
int handled = 0;
if (data[0] == 64 && chat->group_message != NULL) {
//TODO
(*chat->group_message)(chat, 0, data + 1, len - 1, chat->group_message_userdata);
if (data[crypto_box_PUBLICKEYBYTES + sizeof(message_num)] == 64
&& chat->group_message != NULL) { /* If message is chat message */
(*chat->group_message)(chat, peernum, data + GROUP_DATA_MIN_SIZE, len - 1, chat->group_message_userdata);
handled = 1;
}
@ -386,6 +414,19 @@ static int handle_data(Group_Chat *chat, uint8_t *data, uint32_t len)
return 1;
}
static uint8_t send_data(Group_Chat *chat, uint8_t *data, uint32_t len, uint8_t message_id)
{
if (len + GROUP_DATA_MIN_SIZE > MAX_DATA_SIZE) /*NOTE: not the real maximum len.*/
return 1;
uint8_t packet[MAX_DATA_SIZE];
uint32_t message_num = htonl(chat->message_number);
//TODO
memcpy(packet, chat->self_public_key, crypto_box_PUBLICKEYBYTES);
memcpy(packet + crypto_box_PUBLICKEYBYTES, &message_num, sizeof(message_num));
packet[crypto_box_PUBLICKEYBYTES + sizeof(message_num)] = message_id;
return sendto_allpeers(chat, packet, len + GROUP_DATA_MIN_SIZE, 50);
}
/*
* Handle get nodes group packet.
*
@ -417,7 +458,6 @@ int handle_groupchatpacket(Group_Chat *chat, IP_Port source, uint8_t *packet, ui
if (peernum == -1)
return 1;
switch (number) {
case 48:
return handle_getnodes(chat, source, peernum, data, len);
@ -435,9 +475,9 @@ int handle_groupchatpacket(Group_Chat *chat, IP_Port source, uint8_t *packet, ui
return 1;
}
uint32_t m_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length)
uint32_t group_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length)
{
return send_data(chat, message, length, 64); //TODO: better return values?
}
void callback_groupmessage(Group_Chat *chat, void (*function)(Group_Chat *chat, int, uint8_t *, uint16_t, void *),
@ -454,9 +494,34 @@ Group_Chat *new_groupchat(Networking_Core *net)
Group_Chat *chat = calloc(1, sizeof(Group_Chat));
chat->net = net;
crypto_box_keypair(chat->self_public_key, chat->self_secret_key);
return chat;
}
#define NODE_PING_INTERVAL 10
static void ping_close(Group_Chat *chat)
{
uint32_t i;
uint64_t temp_time = unix_time();
for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) {
if (chat->close[i].last_recv < temp_time + BAD_NODE_TIMEOUT) {
int peernum = peer_in_chat(chat, chat->close[i].client_id);
if (peernum == -1)
continue;
if (chat->group[peernum].last_pinged + NODE_PING_INTERVAL < temp_time)
send_getnodes(chat, chat->close[i].ip_port, peernum);
}
}
}
void do_groupchat(Group_Chat *chat)
{
ping_close(chat);
}
void kill_groupchat(Group_Chat *chat)
{
@ -464,7 +529,7 @@ void kill_groupchat(Group_Chat *chat)
free(chat);
}
void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, int peernum)
void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, uint8_t *client_id)
{
send_getnodes(chat, ip_port, peernum);
send_getnodes(chat, ip_port, addpeer(chat, client_id));
}

View File

@ -76,7 +76,7 @@ void callback_groupmessage(Group_Chat *chat, void (*function)(Group_Chat *chat,
* Send a message to the group.
*
*/
uint32_t m_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length);
uint32_t group_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length);
/* Create a new group chat.
*
@ -93,6 +93,10 @@ Group_Chat *new_groupchat(Networking_Core *net);
*/
void kill_groupchat(Group_Chat *chat);
/*
* This is the main loop.
*/
void do_groupchat(Group_Chat *chat);
/* if we receive a group chat packet we call this function so it can be handled.
return 0 if packet is handled correctly.
@ -100,7 +104,7 @@ void kill_groupchat(Group_Chat *chat);
int handle_groupchatpacket(Group_Chat *chat, IP_Port source, uint8_t *packet, uint32_t length);
void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, int peernum);
void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, uint8_t *client_id);
#ifdef __cplusplus
}

View File

@ -1,18 +1,97 @@
#include "group_chats.h"
#define NUM_CHATS 8
#ifdef WIN32
#define c_sleep(x) Sleep(1*x)
#else
#define c_sleep(x) usleep(1000*x)
#endif
Group_Chat *chats[NUM_CHATS];
void print_close(Group_Close *close)
{
uint32_t i, j;
IP_Port p_ip;
printf("___________________CLOSE________________________________\n");
for (i = 0; i < GROUP_CLOSE_CONNECTIONS; i++) {
printf("ClientID: ");
for (j = 0; j < CLIENT_ID_SIZE; j++) {
printf("%02hhX", close[i].client_id[j]);
}
p_ip = close[i].ip_port;
printf("\nIP: %u.%u.%u.%u Port: %u", p_ip.ip.uint8[0], p_ip.ip.uint8[1], p_ip.ip.uint8[2], p_ip.ip.uint8[3],
ntohs(p_ip.port));
printf("\nTimestamp: %llu", (long long unsigned int) close[i].last_recv);
printf("\n");
}
}
void print_group(Group_Chat *chat)
{
uint32_t i, j;
printf("-----------------\nClientID: ");
for (j = 0; j < CLIENT_ID_SIZE; j++) {
printf("%02hhX", chat->self_public_key[j]);
}
printf("\n___________________GROUP________________________________\n");
for (i = 0; i < chat->numpeers; i++) {
printf("ClientID: ");
for (j = 0; j < CLIENT_ID_SIZE; j++) {
printf("%02hhX", chat->group[i].client_id[j]);
}
printf("\nTimestamp: %llu", (long long unsigned int) chat->group[i].last_recv);
printf("\nlast_pinged: %llu", (long long unsigned int) chat->group[i].last_pinged);
printf("\npingid: %llu", (long long unsigned int) chat->group[i].pingid);
printf("\n");
}
}
int main()
{
IP ip;
ip.uint32 = 0;
uint32_t i;
Group_Chat *chats[NUM_CHATS];
for (i = 0; i < NUM_CHATS; ++i) {
chats[i] = new_groupchat(new_networking(ip, 1234));
chats[i] = new_groupchat(new_networking(ip, 12745));
if (chats[i] == 0)
exit(1);
networking_registerhandler(chats[i]->net, 48, &handle_groupchatpacket, chats[i]);
}
printf("ok\n");
IP_Port ip_port;
ip_port.ip.uint32 = 0;
ip_port.ip.uint8[0] = 127;
ip_port.ip.uint8[3] = 1;
ip_port.port = htons(12745);
for (i = 0; i < NUM_CHATS; ++i) {
chat_bootstrap(chats[i], ip_port, chats[0]->self_public_key);
printf("%u\n", i);
}
while (1) {
for (i = 0; i < NUM_CHATS; ++i) {
networking_poll(chats[i]->net);
do_groupchat(chats[i]);
printf("%u\n", chats[i]->numpeers);
print_close(chats[i]->close);
print_group(chats[i]);
}
c_sleep(100);
}
return 0;