Merge branch 'notsecure-split-video'

This commit is contained in:
irungentoo 2014-08-01 13:21:03 -04:00
commit 055640611f
No known key found for this signature in database
GPG Key ID: 10349DC9BED89E98
2 changed files with 258 additions and 33 deletions

View File

@ -127,6 +127,7 @@ static void callback_audio(ToxAv *av, int32_t call_index, int16_t *data, int len
static void callback_video(ToxAv *av, int32_t call_index, vpx_image_t *img)
{
}
void register_callbacks(ToxAv *av, void *data)
{
toxav_register_callstate_callback(av, callback_call_started, av_OnStart, data);

View File

@ -90,6 +90,14 @@ typedef struct _CallSpecific {
pthread_mutex_t mutex;
} CallSpecific;
typedef struct {
int32_t call_index;
uint32_t size;
uint8_t data[0];
} DECODE_PACKET;
#define VIDEO_DECODE_QUEUE_SIZE 2
#define AUDIO_DECODE_QUEUE_SIZE 8
struct _ToxAv {
Messenger *messenger;
@ -100,8 +108,18 @@ struct _ToxAv {
void (*video_callback)(ToxAv *, int32_t, vpx_image_t *);
uint32_t max_calls;
/* used in the "decode on another thread" system */
volatile _Bool exit, decoding;
uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write;
pthread_mutex_t decode_cond_mutex;
pthread_cond_t decode_cond;
DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE];
DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE];
};
static void *toxav_decoding(void *arg);
static MSICSettings msicsettings_cast (const ToxAvCSettings *from)
{
MSICSettings csettings;
@ -162,6 +180,12 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
av->calls = calloc(sizeof(CallSpecific), max_calls);
av->max_calls = max_calls;
pthread_mutex_init(&av->decode_cond_mutex, NULL);
pthread_cond_init(&av->decode_cond, NULL);
pthread_t temp;
pthread_create(&temp, NULL, toxav_decoding, av);
return av;
}
@ -173,9 +197,39 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
*/
void toxav_kill ( ToxAv *av )
{
int i = 0;
int i;
DECODE_PACKET *p;
for (; i < av->max_calls; i ++) {
av->exit = 1;
pthread_mutex_lock(&av->decode_cond_mutex);
pthread_cond_signal(&av->decode_cond);
if (av->exit) {
pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex);
}
pthread_mutex_unlock(&av->decode_cond_mutex);
pthread_mutex_destroy(&av->decode_cond_mutex);
pthread_cond_destroy(&av->decode_cond);
for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
p = av->video_decode_queue[i];
if (p) {
free(p);
}
}
for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
p = av->audio_decode_queue[i];
if (p) {
free(p);
}
}
for (i = 0; i < av->max_calls; i ++) {
if ( av->calls[i].crtps[audio_index] )
rtp_terminate_session(av->calls[i].crtps[audio_index], av->msi_session->messenger_handle);
@ -514,9 +568,37 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index )
call->crtps[video_index] = NULL;
terminate_queue(call->j_buf);
call->j_buf = NULL;
pthread_mutex_lock(&av->decode_cond_mutex);
int i;
DECODE_PACKET *p;
for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
p = av->video_decode_queue[i];
if (p && p->call_index == call_index) {
free(p);
av->video_decode_queue[i] = NULL;
}
}
for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
p = av->audio_decode_queue[i];
if (p && p->call_index == call_index) {
free(p);
av->audio_decode_queue[i] = NULL;
}
}
while (av->decoding) {} //use a pthread condition?
codec_terminate_session(call->cs);
call->cs = NULL;
pthread_mutex_unlock(&av->decode_cond_mutex);
pthread_mutex_unlock(&call->mutex);
pthread_mutex_destroy(&call->mutex);
@ -835,6 +917,120 @@ int toxav_has_activity(ToxAv *av, int32_t call_index, int16_t *PCM, uint16_t fra
return energy_VAD(av->calls[call_index].cs, PCM, frame_size, ref_energy);
}
static void decode_video(ToxAv *av, DECODE_PACKET *p)
{
CallSpecific *call = &av->calls[p->call_index];
int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US);
if (rc != VPX_CODEC_OK) {
LOGGER_ERROR("Error decoding video: %u %s\n", i, vpx_codec_err_to_string(rc));
}
vpx_codec_iter_t iter = NULL;
vpx_image_t *img;
img = vpx_codec_get_frame(&call->cs->v_decoder, &iter);
if (img && av->video_callback) {
av->video_callback(av, p->call_index, img);
} else {
LOGGER_WARNING("Video packet dropped due to missing callback or no image!");
}
free(p);
}
static void decode_audio(ToxAv *av, DECODE_PACKET *p)
{
int32_t call_index = p->call_index;
CallSpecific *call = &av->calls[call_index];
// ToxAvCSettings csettings;
// toxav_get_peer_csettings(av, call_index, 0, &csettings);
int frame_size = 10000; /* FIXME: not static? */
int16_t dest[frame_size];
int dec_size = opus_decode(call->cs->audio_decoder, p->data, p->size, dest, frame_size, (p->size == 0));
free(p);
if (dec_size < 0) {
LOGGER_WARNING("Decoding error: %s", opus_strerror(dec_size));
return;
}
if ( av->audio_callback )
av->audio_callback(av, call_index, dest, dec_size);
else
LOGGER_WARNING("Audio packet dropped due to missing callback!");
}
static void *toxav_decoding(void *arg)
{
ToxAv *av = arg;
while (!av->exit) {
DECODE_PACKET *p;
_Bool video = 0;
av->decoding = 0;
pthread_mutex_lock(&av->decode_cond_mutex);
uint8_t r;
/* first check for available packets, otherwise wait for condition*/
r = av->audio_decode_read;
p = av->audio_decode_queue[r];
if (!p) {
r = av->video_decode_read;
p = av->video_decode_queue[r];
if (!p) {
pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex);
r = av->audio_decode_read;
p = av->audio_decode_queue[r];
if (!p) {
r = av->video_decode_read;
p = av->video_decode_queue[r];
video = 1;
}
} else {
video = 1;
}
}
if (video) {
if (p) {
av->video_decode_queue[r] = NULL;
av->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE;
}
} else {
av->audio_decode_queue[r] = NULL;
av->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE;
}
av->decoding = 1;
pthread_mutex_unlock(&av->decode_cond_mutex);
if (p) {
if (video) {
decode_video(av, p);
} else {
decode_audio(av, p);
}
}
}
pthread_mutex_lock(&av->decode_cond_mutex);
av->exit = 0;
pthread_cond_signal(&av->decode_cond);
pthread_mutex_unlock(&av->decode_cond_mutex);
return NULL;
}
void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
{
ToxAv *av = _session->av;
@ -846,31 +1042,48 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
if (_session->payload_type == type_audio % 128) {
queue(call->j_buf, _msg);
int success = 0, dec_size;
ToxAvCSettings csettings;
toxav_get_peer_csettings(av, call_index, 0, &csettings);
int frame_size = 10000; /* FIXME: not static? */
int16_t dest[frame_size];
int success = 0;
while ((_msg = dequeue(call->j_buf, &success)) || success == 2) {
DECODE_PACKET *p;
if (success == 2) {
dec_size = opus_decode(call->cs->audio_decoder, NULL, 0, dest, frame_size, 1);
p = malloc(sizeof(DECODE_PACKET));
if (p) {
p->call_index = call_index;
p->size = 0;
}
} else {
dec_size = opus_decode(call->cs->audio_decoder, _msg->data, _msg->length, dest, frame_size, 0);
p = malloc(sizeof(DECODE_PACKET) + _msg->length);
if (p) {
p->call_index = call_index;
p->size = _msg->length;
memcpy(p->data, _msg->data, _msg->length);
}
rtp_free_msg(NULL, _msg);
}
if (dec_size < 0) {
LOGGER_WARNING("Decoding error: %s", opus_strerror(dec_size));
continue;
}
if (p) {
/* do the decoding on another thread */
pthread_mutex_lock(&av->decode_cond_mutex);
uint8_t w = av->audio_decode_write;
if ( av->audio_callback )
av->audio_callback(av, call_index, dest, dec_size);
else
LOGGER_WARNING("Audio packet dropped due to missing callback!");
if (av->audio_decode_queue[w] == NULL) {
av->audio_decode_queue[w] = p;
av->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE;
pthread_cond_signal(&av->decode_cond);
} else {
printf("dropped audio frame\n");
free(p);
}
pthread_mutex_unlock(&av->decode_cond_mutex);
} else {
//malloc failed
}
}
} else {
@ -887,14 +1100,34 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
/* piece of current frame */
} else if (i > 0 && i < 128) {
/* recieved a piece of a frame ahead, flush current frame and start reading this new frame */
int rc = vpx_codec_decode(&call->cs->v_decoder, call->frame_buf, call->frame_limit, NULL, MAX_DECODE_TIME_US);
DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit);
if (p) {
p->call_index = call_index;
p->size = call->frame_limit;
memcpy(p->data, call->frame_buf, call->frame_limit);
/* do the decoding on another thread */
pthread_mutex_lock(&av->decode_cond_mutex);
uint8_t w = av->video_decode_write;
if (av->video_decode_queue[w] == NULL) {
av->video_decode_queue[w] = p;
av->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE;
pthread_cond_signal(&av->decode_cond);
} else {
printf("dropped video frame\n");
free(p);
}
pthread_mutex_unlock(&av->decode_cond_mutex);
} else {
//malloc failed
}
call->frame_id = packet[0];
memset(call->frame_buf, 0, call->frame_limit);
call->frame_limit = 0;
if (rc != VPX_CODEC_OK) {
LOGGER_ERROR("Error decoding video: %u %s\n", i, vpx_codec_err_to_string(rc));
}
} else {
/* old packet, dont read */
LOGGER_DEBUG("Old packet: %u\n", i);
@ -919,15 +1152,6 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
end:
;
vpx_codec_iter_t iter = NULL;
vpx_image_t *img;
img = vpx_codec_get_frame(&call->cs->v_decoder, &iter);
if (img && av->video_callback) {
av->video_callback(av, call_index, img);
} else
LOGGER_WARNING("Video packet dropped due to missing callback or no image!");
rtp_free_msg(NULL, _msg);
}
}