Merge branch 'notsecure-split-video'

This commit is contained in:
irungentoo 2014-08-04 12:08:43 -04:00
commit 02ed20dc39
No known key found for this signature in database
GPG Key ID: 10349DC9BED89E98

View File

@ -71,6 +71,14 @@ const uint32_t av_VADd = 40;
static const uint8_t audio_index = 0, video_index = 1; static const uint8_t audio_index = 0, video_index = 1;
typedef struct {
uint32_t size;
uint8_t data[0];
} DECODE_PACKET;
#define VIDEO_DECODE_QUEUE_SIZE 2
#define AUDIO_DECODE_QUEUE_SIZE 16
typedef struct _CallSpecific { typedef struct _CallSpecific {
RTPSession *crtps[2]; /** Audio is first and video is second */ RTPSession *crtps[2]; /** Audio is first and video is second */
CodecState *cs;/** Each call have its own encoders and decoders. CodecState *cs;/** Each call have its own encoders and decoders.
@ -88,17 +96,16 @@ typedef struct _CallSpecific {
_Bool call_active; _Bool call_active;
pthread_mutex_t mutex; pthread_mutex_t mutex;
/* used in the "decode on another thread" system */
volatile _Bool exit, decoding;
uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write;
pthread_mutex_t decode_cond_mutex;
pthread_cond_t decode_cond;
DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE];
DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE];
} CallSpecific; } CallSpecific;
typedef struct {
int32_t call_index;
uint32_t size;
uint8_t data[0];
} DECODE_PACKET;
#define VIDEO_DECODE_QUEUE_SIZE 2
#define AUDIO_DECODE_QUEUE_SIZE 16
struct _ToxAv { struct _ToxAv {
Messenger *messenger; Messenger *messenger;
MSISession *msi_session; /** Main msi session */ MSISession *msi_session; /** Main msi session */
@ -111,14 +118,6 @@ struct _ToxAv {
void *video_callback_userdata; void *video_callback_userdata;
uint32_t max_calls; uint32_t max_calls;
/* used in the "decode on another thread" system */
volatile _Bool exit, decoding;
uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write;
pthread_mutex_t decode_cond_mutex;
pthread_cond_t decode_cond;
DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE];
DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE];
}; };
static void *toxav_decoding(void *arg); static void *toxav_decoding(void *arg);
@ -183,12 +182,6 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
av->calls = calloc(sizeof(CallSpecific), max_calls); av->calls = calloc(sizeof(CallSpecific), max_calls);
av->max_calls = max_calls; av->max_calls = max_calls;
pthread_mutex_init(&av->decode_cond_mutex, NULL);
pthread_cond_init(&av->decode_cond, NULL);
pthread_t temp;
pthread_create(&temp, NULL, toxav_decoding, av);
return av; return av;
} }
@ -201,36 +194,6 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
void toxav_kill ( ToxAv *av ) void toxav_kill ( ToxAv *av )
{ {
int i; int i;
DECODE_PACKET *p;
av->exit = 1;
pthread_mutex_lock(&av->decode_cond_mutex);
pthread_cond_signal(&av->decode_cond);
if (av->exit) {
pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex);
}
pthread_mutex_unlock(&av->decode_cond_mutex);
pthread_mutex_destroy(&av->decode_cond_mutex);
pthread_cond_destroy(&av->decode_cond);
for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
p = av->video_decode_queue[i];
if (p) {
free(p);
}
}
for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
p = av->audio_decode_queue[i];
if (p) {
free(p);
}
}
for (i = 0; i < av->max_calls; i ++) { for (i = 0; i < av->max_calls; i ++) {
if ( av->calls[i].crtps[audio_index] ) if ( av->calls[i].crtps[audio_index] )
@ -525,6 +488,23 @@ int toxav_prepare_transmission ( ToxAv *av, int32_t call_index, uint32_t jbuf_ca
if ( pthread_mutex_init(&call->mutex, NULL) != 0 ) goto error; if ( pthread_mutex_init(&call->mutex, NULL) != 0 ) goto error;
//todo: add error checks
pthread_mutex_init(&call->decode_cond_mutex, NULL);
pthread_cond_init(&call->decode_cond, NULL);
void **arg = malloc(2 * sizeof(void *));
arg[0] = av;
arg[1] = call;
pthread_t temp;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 1 << 18);
pthread_create(&temp, &attr, toxav_decoding, arg);
pthread_attr_destroy(&attr);
LOGGER_WARNING("Got here"); LOGGER_WARNING("Got here");
call->call_active = 1; call->call_active = 1;
@ -576,36 +556,36 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index )
terminate_queue(call->j_buf); terminate_queue(call->j_buf);
call->j_buf = NULL; call->j_buf = NULL;
pthread_mutex_lock(&av->decode_cond_mutex);
int i; int i;
DECODE_PACKET *p; DECODE_PACKET *p;
for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { call->exit = 1;
p = av->video_decode_queue[i]; pthread_mutex_lock(&call->decode_cond_mutex);
pthread_cond_signal(&call->decode_cond);
pthread_cond_wait(&call->decode_cond, &call->decode_cond_mutex);
pthread_mutex_unlock(&call->decode_cond_mutex);
pthread_mutex_destroy(&call->decode_cond_mutex);
pthread_cond_destroy(&call->decode_cond);
if (p && p->call_index == call_index) { for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
p = call->video_decode_queue[i];
if (p) {
free(p); free(p);
av->video_decode_queue[i] = NULL;
} }
} }
for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) { for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
p = av->audio_decode_queue[i]; p = call->audio_decode_queue[i];
if (p && p->call_index == call_index) { if (p) {
free(p); free(p);
av->audio_decode_queue[i] = NULL;
} }
} }
while (av->decoding) {} //use a pthread condition?
codec_terminate_session(call->cs); codec_terminate_session(call->cs);
call->cs = NULL; call->cs = NULL;
pthread_mutex_unlock(&av->decode_cond_mutex);
pthread_mutex_unlock(&call->mutex); pthread_mutex_unlock(&call->mutex);
pthread_mutex_destroy(&call->mutex); pthread_mutex_destroy(&call->mutex);
@ -925,9 +905,9 @@ int toxav_has_activity(ToxAv *av, int32_t call_index, int16_t *PCM, uint16_t fra
} }
static void decode_video(ToxAv *av, DECODE_PACKET *p) static void decode_video(ToxAv *av, CallSpecific *call, DECODE_PACKET *p)
{ {
CallSpecific *call = &av->calls[p->call_index]; int32_t call_index = call - av->calls;
int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US); int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US);
@ -940,7 +920,7 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p)
img = vpx_codec_get_frame(&call->cs->v_decoder, &iter); img = vpx_codec_get_frame(&call->cs->v_decoder, &iter);
if (img && av->video_callback) { if (img && av->video_callback) {
av->video_callback(av, p->call_index, img, av->video_callback_userdata); av->video_callback(av, call_index, img, av->video_callback_userdata);
} else { } else {
LOGGER_WARNING("Video packet dropped due to missing callback or no image!"); LOGGER_WARNING("Video packet dropped due to missing callback or no image!");
} }
@ -948,10 +928,9 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p)
free(p); free(p);
} }
static void decode_audio(ToxAv *av, DECODE_PACKET *p) static void decode_audio(ToxAv *av, CallSpecific *call, DECODE_PACKET *p)
{ {
int32_t call_index = p->call_index; int32_t call_index = call - av->calls;
CallSpecific *call = &av->calls[call_index];
// ToxAvCSettings csettings; // ToxAvCSettings csettings;
// toxav_get_peer_csettings(av, call_index, 0, &csettings); // toxav_get_peer_csettings(av, call_index, 0, &csettings);
@ -975,32 +954,39 @@ static void decode_audio(ToxAv *av, DECODE_PACKET *p)
static void *toxav_decoding(void *arg) static void *toxav_decoding(void *arg)
{ {
ToxAv *av = arg; void **pp = arg;
ToxAv *av = pp[0];
CallSpecific *call = pp[1];
free(pp);
while (!av->exit) { while (!call->exit) {
DECODE_PACKET *p; DECODE_PACKET *p;
_Bool video = 0; _Bool video = 0;
av->decoding = 0; pthread_mutex_lock(&call->decode_cond_mutex);
pthread_mutex_lock(&av->decode_cond_mutex);
if (call->exit) {
break;
}
uint8_t r; uint8_t r;
/* first check for available packets, otherwise wait for condition*/ /* first check for available packets, otherwise wait for condition*/
r = av->audio_decode_read; r = call->audio_decode_read;
p = av->audio_decode_queue[r]; p = call->audio_decode_queue[r];
if (!p) { if (!p) {
r = av->video_decode_read; r = call->video_decode_read;
p = av->video_decode_queue[r]; p = call->video_decode_queue[r];
if (!p) { if (!p) {
pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); pthread_cond_wait(&call->decode_cond, &call->decode_cond_mutex);
r = av->audio_decode_read; r = call->audio_decode_read;
p = av->audio_decode_queue[r]; p = call->audio_decode_queue[r];
if (!p) { if (!p) {
r = av->video_decode_read; r = call->video_decode_read;
p = av->video_decode_queue[r]; p = call->video_decode_queue[r];
video = 1; video = 1;
} }
} else { } else {
@ -1010,30 +996,28 @@ static void *toxav_decoding(void *arg)
if (video) { if (video) {
if (p) { if (p) {
av->video_decode_queue[r] = NULL; call->video_decode_queue[r] = NULL;
av->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE; call->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE;
} }
} else { } else {
av->audio_decode_queue[r] = NULL; call->audio_decode_queue[r] = NULL;
av->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE; call->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE;
} }
av->decoding = 1; pthread_mutex_unlock(&call->decode_cond_mutex);
pthread_mutex_unlock(&av->decode_cond_mutex);
if (p) { if (p) {
if (video) { if (video) {
decode_video(av, p); decode_video(av, call, p);
} else { } else {
decode_audio(av, p); decode_audio(av, call, p);
} }
} }
} }
pthread_mutex_lock(&av->decode_cond_mutex); call->exit = 0;
av->exit = 0; pthread_cond_signal(&call->decode_cond);
pthread_cond_signal(&av->decode_cond); pthread_mutex_unlock(&call->decode_cond_mutex);
pthread_mutex_unlock(&av->decode_cond_mutex);
return NULL; return NULL;
} }
@ -1058,14 +1042,12 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
p = malloc(sizeof(DECODE_PACKET)); p = malloc(sizeof(DECODE_PACKET));
if (p) { if (p) {
p->call_index = call_index;
p->size = 0; p->size = 0;
} }
} else { } else {
p = malloc(sizeof(DECODE_PACKET) + _msg->length); p = malloc(sizeof(DECODE_PACKET) + _msg->length);
if (p) { if (p) {
p->call_index = call_index;
p->size = _msg->length; p->size = _msg->length;
memcpy(p->data, _msg->data, _msg->length); memcpy(p->data, _msg->data, _msg->length);
} }
@ -1075,19 +1057,19 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
if (p) { if (p) {
/* do the decoding on another thread */ /* do the decoding on another thread */
pthread_mutex_lock(&av->decode_cond_mutex); pthread_mutex_lock(&call->decode_cond_mutex);
uint8_t w = av->audio_decode_write; uint8_t w = call->audio_decode_write;
if (av->audio_decode_queue[w] == NULL) { if (call->audio_decode_queue[w] == NULL) {
av->audio_decode_queue[w] = p; call->audio_decode_queue[w] = p;
av->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE; call->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE;
pthread_cond_signal(&av->decode_cond); pthread_cond_signal(&call->decode_cond);
} else { } else {
LOGGER_DEBUG("Dropped audio frame\n"); LOGGER_DEBUG("Dropped audio frame\n");
free(p); free(p);
} }
pthread_mutex_unlock(&av->decode_cond_mutex); pthread_mutex_unlock(&call->decode_cond_mutex);
} else { } else {
//malloc failed //malloc failed
} }
@ -1110,24 +1092,23 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit); DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit);
if (p) { if (p) {
p->call_index = call_index;
p->size = call->frame_limit; p->size = call->frame_limit;
memcpy(p->data, call->frame_buf, call->frame_limit); memcpy(p->data, call->frame_buf, call->frame_limit);
/* do the decoding on another thread */ /* do the decoding on another thread */
pthread_mutex_lock(&av->decode_cond_mutex); pthread_mutex_lock(&call->decode_cond_mutex);
uint8_t w = av->video_decode_write; uint8_t w = call->video_decode_write;
if (av->video_decode_queue[w] == NULL) { if (call->video_decode_queue[w] == NULL) {
av->video_decode_queue[w] = p; call->video_decode_queue[w] = p;
av->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE; call->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE;
pthread_cond_signal(&av->decode_cond); pthread_cond_signal(&call->decode_cond);
} else { } else {
LOGGER_DEBUG("Dropped video frame\n"); LOGGER_DEBUG("Dropped video frame\n");
free(p); free(p);
} }
pthread_mutex_unlock(&av->decode_cond_mutex); pthread_mutex_unlock(&call->decode_cond_mutex);
} else { } else {
//malloc failed //malloc failed
} }