Started custom RTCP

This commit is contained in:
mannol 2015-04-13 01:45:53 +02:00
parent b2d88a4544
commit 2465f486ac
9 changed files with 500 additions and 447 deletions

View File

@ -1,5 +1,31 @@
/** av_test.c
*
* Copyright (C) 2013-2015 Tox project All Rights Reserved.
*
* This file is part of Tox.
*
* Tox is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Tox is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Tox. If not, see <http://www.gnu.org/licenses/>.
*
* Compile with (Linux only; in newly created directory toxcore/dir_name):
* gcc -o av_test ../toxav/av_test.c ../build/.libs/libtox*.a -lopencv_core \
* -lopencv_highgui -lopencv_imgproc -lsndfile -pthread -lvpx -lopus -lsodium -lportaudio
*/
#include "../toxav/toxav.h"
#include "../toxcore/tox.h"
#include "../toxcore/util.h"
/* Playing audio data */
#include <portaudio.h>
@ -11,7 +37,6 @@
#include <opencv/highgui.h>
#include <opencv/cvwimage.h>
#include <sys/stat.h>
#include <assert.h>
#include <stdio.h>
@ -63,8 +88,8 @@ struct toxav_thread_data {
int32_t sig;
};
const char* vdout = "AV Test";
PaStream* adout = NULL;
const char* vdout = "AV Test"; /* Video output */
PaStream* adout = NULL; /* Audio output */
const char* stringify_state(TOXAV_CALL_STATE s)
{
@ -230,7 +255,7 @@ void* iterate_toxav (void * data)
toxav_iterate(data_cast->BobAV);
int rc = MIN(toxav_iteration_interval(data_cast->AliceAV), toxav_iteration_interval(data_cast->BobAV));
// cvWaitKey(rc);
// cvWaitKey(10);
c_sleep(10);
}
@ -306,7 +331,26 @@ int print_help (const char* name)
int main (int argc, char** argv)
{
RingBuffer* rb = rb_new(4);
int a[5] = {0, 1, 2, 3, 4};
int* x;
rb_write(rb, a + 0);
rb_write(rb, a + 1);
rb_write(rb, a + 2);
rb_write(rb, a + 3);
// rb_write(rb, a + 4);
x = rb_write(rb, a + 4);
while (rb_read(rb, (void**) &x))
// rb_read(rb, (void**)&x);
printf("%d ", *x);
printf("\n");
// int r = 43;
// printf("%d\n", r >= 40 ? 3 : r / 10);
return 0;
Pa_Initialize();
struct stat st;
/* AV files for testing */
@ -395,53 +439,6 @@ int main (int argc, char** argv)
return 1;
}
if (0) {
SNDFILE* af_handle;
SF_INFO af_info;
/* Open audio file */
af_handle = sf_open(af_name, SFM_READ, &af_info);
if (af_handle == NULL) {
printf("Failed to open the file.\n");
exit(1);
}
int frame_size = (af_info.samplerate * audio_frame_duration / 1000) * af_info.channels;
struct PaStreamParameters output;
output.device = audio_out_dev_idx; /* default output device */
output.channelCount = af_info.channels;
output.sampleFormat = paInt16;
output.suggestedLatency = audio_dev->defaultHighOutputLatency;
output.hostApiSpecificStreamInfo = NULL;
PaError err = Pa_OpenStream(&adout, NULL, &output, af_info.samplerate, frame_size, paNoFlag, NULL, NULL);
assert(err == paNoError);
err = Pa_StartStream(adout);
assert(err == paNoError);
int16_t PCM[frame_size];
time_t start_time = time(NULL);
time_t expected_time = af_info.frames / af_info.samplerate + 2;
printf("Sample rate %d\n", af_info.samplerate);
while ( start_time + expected_time > time(NULL) ) {
int64_t count = sf_read_short(af_handle, PCM, frame_size);
if (count > 0) {
t_toxav_receive_audio_frame_cb(NULL, 0, PCM, count, af_info.channels, af_info.samplerate, NULL);
}
c_sleep(audio_frame_duration / 2);
}
Pa_Terminate();
return 0;
}
printf("Using audio device: %s\n", audio_dev->name);
printf("Using audio file: %s\n", af_name);
printf("Using video file: %s\n", vf_name);
@ -758,17 +755,15 @@ int main (int argc, char** argv)
printf("Sample rate %d\n", af_info.samplerate);
while ( start_time + expected_time > time(NULL) ) {
int64_t count = sf_read_short(af_handle, PCM, frame_size);
if (count > 0) {
TOXAV_ERR_SEND_FRAME rc;
if (toxav_send_audio_frame(AliceAV, 0, PCM, count/af_info.channels, af_info.channels, af_info.samplerate, &rc) == false) {
printf("Error sending frame of size %ld: %d\n", count, rc);
// exit(1);
}
}
iterate_tox(bootstrap, AliceAV, BobAV);
c_sleep(30);
c_sleep(53);
}
@ -794,6 +789,8 @@ int main (int argc, char** argv)
while(data.sig != 1)
pthread_yield();
Pa_StopStream(adout);
printf("Success!");
}
@ -890,5 +887,7 @@ int main (int argc, char** argv)
tox_kill(bootstrap);
printf("\nTest successful!\n");
Pa_Terminate();
return 0;
}

