From d3e66d73a7b961d424ebfaa3680212f24c241f27 Mon Sep 17 00:00:00 2001 From: notsecure Date: Mon, 4 Aug 2014 09:50:32 -0400 Subject: [PATCH] one decoding thread per call --- toxav/toxav.c | 207 ++++++++++++++++++++++---------------------------- 1 file changed, 92 insertions(+), 115 deletions(-) diff --git a/toxav/toxav.c b/toxav/toxav.c index 870dd111..f7b35934 100644 --- a/toxav/toxav.c +++ b/toxav/toxav.c @@ -71,6 +71,14 @@ const uint32_t av_VADd = 40; static const uint8_t audio_index = 0, video_index = 1; +typedef struct { + uint32_t size; + uint8_t data[0]; +} DECODE_PACKET; + +#define VIDEO_DECODE_QUEUE_SIZE 2 +#define AUDIO_DECODE_QUEUE_SIZE 16 + typedef struct _CallSpecific { RTPSession *crtps[2]; /** Audio is first and video is second */ CodecState *cs;/** Each call have its own encoders and decoders. @@ -88,17 +96,16 @@ typedef struct _CallSpecific { _Bool call_active; pthread_mutex_t mutex; + + /* used in the "decode on another thread" system */ + volatile _Bool exit, decoding; + uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write; + pthread_mutex_t decode_cond_mutex; + pthread_cond_t decode_cond; + DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE]; + DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE]; } CallSpecific; -typedef struct { - int32_t call_index; - uint32_t size; - uint8_t data[0]; -} DECODE_PACKET; - -#define VIDEO_DECODE_QUEUE_SIZE 2 -#define AUDIO_DECODE_QUEUE_SIZE 16 - struct _ToxAv { Messenger *messenger; MSISession *msi_session; /** Main msi session */ @@ -111,14 +118,6 @@ struct _ToxAv { void *video_callback_userdata; uint32_t max_calls; - - /* used in the "decode on another thread" system */ - volatile _Bool exit, decoding; - uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write; - pthread_mutex_t decode_cond_mutex; - pthread_cond_t decode_cond; - DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE]; - DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE]; }; static void *toxav_decoding(void *arg); @@ -183,12 +182,6 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls) av->calls = calloc(sizeof(CallSpecific), max_calls); av->max_calls = max_calls; - pthread_mutex_init(&av->decode_cond_mutex, NULL); - pthread_cond_init(&av->decode_cond, NULL); - - pthread_t temp; - pthread_create(&temp, NULL, toxav_decoding, av); - return av; } @@ -201,36 +194,6 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls) void toxav_kill ( ToxAv *av ) { int i; - DECODE_PACKET *p; - - av->exit = 1; - pthread_mutex_lock(&av->decode_cond_mutex); - pthread_cond_signal(&av->decode_cond); - - if (av->exit) { - pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); - } - - pthread_mutex_unlock(&av->decode_cond_mutex); - - pthread_mutex_destroy(&av->decode_cond_mutex); - pthread_cond_destroy(&av->decode_cond); - - for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { - p = av->video_decode_queue[i]; - - if (p) { - free(p); - } - } - - for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) { - p = av->audio_decode_queue[i]; - - if (p) { - free(p); - } - } for (i = 0; i < av->max_calls; i ++) { if ( av->calls[i].crtps[audio_index] ) @@ -525,6 +488,23 @@ int toxav_prepare_transmission ( ToxAv *av, int32_t call_index, uint32_t jbuf_ca if ( pthread_mutex_init(&call->mutex, NULL) != 0 ) goto error; + //todo: add error checks + pthread_mutex_init(&call->decode_cond_mutex, NULL); + pthread_cond_init(&call->decode_cond, NULL); + + void **arg = malloc(2 * sizeof(void*)); + arg[0] = av; + arg[1] = call; + + pthread_t temp; + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, 1 << 18); + pthread_create(&temp, &attr, toxav_decoding, arg); + pthread_attr_destroy(&attr); + + LOGGER_WARNING("Got here"); call->call_active = 1; @@ -576,36 +556,34 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index ) terminate_queue(call->j_buf); call->j_buf = NULL; - - pthread_mutex_lock(&av->decode_cond_mutex); int i; DECODE_PACKET *p; - for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { - p = av->video_decode_queue[i]; + call->exit = 1; + pthread_mutex_lock(&call->decode_cond_mutex); + pthread_cond_signal(&call->decode_cond); + pthread_cond_wait(&call->decode_cond, &call->decode_cond_mutex); + pthread_mutex_unlock(&call->decode_cond_mutex); + pthread_mutex_destroy(&call->decode_cond_mutex); + pthread_cond_destroy(&call->decode_cond); - if (p && p->call_index == call_index) { + for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { + p = call->video_decode_queue[i]; + if (p) { free(p); - av->video_decode_queue[i] = NULL; } } for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) { - p = av->audio_decode_queue[i]; - - if (p && p->call_index == call_index) { + p = call->audio_decode_queue[i]; + if (p) { free(p); - av->audio_decode_queue[i] = NULL; } } - while (av->decoding) {} //use a pthread condition? - codec_terminate_session(call->cs); call->cs = NULL; - pthread_mutex_unlock(&av->decode_cond_mutex); - pthread_mutex_unlock(&call->mutex); pthread_mutex_destroy(&call->mutex); @@ -925,9 +903,9 @@ int toxav_has_activity(ToxAv *av, int32_t call_index, int16_t *PCM, uint16_t fra } -static void decode_video(ToxAv *av, DECODE_PACKET *p) +static void decode_video(ToxAv *av, CallSpecific *call, DECODE_PACKET *p) { - CallSpecific *call = &av->calls[p->call_index]; + int32_t call_index = call - av->calls; int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US); @@ -940,7 +918,7 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p) img = vpx_codec_get_frame(&call->cs->v_decoder, &iter); if (img && av->video_callback) { - av->video_callback(av, p->call_index, img, av->video_callback_userdata); + av->video_callback(av, call_index, img, av->video_callback_userdata); } else { LOGGER_WARNING("Video packet dropped due to missing callback or no image!"); } @@ -948,10 +926,9 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p) free(p); } -static void decode_audio(ToxAv *av, DECODE_PACKET *p) +static void decode_audio(ToxAv *av, CallSpecific *call, DECODE_PACKET *p) { - int32_t call_index = p->call_index; - CallSpecific *call = &av->calls[call_index]; + int32_t call_index = call - av->calls; // ToxAvCSettings csettings; // toxav_get_peer_csettings(av, call_index, 0, &csettings); @@ -975,32 +952,37 @@ static void decode_audio(ToxAv *av, DECODE_PACKET *p) static void *toxav_decoding(void *arg) { - ToxAv *av = arg; + void **pp = arg; + ToxAv *av = pp[0]; + CallSpecific *call = pp[1]; + free(pp); - while (!av->exit) { + while (!call->exit) { DECODE_PACKET *p; _Bool video = 0; - av->decoding = 0; - pthread_mutex_lock(&av->decode_cond_mutex); + pthread_mutex_lock(&call->decode_cond_mutex); + if(call->exit) { + break; + } uint8_t r; /* first check for available packets, otherwise wait for condition*/ - r = av->audio_decode_read; - p = av->audio_decode_queue[r]; + r = call->audio_decode_read; + p = call->audio_decode_queue[r]; if (!p) { - r = av->video_decode_read; - p = av->video_decode_queue[r]; + r = call->video_decode_read; + p = call->video_decode_queue[r]; if (!p) { - pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); - r = av->audio_decode_read; - p = av->audio_decode_queue[r]; + pthread_cond_wait(&call->decode_cond, &call->decode_cond_mutex); + r = call->audio_decode_read; + p = call->audio_decode_queue[r]; if (!p) { - r = av->video_decode_read; - p = av->video_decode_queue[r]; + r = call->video_decode_read; + p = call->video_decode_queue[r]; video = 1; } } else { @@ -1010,30 +992,28 @@ static void *toxav_decoding(void *arg) if (video) { if (p) { - av->video_decode_queue[r] = NULL; - av->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE; + call->video_decode_queue[r] = NULL; + call->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE; } } else { - av->audio_decode_queue[r] = NULL; - av->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE; + call->audio_decode_queue[r] = NULL; + call->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE; } - av->decoding = 1; - pthread_mutex_unlock(&av->decode_cond_mutex); + pthread_mutex_unlock(&call->decode_cond_mutex); if (p) { if (video) { - decode_video(av, p); + decode_video(av, call, p); } else { - decode_audio(av, p); + decode_audio(av, call, p); } } } - pthread_mutex_lock(&av->decode_cond_mutex); - av->exit = 0; - pthread_cond_signal(&av->decode_cond); - pthread_mutex_unlock(&av->decode_cond_mutex); + call->exit = 0; + pthread_cond_signal(&call->decode_cond); + pthread_mutex_unlock(&call->decode_cond_mutex); return NULL; } @@ -1058,14 +1038,12 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) p = malloc(sizeof(DECODE_PACKET)); if (p) { - p->call_index = call_index; p->size = 0; } } else { p = malloc(sizeof(DECODE_PACKET) + _msg->length); if (p) { - p->call_index = call_index; p->size = _msg->length; memcpy(p->data, _msg->data, _msg->length); } @@ -1075,19 +1053,19 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) if (p) { /* do the decoding on another thread */ - pthread_mutex_lock(&av->decode_cond_mutex); - uint8_t w = av->audio_decode_write; + pthread_mutex_lock(&call->decode_cond_mutex); + uint8_t w = call->audio_decode_write; - if (av->audio_decode_queue[w] == NULL) { - av->audio_decode_queue[w] = p; - av->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE; - pthread_cond_signal(&av->decode_cond); + if (call->audio_decode_queue[w] == NULL) { + call->audio_decode_queue[w] = p; + call->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE; + pthread_cond_signal(&call->decode_cond); } else { LOGGER_DEBUG("Dropped audio frame\n"); free(p); } - pthread_mutex_unlock(&av->decode_cond_mutex); + pthread_mutex_unlock(&call->decode_cond_mutex); } else { //malloc failed } @@ -1110,24 +1088,23 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit); if (p) { - p->call_index = call_index; p->size = call->frame_limit; memcpy(p->data, call->frame_buf, call->frame_limit); /* do the decoding on another thread */ - pthread_mutex_lock(&av->decode_cond_mutex); - uint8_t w = av->video_decode_write; + pthread_mutex_lock(&call->decode_cond_mutex); + uint8_t w = call->video_decode_write; - if (av->video_decode_queue[w] == NULL) { - av->video_decode_queue[w] = p; - av->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE; - pthread_cond_signal(&av->decode_cond); + if (call->video_decode_queue[w] == NULL) { + call->video_decode_queue[w] = p; + call->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE; + pthread_cond_signal(&call->decode_cond); } else { LOGGER_DEBUG("Dropped video frame\n"); free(p); } - pthread_mutex_unlock(&av->decode_cond_mutex); + pthread_mutex_unlock(&call->decode_cond_mutex); } else { //malloc failed }