View File

@ -46,78 +46,13 @@
#define MAX_VIDEOFRAME_SIZE 0x40000 /* 256KiB */
#define VIDEOFRAME_HEADER_SIZE 0x2
/* FIXME: Might not be enough */
/* FIXME: Might not be enough? NOTE: I think it is enough */
#define VIDEO_DECODE_BUFFER_SIZE 20
#define ARRAY(TYPE__) struct { uint16_t size; TYPE__ data[]; }
typedef ARRAY(uint8_t) Payload;
typedef struct {
uint16_t size; /* Max size */
uint16_t start;
uint16_t end;
Payload **packets;
} PayloadBuffer;
static bool buffer_full(const PayloadBuffer *b)
{
return (b->end + 1) % b->size == b->start;
}
static bool buffer_empty(const PayloadBuffer *b)
{
return b->end == b->start;
}
static void buffer_write(PayloadBuffer *b, Payload *p)
{
b->packets[b->end] = p;
b->end = (b->end + 1) % b->size;
if (b->end == b->start) b->start = (b->start + 1) % b->size; /* full, overwrite */
}
static void buffer_read(PayloadBuffer *b, Payload **p)
{
*p = b->packets[b->start];
b->start = (b->start + 1) % b->size;
}
static void buffer_clear(PayloadBuffer *b)
{
while (!buffer_empty(b)) {
Payload *p;
buffer_read(b, &p);
free(p);
}
}
static PayloadBuffer *buffer_new(int size)
{
PayloadBuffer *buf = calloc(sizeof(PayloadBuffer), 1);
if (!buf) return NULL;
buf->size = size + 1; /* include empty elem */
if (!(buf->packets = calloc(buf->size, sizeof(Payload *)))) {
free(buf);
return NULL;
}
return buf;
}
static void buffer_free(PayloadBuffer *b)
{
if (b) {
buffer_clear(b);
free(b->packets);
free(b);
}
}
/* JITTER BUFFER WORK */
typedef struct JitterBuffer_s {
RTPMessage **queue;
@ -318,7 +253,7 @@ bool reconfigure_audio_decoder(CSession* cs, int32_t sampling_rate, int8_t chann
int status;
OpusDecoder* new_dec = opus_decoder_create(sampling_rate, channels, &status );
if ( status != OPUS_OK ) {
LOGGER_ERROR("Error while starting audio decoder: %s", opus_strerror(status));
LOGGER_ERROR("Error while starting audio decoder(%d %d): %s", sampling_rate, channels, opus_strerror(status));
return false;
}
@ -336,7 +271,6 @@ bool reconfigure_audio_decoder(CSession* cs, int32_t sampling_rate, int8_t chann
}
/* PUBLIC */
void cs_do(CSession *cs)
{
/* Codec session should always be protected by call mutex so no need to check for cs validity
@ -416,9 +350,9 @@ void cs_do(CSession *cs)
}
/********************* VIDEO *********************/
if (cs->vbuf_raw && !buffer_empty(cs->vbuf_raw)) {
if (cs->vbuf_raw && !rb_empty(cs->vbuf_raw)) {
/* Decode video */
buffer_read(cs->vbuf_raw, &p);
rb_read(cs->vbuf_raw, (void**)&p);
/* Leave space for (possibly) other thread to queue more data after we read it here */
LOGGED_UNLOCK(cs->queue_mutex);
@ -447,7 +381,6 @@ void cs_do(CSession *cs)
LOGGED_UNLOCK(cs->queue_mutex);
}
CSession *cs_new(uint32_t peer_video_frame_piece_size)
{
CSession *cs = calloc(sizeof(CSession), 1);
@ -510,7 +443,7 @@ CSession *cs_new(uint32_t peer_video_frame_piece_size)
goto AUDIO_DECODER_CLEANUP;
}
if ( !(cs->vbuf_raw = buffer_new(VIDEO_DECODE_BUFFER_SIZE)) ) {
if ( !(cs->vbuf_raw = rb_new(VIDEO_DECODE_BUFFER_SIZE)) ) {
free(cs->frame_buf);
vpx_codec_destroy(cs->v_decoder);
goto AUDIO_DECODER_CLEANUP;
@ -542,7 +475,7 @@ CSession *cs_new(uint32_t peer_video_frame_piece_size)
return cs;
VIDEO_DECODER_CLEANUP:
buffer_free(cs->vbuf_raw);
rb_free(cs->vbuf_raw);
free(cs->frame_buf);
vpx_codec_destroy(cs->v_decoder);
AUDIO_DECODER_CLEANUP:
@ -553,7 +486,6 @@ FAILURE:
free(cs);
return NULL;
}
void cs_kill(CSession *cs)
{
if (!cs)
@ -567,22 +499,21 @@ void cs_kill(CSession *cs)
vpx_codec_destroy(cs->v_decoder);
opus_encoder_destroy(cs->audio_encoder);
opus_decoder_destroy(cs->audio_decoder);
buffer_free(cs->vbuf_raw);
rb_free(cs->vbuf_raw);
jbuf_free(cs->j_buf);
free(cs->frame_buf);
free(cs->split_video_frame);
pthread_mutex_destroy(cs->queue_mutex);
LOGGER_DEBUG("Terminated codec state: %p", cs);
free(cs);
}
void cs_init_video_splitter_cycle(CSession* cs)
{
cs->split_video_frame[0] = cs->frameid_out++;
cs->split_video_frame[1] = 0;
}
int cs_update_video_splitter_cycle(CSession *cs, const uint8_t *payload, uint16_t length)
{
cs->processing_video_frame = payload;
@ -590,7 +521,6 @@ int cs_update_video_splitter_cycle(CSession *cs, const uint8_t *payload, uint16_
return ((length - 1) / VIDEOFRAME_PIECE_SIZE) + 1;
}
const uint8_t *cs_iterate_split_video_frame(CSession *cs, uint16_t *size)
{
if (!cs || !size) return NULL;
@ -616,9 +546,6 @@ const uint8_t *cs_iterate_split_video_frame(CSession *cs, uint16_t *size)
return cs->split_video_frame;
}
int cs_reconfigure_video_encoder(CSession* cs, int32_t bitrate, uint16_t width, uint16_t height)
{
vpx_codec_enc_cfg_t cfg = *cs->v_encoder[0].config.enc;
@ -637,7 +564,6 @@ int cs_reconfigure_video_encoder(CSession* cs, int32_t bitrate, uint16_t width,
return 0;
}
int cs_reconfigure_audio_encoder(CSession* cs, int32_t bitrate, int32_t sampling_rate, uint8_t channels)
{
/* Values are checked in toxav.c */
@ -667,8 +593,6 @@ int cs_reconfigure_audio_encoder(CSession* cs, int32_t bitrate, int32_t sampling
LOGGER_DEBUG ("Reconfigured audio encoder br: %d sr: %d cc:%d", bitrate, sampling_rate, channels);
return 0;
}
/* Called from RTP */
void queue_message(RTPSession *session, RTPMessage *msg)
{
@ -705,10 +629,10 @@ void queue_message(RTPSession *session, RTPMessage *msg)
if (p) {
LOGGED_LOCK(cs->queue_mutex);
if (buffer_full(cs->vbuf_raw)) {
if (rb_full(cs->vbuf_raw)) {
LOGGER_DEBUG("Dropped video frame");
Payload *tp;
buffer_read(cs->vbuf_raw, &tp);
rb_read(cs->vbuf_raw, (void**)&tp);
free(tp);
} else {
p->size = cs->frame_size;
@ -720,7 +644,7 @@ void queue_message(RTPSession *session, RTPMessage *msg)
cs->lcfd = t_lcfd > 100 ? cs->lcfd : t_lcfd;
cs->linfts = current_time_monotonic();
buffer_write(cs->vbuf_raw, p);
rb_write(cs->vbuf_raw, p);
LOGGED_UNLOCK(cs->queue_mutex);
} else {
LOGGER_WARNING("Allocation failed! Program might misbehave!");

View File

@ -25,6 +25,8 @@
#include "toxav.h"
#include "rtp.h"
#include "../toxcore/util.h"
#include <stdio.h>
#include <math.h>
#include <pthread.h>
@ -40,8 +42,6 @@
/* Audio encoding/decoding */
#include <opus.h>
#define PAIR(TYPE1__, TYPE2__) struct { TYPE1__ first; TYPE2__ second; }
#define PACKED_AUDIO_SIZE(x) (x + 5)
#define UNPACKED_AUDIO_SIZE(x) (x - 5)
@ -125,8 +125,4 @@ const uint8_t *cs_iterate_split_video_frame(CSession *cs, uint16_t *size);
int cs_reconfigure_video_encoder(CSession* cs, int32_t bitrate, uint16_t width, uint16_t height);
int cs_reconfigure_audio_encoder(CSession* cs, int32_t bitrate, int32_t sampling_rate, uint8_t channels);
/* Internal. Called from rtp_handle_message */
void queue_message(RTPSession *session, RTPMessage *msg);
#endif /* CODEC_H */

View File

@ -29,7 +29,7 @@
#include "../toxcore/Messenger.h"
/** Preconfigured value for video splitting */
#define VIDEOFRAME_PIECE_SIZE 500 /* 1.25 KiB*/
#define VIDEOFRAME_PIECE_SIZE 500
/**
* Error codes.
@ -42,7 +42,7 @@ typedef enum {
msi_EStrayMessage,
msi_ESystem,
msi_EHandle,
msi_EUndisclosed, /* NOTE: must be last enum otherwise parsing wont work */
msi_EUndisclosed, /* NOTE: must be last enum otherwise parsing will not work */
} MSIError;
/**

View File

@ -28,9 +28,9 @@
#include "rtp.h"
#include <stdlib.h>
void queue_message(RTPSession *_session, RTPMessage *_msg);
#define size_32 4
#define RTCP_REPORT_INTERVAL_MS 500
#define ADD_FLAG_VERSION(_h, _v) do { ( _h->flags ) &= 0x3F; ( _h->flags ) |= ( ( ( _v ) << 6 ) & 0xC0 ); } while(0)
#define ADD_FLAG_PADDING(_h, _v) do { if ( _v > 0 ) _v = 1; ( _h->flags ) &= 0xDF; ( _h->flags ) |= ( ( ( _v ) << 5 ) & 0x20 ); } while(0)
@ -46,24 +46,236 @@ void queue_message(RTPSession *_session, RTPMessage *_msg);
#define GET_SETTING_MARKER(_h) (( _h->marker_payloadt ) >> 7)
#define GET_SETTING_PAYLOAD(_h) ((_h->marker_payloadt) & 0x7f)
/**
* Checks if message came in late.
*/
static int check_late_message (RTPSession *session, RTPMessage *msg)
typedef struct {
uint64_t timestamp; /* in ms */
uint32_t packets_missing;
uint32_t expected_packets;
/* ... other stuff in the future */
} RTCPReport;
typedef struct RTCPSession_s {
uint8_t prefix;
uint64_t last_sent_report_ts;
uint32_t last_missing_packets;
uint32_t last_expected_packets;
RingBuffer* pl_stats; /* Packet loss stats over time */
} RTCPSession;
/* queue_message() is defined in codec.c */
void queue_message(RTPSession *session, RTPMessage *msg);
RTPHeader *parse_header_in ( const uint8_t *payload, int length );
RTPExtHeader *parse_ext_header_in ( const uint8_t *payload, uint16_t length );
RTPMessage *msg_parse ( const uint8_t *data, int length );
uint8_t *parse_header_out ( const RTPHeader* header, uint8_t* payload );
uint8_t *parse_ext_header_out ( const RTPExtHeader* header, uint8_t* payload );
void build_header ( RTPSession* session, RTPHeader* header );
void send_rtcp_report ( RTCPSession* session, Messenger* m, int32_t friendnumber );
int handle_rtp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object );
int handle_rtcp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object );
RTPSession *rtp_new ( int payload_type, Messenger *messenger, int friend_num )
{
/*
* Check Sequence number. If this new msg has lesser number then the session->rsequnum
* it shows that the message came in late. Also check timestamp to be 100% certain.
*
*/
return ( msg->header->sequnum < session->rsequnum && msg->header->timestamp < session->timestamp ) ? 0 : -1;
RTPSession *retu = calloc(1, sizeof(RTPSession));
if ( !retu ) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
return NULL;
}
retu->version = RTP_VERSION; /* It's always 2 */
retu->padding = 0; /* If some additional data is needed about the packet */
retu->extension = 0; /* If extension to header is needed */
retu->cc = 1; /* Amount of contributors */
retu->csrc = NULL; /* Container */
retu->ssrc = random_int();
retu->marker = 0;
retu->payload_type = payload_type % 128;
retu->m = messenger;
retu->dest = friend_num;
retu->rsequnum = retu->sequnum = 0;
retu->ext_header = NULL; /* When needed allocate */
if ( !(retu->csrc = calloc(1, sizeof(uint32_t))) ) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
free(retu);
return NULL;
}
retu->csrc[0] = retu->ssrc; /* Set my ssrc to the list receive */
/* Also set payload type as prefix */
retu->prefix = payload_type;
/* Initialize rtcp session */
if (!(retu->rtcp = calloc(1, sizeof(RTCPSession)))) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
free(retu->csrc);
free(retu);
return NULL;
}
retu->rtcp->prefix = 222 + payload_type % 192;
retu->rtcp->pl_stats = rb_new(4);
return retu;
}
void rtp_kill ( RTPSession *session )
{
if ( !session ) return;
rtp_stop_receiving (session);
free ( session->ext_header );
free ( session->csrc );
void* t;
while (!rb_empty(session->rtcp->pl_stats)) {
rb_read(session->rtcp->pl_stats, (void**) &t);
free(t);
}
rb_free(session->rtcp->pl_stats);
LOGGER_DEBUG("Terminated RTP session: %p", session);
/* And finally free session */
free ( session );
}
void rtp_do(RTPSession *session)
{
if (!session || !session->rtcp)
return;
if (current_time_monotonic() - session->rtcp->last_sent_report_ts >= RTCP_REPORT_INTERVAL_MS) {
send_rtcp_report(session->rtcp, session->m, session->dest);
}
if (rb_full(session->rtcp->pl_stats)) {
RTCPReport* reports[4];
int i = 0;
for (; rb_read(session->rtcp->pl_stats, (void**) reports + i); i++);
/* Check for timed out reports (> 6 sec) */
uint64_t now = current_time_monotonic();
for (i = 0; i < 4 && now - reports[i]->timestamp < 6000; i ++);
for (; i < 4; i ++) {
rb_write(session->rtcp->pl_stats, reports[i]);
reports[i] = NULL;
}
if (!rb_empty(session->rtcp->pl_stats)) {
for (i = 0; reports[i] != NULL; i ++)
free(reports[i]);
return; /* As some reports are timed out, we need more... */
}
/* We have 4 on-time reports so we can proceed */
uint32_t quality_loss = 0;
for (i = 0; i < 4; i++) {
uint32_t idx = reports[i]->packets_missing * 100 / reports[i]->expected_packets;
quality_loss += idx;
}
if (quality_loss > 40) {
LOGGER_DEBUG("Packet loss detected");
}
}
}
int rtp_start_receiving(RTPSession* session)
{
if (session == NULL)
return -1;
if (custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix,
handle_rtp_packet, session) == -1) {
LOGGER_WARNING("Failed to register rtp receive handler");
return -1;
}
if (custom_lossy_packet_registerhandler(session->m, session->dest, session->rtcp->prefix,
handle_rtcp_packet, session->rtcp) == -1) {
LOGGER_WARNING("Failed to register rtcp receive handler");
custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, NULL, NULL);
return -1;
}
return 0;
}
int rtp_stop_receiving(RTPSession* session)
{
if (session == NULL)
return -1;
custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, NULL, NULL);
custom_lossy_packet_registerhandler(session->m, session->dest, session->rtcp->prefix, NULL, NULL); /* RTCP */
return 0;
}
int rtp_send_msg ( RTPSession *session, const uint8_t *data, uint16_t length )
{
if ( !session ) {
LOGGER_WARNING("No session!");
return -1;
}
uint8_t parsed[MAX_RTP_SIZE];
uint8_t *it;
RTPHeader header[1];
build_header(session, header);
uint32_t parsed_len = length + header->length + 1;
parsed[0] = session->prefix;
it = parse_header_out ( header, parsed + 1 );
if ( session->ext_header ) {
parsed_len += ( 4 /* Minimum ext header len */ + session->ext_header->length * size_32 );
it = parse_ext_header_out ( session->ext_header, it );
}
memcpy ( it, data, length );
if ( -1 == send_custom_lossy_packet(session->m, session->dest, parsed, parsed_len) ) {
LOGGER_WARNING("Failed to send full packet (len: %d)! std error: %s", length, strerror(errno));
return -1;
}
/* Set sequ number */
session->sequnum = session->sequnum >= MAX_SEQU_NUM ? 0 : session->sequnum + 1;
return 0;
}
void rtp_free_msg ( RTPSession *session, RTPMessage *msg )
{
if ( !session ) {
if ( msg->ext_header ) {
free ( msg->ext_header->table );
free ( msg->ext_header );
}
} else {
if ( msg->ext_header && session->ext_header != msg->ext_header ) {
free ( msg->ext_header->table );
free ( msg->ext_header );
}
}
free ( msg->header );
free ( msg );
}
/**
* Extracts header from payload.
*/
RTPHeader *extract_header ( const uint8_t *payload, int length )
RTPHeader *parse_header_in ( const uint8_t *payload, int length )
{
if ( !payload || !length ) {
LOGGER_WARNING("No payload to extract!");
@ -111,8 +323,6 @@ RTPHeader *extract_header ( const uint8_t *payload, int length )
return NULL;
}
memset(retu->csrc, 0, 16 * sizeof (uint32_t));
retu->marker_payloadt = *it;
++it;
retu->length = total;
@ -125,7 +335,6 @@ RTPHeader *extract_header ( const uint8_t *payload, int length )
retu->ssrc = ntohl(retu->ssrc);
uint8_t x;
for ( x = 0; x < cc; x++ ) {
it += 4;
memcpy(&retu->csrc[x], it, sizeof(retu->csrc[x]));
@ -134,11 +343,7 @@ RTPHeader *extract_header ( const uint8_t *payload, int length )
return retu;
}
/**
* Extracts external header from payload. Must be called AFTER extract_header()!
*/
RTPExtHeader *extract_ext_header ( const uint8_t *payload, uint16_t length )
RTPExtHeader *parse_ext_header_in ( const uint8_t *payload, uint16_t length )
{
const uint8_t *it = payload;
@ -182,11 +387,47 @@ RTPExtHeader *extract_ext_header ( const uint8_t *payload, uint16_t length )
return retu;
}
RTPMessage *msg_parse ( const uint8_t *data, int length )
{
RTPMessage *retu = calloc(1, sizeof (RTPMessage));
/**
* Adds header to payload. Make sure _payload_ has enough space.
*/
uint8_t *add_header ( RTPHeader *header, uint8_t *payload )
retu->header = parse_header_in ( data, length ); /* It allocates memory and all */
if ( !retu->header ) {
LOGGER_WARNING("Header failed to extract!");
free(retu);
return NULL;
}
uint16_t from_pos = retu->header->length;
retu->length = length - from_pos;
if ( GET_FLAG_EXTENSION ( retu->header ) ) {
retu->ext_header = parse_ext_header_in ( data + from_pos, length );
if ( retu->ext_header ) {
retu->length -= ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 );
from_pos += ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 );
} else { /* Error */
LOGGER_WARNING("Ext Header failed to extract!");
rtp_free_msg(NULL, retu);
return NULL;
}
} else {
retu->ext_header = NULL;
}
if ( length - from_pos <= MAX_RTP_SIZE )
memcpy ( retu->data, data + from_pos, length - from_pos );
else {
LOGGER_WARNING("Invalid length!");
rtp_free_msg(NULL, retu);
return NULL;
}
return retu;
}
uint8_t *parse_header_out ( const RTPHeader *header, uint8_t *payload )
{
uint8_t cc = GET_FLAG_CSRCC ( header );
uint8_t *it = payload;
@ -223,11 +464,7 @@ uint8_t *add_header ( RTPHeader *header, uint8_t *payload )
return it + 4;
}
/**
* Adds extension header to payload. Make sure _payload_ has enough space.
*/
uint8_t *add_ext_header ( RTPExtHeader *header, uint8_t *payload )
uint8_t *parse_ext_header_out ( const RTPExtHeader *header, uint8_t *payload )
{
uint8_t *it = payload;
uint16_t length;
@ -242,9 +479,7 @@ uint8_t *add_ext_header ( RTPExtHeader *header, uint8_t *payload )
it -= 2; /* Return to 0 position */
if ( header->table ) {
uint16_t x;
for ( x = 0; x < header->length; x++ ) {
it += 4;
entry = htonl(header->table[x]);
@ -254,92 +489,45 @@ uint8_t *add_ext_header ( RTPExtHeader *header, uint8_t *payload )
return it + 4;
}
/**
* Builds header from control session values.
*/
RTPHeader *build_header ( RTPSession *session )
void build_header ( RTPSession *session, RTPHeader *header )
{
RTPHeader *retu = calloc ( 1, sizeof (RTPHeader) );
ADD_FLAG_VERSION ( header, session->version );
ADD_FLAG_PADDING ( header, session->padding );
ADD_FLAG_EXTENSION ( header, session->extension );
ADD_FLAG_CSRCC ( header, session->cc );
ADD_SETTING_MARKER ( header, session->marker );
ADD_SETTING_PAYLOAD ( header, session->payload_type );
if ( !retu ) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
return NULL;
}
ADD_FLAG_VERSION ( retu, session->version );
ADD_FLAG_PADDING ( retu, session->padding );
ADD_FLAG_EXTENSION ( retu, session->extension );
ADD_FLAG_CSRCC ( retu, session->cc );
ADD_SETTING_MARKER ( retu, session->marker );
ADD_SETTING_PAYLOAD ( retu, session->payload_type );
retu->sequnum = session->sequnum;
retu->timestamp = current_time_monotonic(); /* milliseconds */
retu->ssrc = session->ssrc;
header->sequnum = session->sequnum;
header->timestamp = current_time_monotonic(); /* milliseconds */
header->ssrc = session->ssrc;
int i;
for ( i = 0; i < session->cc; i++ )
retu->csrc[i] = session->csrc[i];
header->csrc[i] = session->csrc[i];
retu->length = 12 /* Minimum header len */ + ( session->cc * size_32 );
return retu;
header->length = 12 /* Minimum header len */ + ( session->cc * size_32 );
}
/**
* Parses data into RTPMessage struct. Stores headers separately from the payload data
* and so the length variable is set accordingly.
*/
RTPMessage *msg_parse ( const uint8_t *data, int length )
void send_rtcp_report(RTCPSession* session, Messenger* m, int32_t friendnumber)
{
RTPMessage *retu = calloc(1, sizeof (RTPMessage));
retu->header = extract_header ( data, length ); /* It allocates memory and all */
if ( !retu->header ) {
LOGGER_WARNING("Header failed to extract!");
free(retu);
return NULL;
}
uint16_t from_pos = retu->header->length;
retu->length = length - from_pos;
if ( GET_FLAG_EXTENSION ( retu->header ) ) {
retu->ext_header = extract_ext_header ( data + from_pos, length );
if ( retu->ext_header ) {
retu->length -= ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 );
from_pos += ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 );
} else { /* Error */
LOGGER_WARNING("Ext Header failed to extract!");
rtp_free_msg(NULL, retu);
return NULL;
}
} else {
retu->ext_header = NULL;
}
if ( length - from_pos <= MAX_RTP_SIZE )
memcpy ( retu->data, data + from_pos, length - from_pos );
else {
LOGGER_WARNING("Invalid length!");
rtp_free_msg(NULL, retu);
return NULL;
}
return retu;
if (session->last_expected_packets == 0)
return;
uint8_t parsed[9];
parsed[0] = session->prefix;
uint32_t packets_missing = htonl(session->last_missing_packets);
uint32_t expected_packets = htonl(session->last_expected_packets);
memcpy(parsed + 1, &packets_missing, 4);
memcpy(parsed + 5, &expected_packets, 4);
if (-1 == send_custom_lossy_packet(m, friendnumber, parsed, sizeof(parsed)))
LOGGER_WARNING("Failed to send full packet (len: %d)! std error: %s", sizeof(parsed), strerror(errno));
else
session->last_sent_report_ts = current_time_monotonic();
}
/**
* Callback for networking core.
*/
int rtp_handle_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object )
int handle_rtp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object )
{
RTPSession *session = object;
RTPMessage *msg;
@ -357,178 +545,37 @@ int rtp_handle_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data,
}
/* Check if message came in late */
if ( check_late_message(session, msg) < 0 ) { /* Not late */
if ( msg->header->sequnum > session->rsequnum && msg->header->timestamp > session->rtimestamp ) {
/* Not late */
session->rsequnum = msg->header->sequnum;
session->timestamp = msg->header->timestamp;
session->rtimestamp = msg->header->timestamp;
}
queue_message(session, msg);
return 0;
}
/**
* Allocate message and store data there
*/
RTPMessage *rtp_new_message ( RTPSession *session, const uint8_t *data, uint32_t length )
int handle_rtcp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object )
{
if ( !session ) {
LOGGER_WARNING("No session!");
return NULL;
}
uint8_t *from_pos;
RTPMessage *retu = calloc(1, sizeof (RTPMessage));
if ( !retu ) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
return NULL;
}
/* Sets header values and copies the extension header in retu */
retu->header = build_header ( session ); /* It allocates memory and all */
retu->ext_header = session->ext_header;
uint32_t total_length = length + retu->header->length + 1;
retu->data[0] = session->prefix;
if ( retu->ext_header ) {
total_length += ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 );
from_pos = add_header ( retu->header, retu->data + 1 );
from_pos = add_ext_header ( retu->ext_header, from_pos + 1 );
} else {
from_pos = add_header ( retu->header, retu->data + 1 );
}
/*
* Parses the extension header into the message
* Of course if any
*/
/* Appends data on to retu->data */
memcpy ( from_pos, data, length );
retu->length = total_length;
return retu;
}
RTPSession *rtp_new ( int payload_type, Messenger *messenger, int friend_num )
{
RTPSession *retu = calloc(1, sizeof(RTPSession));
if ( !retu ) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
return NULL;
}
retu->version = RTP_VERSION; /* It's always 2 */
retu->padding = 0; /* If some additional data is needed about the packet */
retu->extension = 0; /* If extension to header is needed */
retu->cc = 1; /* Amount of contributors */
retu->csrc = NULL; /* Container */
retu->ssrc = random_int();
retu->marker = 0;
retu->payload_type = payload_type % 128;
retu->dest = friend_num;
retu->rsequnum = retu->sequnum = 0;
retu->ext_header = NULL; /* When needed allocate */
if ( !(retu->csrc = calloc(1, sizeof (uint32_t))) ) {
LOGGER_WARNING("Alloc failed! Program might misbehave!");
free(retu);
return NULL;
}
retu->csrc[0] = retu->ssrc; /* Set my ssrc to the list receive */
/* Also set payload type as prefix */
retu->prefix = payload_type;
retu->m = messenger;
/*
*
*/
return retu;
}
void rtp_kill ( RTPSession *session )
{
if ( !session ) return;
rtp_stop_receiving (session);
free ( session->ext_header );
free ( session->csrc );
LOGGER_DEBUG("Terminated RTP session: %p", session);
/* And finally free session */
free ( session );
}
int rtp_start_receiving(RTPSession* session)
{
if (session == NULL)
return 0;
LOGGER_DEBUG("Registering packet handler: pt: %d; friend: %d", session->prefix, session->dest);
return custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix,
rtp_handle_packet, session);
}
int rtp_stop_receiving(RTPSession* session)
{
if (session == NULL)
return 0;
LOGGER_DEBUG("Unregistering packet handler: pt: %d; friend: %d", session->prefix, session->dest);
return custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix,
NULL, NULL);
}
int rtp_send_msg ( RTPSession *session, const uint8_t *data, uint16_t length )
{
RTPMessage *msg = rtp_new_message (session, data, length);
if ( !msg ) return -1;
if ( -1 == send_custom_lossy_packet(session->m, session->dest, msg->data, msg->length) ) {
LOGGER_WARNING("Failed to send full packet (len: %d)! std error: %s", length, strerror(errno));
rtp_free_msg ( session, msg );
if (length < 9)
return -1;
RTCPSession* session = object;
RTCPReport* report = malloc(sizeof(RTCPReport));
memcpy(&report->packets_missing, data + 1, 4);
memcpy(&report->expected_packets, data + 5, 4);
report->packets_missing = ntohl(report->packets_missing);
report->expected_packets = ntohl(report->expected_packets);
/* This would cause undefined behaviour */
if (report->expected_packets == 0) {
free(report);
return 0;
}
report->timestamp = current_time_monotonic();
/* Set sequ number */
session->sequnum = session->sequnum >= MAX_SEQU_NUM ? 0 : session->sequnum + 1;
rtp_free_msg ( session, msg );
free(rb_write(session->pl_stats, report));
return 0;
}
void rtp_free_msg ( RTPSession *session, RTPMessage *msg )
{
if ( !session ) {
if ( msg->ext_header ) {
free ( msg->ext_header->table );
free ( msg->ext_header );
}
} else {
if ( msg->ext_header && session->ext_header != msg->ext_header ) {
free ( msg->ext_header->table );
free ( msg->ext_header );
}
}
free ( msg->header );
free ( msg );
}
}

View File

@ -23,8 +23,6 @@
#define RTP_H
#define RTP_VERSION 2
#include <inttypes.h>
// #include <pthread.h>
#include "../toxcore/Messenger.h"
@ -51,8 +49,8 @@ typedef enum {
rtp_TypeVideo
} RTPPayloadType;
/**
* Standard rtp header
/**
* Standard rtp header.
*/
typedef struct {
uint8_t flags; /* Version(2),Padding(1), Ext(1), Cc(4) */
@ -62,17 +60,14 @@ typedef struct {
uint32_t ssrc; /* SSRC */
uint32_t csrc[16]; /* CSRC's table */
uint32_t length; /* Length of the header in payload string. */
} RTPHeader;
/**
/**
* Standard rtp extension header.
*/
typedef struct {
uint16_t type; /* Extension profile */
uint16_t length; /* Number of extensions */
uint32_t *table; /* Extension's table */
} RTPExtHeader;
/**
@ -90,31 +85,32 @@ typedef struct {
* RTP control session.
*/
typedef struct {
uint8_t version;
uint8_t padding;
uint8_t extension;
uint8_t cc;
uint8_t marker;
uint8_t payload_type;
uint16_t sequnum; /* Set when sending */
uint16_t rsequnum; /* Check when recving msg */
uint32_t timestamp;
uint32_t ssrc;
uint32_t *csrc;
uint8_t version;
uint8_t padding;
uint8_t extension;
uint8_t cc;
uint8_t marker;
uint8_t payload_type;
uint16_t sequnum; /* Sending sequence number */
uint16_t rsequnum; /* Receiving sequence number */
uint32_t rtimestamp;
uint32_t ssrc;
uint32_t *csrc;
/* If some additional data must be sent via message
* apply it here. Only by allocating this member you will be
* automatically placing it within a message.
*/
RTPExtHeader *ext_header;
RTPExtHeader *ext_header;
/* Msg prefix for core to know when recving */
uint8_t prefix;
uint8_t prefix;
int dest;
int dest;
struct CSession_s *cs;
Messenger *m;
struct RTCPSession_s *rtcp;
struct CSession_s *cs;
Messenger *m;
} RTPSession;
@ -128,6 +124,11 @@ RTPSession *rtp_new ( int payload_type, Messenger *messenger, int friend_num );
*/
void rtp_kill ( RTPSession* session );
/**
* Do periodical rtp work.
*/
void rtp_do(RTPSession *session);
/**
* By default rtp is not in receiving state
*/

View File

@ -220,6 +220,9 @@ void toxav_iterate(ToxAV* av)
LOGGED_UNLOCK(av->mutex);
cs_do(i->cs);
rtp_do(i->rtps[0]);
rtp_do(i->rtps[1]);
if (i->last_self_capabilities & msi_CapRAudio) /* Receiving audio */
rc = MIN(i->cs->last_packet_frame_duration, rc);
if (i->last_self_capabilities & msi_CapRVideo) /* Receiving video */

View File

@ -185,3 +185,76 @@ int create_recursive_mutex(pthread_mutex_t *mutex)
return 0;
}
struct RingBuffer {
uint16_t size; /* Max size */
uint16_t start;
uint16_t end;
void **data;
};
bool rb_full(const RingBuffer *b)
{
return (b->end + 1) % b->size == b->start;
}
bool rb_empty(const RingBuffer *b)
{
return b->end == b->start;
}
void* rb_write(RingBuffer *b, void *p)
{
void* rc = NULL;
if ((b->end + 1) % b->size == b->start) /* full */
rc = b->data[b->start];
b->data[b->end] = p;
b->end = (b->end + 1) % b->size;
if (b->end == b->start)
b->start = (b->start + 1) % b->size;
return rc;
}
bool rb_read(RingBuffer *b, void **p)
{
if (b->end == b->start) { /* Empty */
*p = NULL;
return false;
}
*p = b->data[b->start];
b->start = (b->start + 1) % b->size;
return true;
}
void rb_clear(RingBuffer *b)
{
while (!rb_empty(b)) {
void *p;
rb_read(b, &p);
free(p);
}
}
RingBuffer *rb_new(int size)
{
RingBuffer *buf = calloc(sizeof(RingBuffer), 1);
if (!buf) return NULL;
buf->size = size + 1; /* include empty elem */
if (!(buf->data = calloc(buf->size, sizeof(void *)))) {
free(buf);
return NULL;
}
return buf;
}
void rb_free(RingBuffer *b)
{
if (b) {
rb_clear(b);
free(b->data);
free(b);
}
}

View File

@ -30,6 +30,7 @@
#include <pthread.h>
#define MIN(a,b) (((a)<(b))?(a):(b))
#define PAIR(TYPE1__, TYPE2__) struct { TYPE1__ first; TYPE2__ second; }
void unix_time_update();
uint64_t unix_time();
@ -56,4 +57,13 @@ int load_state(load_state_callback_func load_state_callback, void *outer,
/* Returns -1 if failed or 0 if success */
int create_recursive_mutex(pthread_mutex_t *mutex);
/* Ring buffer */
typedef struct RingBuffer RingBuffer;
bool rb_full(const RingBuffer *b);
bool rb_empty(const RingBuffer *b);
void* rb_write(RingBuffer* b, void* p);
bool rb_read(RingBuffer* b, void** p);
void rb_clear(RingBuffer *b);
RingBuffer *rb_new(int size);
void rb_free(RingBuffer *b);
#endif /* __UTIL_H__ */