1
0
mirror of https://github.com/qTox/qTox.git synced 2024-03-22 14:00:36 +08:00

refactor(coreav): move threading of CoreAV to single mutex

Use a standard mutex instead of trying to build proper locking
ourselfes.

(cherry picked from commit 2bc0057bbd)
This commit is contained in:
sudden6 2019-11-14 14:48:34 +01:00 committed by Anthony Bilinski
parent 71b5c50ac6
commit f72b3397a5
No known key found for this signature in database
GPG Key ID: 2AA8E0DA1B31FB3C
2 changed files with 64 additions and 117 deletions

View File

@ -73,7 +73,6 @@ CoreAV::CoreAV(std::unique_ptr<ToxAV, ToxAVDeleter> toxav)
, toxav{std::move(toxav)}
, coreavThread{new QThread{this}}
, iterateTimer{new QTimer{this}}
, threadSwitchLock{false}
{
assert(coreavThread);
assert(iterateTimer);
@ -184,6 +183,7 @@ void CoreAV::process()
*/
bool CoreAV::isCallStarted(const Friend* f) const
{
QMutexLocker locker{&callsLock};
return f && (calls.find(f->getId()) != calls.end());
}
@ -194,6 +194,7 @@ bool CoreAV::isCallStarted(const Friend* f) const
*/
bool CoreAV::isCallStarted(const Group* g) const
{
QMutexLocker locker{&callsLock};
return g && (groupCalls.find(g->getId()) != groupCalls.end());
}
@ -204,6 +205,7 @@ bool CoreAV::isCallStarted(const Group* g) const
*/
bool CoreAV::isCallActive(const Friend* f) const
{
QMutexLocker locker{&callsLock};
auto it = calls.find(f->getId());
if (it == calls.end()) {
return false;
@ -218,6 +220,7 @@ bool CoreAV::isCallActive(const Friend* f) const
*/
bool CoreAV::isCallActive(const Group* g) const
{
QMutexLocker locker{&callsLock};
auto it = groupCalls.find(g->getId());
if (it == groupCalls.end()) {
return false;
@ -227,26 +230,14 @@ bool CoreAV::isCallActive(const Group* g) const
bool CoreAV::isCallVideoEnabled(const Friend* f) const
{
QMutexLocker locker{&callsLock};
auto it = calls.find(f->getId());
return isCallStarted(f) && it->second->getVideoEnabled();
}
bool CoreAV::answerCall(uint32_t friendNum, bool video)
{
if (QThread::currentThread() != coreavThread.get()) {
if (threadSwitchLock.test_and_set(std::memory_order_acquire)) {
qDebug() << "CoreAV::answerCall: Backed off of thread-switch lock";
return false;
}
bool ret;
QMetaObject::invokeMethod(this, "answerCall", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(bool, ret), Q_ARG(uint32_t, friendNum),
Q_ARG(bool, video));
threadSwitchLock.clear(std::memory_order_release);
return ret;
}
QMutexLocker locker{&callsLock};
qDebug() << QString("answering call %1").arg(friendNum);
auto it = calls.find(friendNum);
@ -268,20 +259,7 @@ bool CoreAV::answerCall(uint32_t friendNum, bool video)
bool CoreAV::startCall(uint32_t friendNum, bool video)
{
if (QThread::currentThread() != coreavThread.get()) {
if (threadSwitchLock.test_and_set(std::memory_order_acquire)) {
qDebug() << "CoreAV::startCall: Backed off of thread-switch lock";
return false;
}
bool ret;
QMetaObject::invokeMethod(this, "startCall", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(bool, ret), Q_ARG(uint32_t, friendNum),
Q_ARG(bool, video));
threadSwitchLock.clear(std::memory_order_release);
return ret;
}
QMutexLocker locker{&callsLock};
qDebug() << QString("Starting call with %1").arg(friendNum);
auto it = calls.find(friendNum);
@ -306,19 +284,7 @@ bool CoreAV::startCall(uint32_t friendNum, bool video)
bool CoreAV::cancelCall(uint32_t friendNum)
{
if (QThread::currentThread() != coreavThread.get()) {
if (threadSwitchLock.test_and_set(std::memory_order_acquire)) {
qDebug() << "CoreAV::cancelCall: Backed off of thread-switch lock";
return false;
}
bool ret;
QMetaObject::invokeMethod(this, "cancelCall", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(bool, ret), Q_ARG(uint32_t, friendNum));
threadSwitchLock.clear(std::memory_order_release);
return ret;
}
QMutexLocker locker{&callsLock};
qDebug() << QString("Cancelling call with %1").arg(friendNum);
if (!toxav_call_control(toxav.get(), friendNum, TOXAV_CALL_CONTROL_CANCEL, nullptr)) {
@ -333,13 +299,7 @@ bool CoreAV::cancelCall(uint32_t friendNum)
void CoreAV::timeoutCall(uint32_t friendNum)
{
// Non-blocking switch to the CoreAV thread, we really don't want to be coming
// blocking-queued from the UI thread while we emit blocking-queued to it
if (QThread::currentThread() != coreavThread.get()) {
QMetaObject::invokeMethod(this, "timeoutCall", Qt::QueuedConnection,
Q_ARG(uint32_t, friendNum));
return;
}
QMutexLocker locker{&callsLock};
if (!cancelCall(friendNum)) {
qWarning() << QString("Failed to timeout call with %1").arg(friendNum);
@ -360,6 +320,8 @@ void CoreAV::timeoutCall(uint32_t friendNum)
bool CoreAV::sendCallAudio(uint32_t callId, const int16_t* pcm, size_t samples, uint8_t chans,
uint32_t rate) const
{
QMutexLocker locker{&callsLock};
auto it = calls.find(callId);
if (it == calls.end()) {
return false;
@ -394,6 +356,8 @@ bool CoreAV::sendCallAudio(uint32_t callId, const int16_t* pcm, size_t samples,
void CoreAV::sendCallVideo(uint32_t callId, std::shared_ptr<VideoFrame> vframe)
{
QMutexLocker locker{&callsLock};
// We might be running in the FFmpeg thread and holding the CameraSource lock
// So be careful not to deadlock with anything while toxav locks in toxav_video_send_frame
auto it = calls.find(callId);
@ -446,6 +410,8 @@ void CoreAV::sendCallVideo(uint32_t callId, std::shared_ptr<VideoFrame> vframe)
*/
void CoreAV::toggleMuteCallInput(const Friend* f)
{
QMutexLocker locker{&callsLock};
auto it = calls.find(f->getId());
if (f && (it != calls.end())) {
ToxCall& call = *it->second;
@ -459,6 +425,8 @@ void CoreAV::toggleMuteCallInput(const Friend* f)
*/
void CoreAV::toggleMuteCallOutput(const Friend* f)
{
QMutexLocker locker{&callsLock};
auto it = calls.find(f->getId());
if (f && (it != calls.end())) {
ToxCall& call = *it->second;
@ -482,8 +450,11 @@ void CoreAV::groupCallCallback(void* tox, uint32_t group, uint32_t peer, const i
unsigned samples, uint8_t channels, uint32_t sample_rate, void* core)
{
Q_UNUSED(tox);
Core* c = static_cast<Core*>(core);
CoreAV* cav = c->getAv();
QMutexLocker locker{&cav->callsLock};
const ToxPk peerPk = c->getGroupPeerPk(group, peer);
const Settings& s = Settings::getInstance();
// don't play the audio if it comes from a muted peer
@ -493,8 +464,6 @@ void CoreAV::groupCallCallback(void* tox, uint32_t group, uint32_t peer, const i
emit c->groupPeerAudioPlaying(group, peerPk);
CoreAV* cav = c->getAv();
auto it = cav->groupCalls.find(group);
if (it == cav->groupCalls.end()) {
return;
@ -516,6 +485,8 @@ void CoreAV::groupCallCallback(void* tox, uint32_t group, uint32_t peer, const i
*/
void CoreAV::invalidateGroupCallPeerSource(int group, ToxPk peerPk)
{
QMutexLocker locker{&callsLock};
auto it = groupCalls.find(group);
if (it == groupCalls.end()) {
return;
@ -530,6 +501,8 @@ void CoreAV::invalidateGroupCallPeerSource(int group, ToxPk peerPk)
*/
VideoSource* CoreAV::getVideoSourceFromCall(int friendNum) const
{
QMutexLocker locker{&callsLock};
auto it = calls.find(friendNum);
if (it == calls.end()) {
qWarning() << "CoreAV::getVideoSourceFromCall: No such call, did it die before we finished "
@ -547,6 +520,8 @@ VideoSource* CoreAV::getVideoSourceFromCall(int friendNum) const
*/
void CoreAV::joinGroupCall(int groupId)
{
QMutexLocker locker{&callsLock};
qDebug() << QString("Joining group call %1").arg(groupId);
// Audio backend must be set before starting a call
@ -568,6 +543,8 @@ void CoreAV::joinGroupCall(int groupId)
*/
void CoreAV::leaveGroupCall(int groupId)
{
QMutexLocker locker{&callsLock};
qDebug() << QString("Leaving group call %1").arg(groupId);
groupCalls.erase(groupId);
@ -576,6 +553,8 @@ void CoreAV::leaveGroupCall(int groupId)
bool CoreAV::sendGroupCallAudio(int groupId, const int16_t* pcm, size_t samples, uint8_t chans,
uint32_t rate) const
{
QMutexLocker locker{&callsLock};
std::map<int, ToxGroupCallPtr>::const_iterator it = groupCalls.find(groupId);
if (it == groupCalls.end()) {
return false;
@ -598,6 +577,8 @@ bool CoreAV::sendGroupCallAudio(int groupId, const int16_t* pcm, size_t samples,
*/
void CoreAV::muteCallInput(const Group* g, bool mute)
{
QMutexLocker locker{&callsLock};
auto it = groupCalls.find(g->getId());
if (g && (it != groupCalls.end())) {
it->second->setMuteMic(mute);
@ -611,6 +592,8 @@ void CoreAV::muteCallInput(const Group* g, bool mute)
*/
void CoreAV::muteCallOutput(const Group* g, bool mute)
{
QMutexLocker locker{&callsLock};
auto it = groupCalls.find(g->getId());
if (g && (it != groupCalls.end())) {
it->second->setMuteVol(mute);
@ -624,6 +607,8 @@ void CoreAV::muteCallOutput(const Group* g, bool mute)
*/
bool CoreAV::isGroupCallInputMuted(const Group* g) const
{
QMutexLocker locker{&callsLock};
if (!g) {
return false;
}
@ -640,6 +625,8 @@ bool CoreAV::isGroupCallInputMuted(const Group* g) const
*/
bool CoreAV::isGroupCallOutputMuted(const Group* g) const
{
QMutexLocker locker{&callsLock};
if (!g) {
return false;
}
@ -656,6 +643,8 @@ bool CoreAV::isGroupCallOutputMuted(const Group* g) const
*/
bool CoreAV::isCallInputMuted(const Friend* f) const
{
QMutexLocker locker{&callsLock};
if (!f) {
return false;
}
@ -671,6 +660,8 @@ bool CoreAV::isCallInputMuted(const Friend* f) const
*/
bool CoreAV::isCallOutputMuted(const Friend* f) const
{
QMutexLocker locker{&callsLock};
if (!f) {
return false;
}
@ -685,6 +676,8 @@ bool CoreAV::isCallOutputMuted(const Friend* f) const
*/
void CoreAV::sendNoVideo()
{
QMutexLocker locker{&callsLock};
// We don't change the audio bitrate, but we signal that we're not sending video anymore
qDebug() << "CoreAV: Signaling end of video sending";
for (auto& kv : calls) {
@ -698,22 +691,7 @@ void CoreAV::callCallback(ToxAV* toxav, uint32_t friendNum, bool audio, bool vid
{
CoreAV* self = static_cast<CoreAV*>(vSelf);
// Run this slow callback asynchronously on the AV thread to avoid deadlocks with what our
// caller (toxcore) holds
// Also run the code to switch to the CoreAV thread in yet another thread, in case CoreAV
// has threadSwitchLock and wants a toxcore lock that our call stack is holding...
if (QThread::currentThread() != self->coreavThread.get()) {
QtConcurrent::run([=]() {
// We assume the original caller doesn't come from the CoreAV thread here
while (self->threadSwitchLock.test_and_set(std::memory_order_acquire))
QThread::yieldCurrentThread(); // Shouldn't spin for long, we have priority
QMetaObject::invokeMethod(self, "callCallback", Qt::QueuedConnection,
Q_ARG(ToxAV*, toxav), Q_ARG(uint32_t, friendNum),
Q_ARG(bool, audio), Q_ARG(bool, video), Q_ARG(void*, vSelf));
});
return;
}
QMutexLocker locker{&self->callsLock};
// Audio backend must be set before receiving a call
assert(self->audio != nullptr);
@ -722,10 +700,6 @@ void CoreAV::callCallback(ToxAV* toxav, uint32_t friendNum, bool audio, bool vid
auto it = self->calls.emplace(friendNum, std::move(call));
if (it.second == false) {
/// Hanging up from a callback is supposed to be UB,
/// but since currently the toxav callbacks are fired from the toxcore thread,
/// we'll always reach this point through a non-blocking queud connection, so not in the
/// callback.
qWarning() << QString("Rejecting call invite from %1, we're already in that call!").arg(friendNum);
toxav_call_control(toxav, friendNum, TOXAV_CALL_CONTROL_CANCEL, nullptr);
return;
@ -739,37 +713,20 @@ void CoreAV::callCallback(ToxAV* toxav, uint32_t friendNum, bool audio, bool vid
if (video)
state |= TOXAV_FRIEND_CALL_STATE_SENDING_V | TOXAV_FRIEND_CALL_STATE_ACCEPTING_V;
it.first->second->setState(static_cast<TOXAV_FRIEND_CALL_STATE>(state));
// note: changed to self->
emit reinterpret_cast<CoreAV*>(self)->avInvite(friendNum, video);
self->threadSwitchLock.clear(std::memory_order_release);
}
void CoreAV::stateCallback(ToxAV* toxav, uint32_t friendNum, uint32_t state, void* vSelf)
{
Q_UNUSED(toxav);
CoreAV* self = static_cast<CoreAV*>(vSelf);
// Run this slow callback asynchronously on the AV thread to avoid deadlocks with what our
// caller (toxcore) holds
// Also run the code to switch to the CoreAV thread in yet another thread, in case CoreAV
// has threadSwitchLock and wants a toxcore lock that our call stack is holding...
if (QThread::currentThread() != self->coreavThread.get()) {
QtConcurrent::run([=]() {
// We assume the original caller doesn't come from the CoreAV thread here
while (self->threadSwitchLock.test_and_set(std::memory_order_acquire))
QThread::yieldCurrentThread(); // Shouldn't spin for long, we have priority
QMetaObject::invokeMethod(self, "stateCallback", Qt::QueuedConnection,
Q_ARG(ToxAV*, toxav), Q_ARG(uint32_t, friendNum),
Q_ARG(uint32_t, state), Q_ARG(void*, vSelf));
});
return;
}
QMutexLocker locker{&self->callsLock};
auto it = self->calls.find(friendNum);
if (it == self->calls.end()) {
qWarning() << QString("stateCallback called, but call %1 is already dead").arg(friendNum);
self->threadSwitchLock.clear(std::memory_order_release);
return;
}
@ -778,12 +735,10 @@ void CoreAV::stateCallback(ToxAV* toxav, uint32_t friendNum, uint32_t state, voi
if (state & TOXAV_FRIEND_CALL_STATE_ERROR) {
qWarning() << "Call with friend" << friendNum << "died of unnatural causes!";
self->calls.erase(friendNum);
// why not self->
emit self->avEnd(friendNum, true);
} else if (state & TOXAV_FRIEND_CALL_STATE_FINISHED) {
qDebug() << "Call with friend" << friendNum << "finished quietly";
self->calls.erase(friendNum);
// why not self->
emit self->avEnd(friendNum);
} else {
// If our state was null, we started the call and were still ringing
@ -811,50 +766,36 @@ void CoreAV::stateCallback(ToxAV* toxav, uint32_t friendNum, uint32_t state, voi
call.setState(static_cast<TOXAV_FRIEND_CALL_STATE>(state));
}
self->threadSwitchLock.clear(std::memory_order_release);
}
// This is only a dummy implementation for now
void CoreAV::bitrateCallback(ToxAV* toxav, uint32_t friendNum, uint32_t arate, uint32_t vrate,
void* vSelf)
{
CoreAV* self = static_cast<CoreAV*>(vSelf);
// Run this slow path callback asynchronously on the AV thread to avoid deadlocks
if (QThread::currentThread() != self->coreavThread.get()) {
return (void)QMetaObject::invokeMethod(self, "bitrateCallback", Qt::QueuedConnection,
Q_ARG(ToxAV*, toxav), Q_ARG(uint32_t, friendNum),
Q_ARG(uint32_t, arate), Q_ARG(uint32_t, vrate),
Q_ARG(void*, vSelf));
}
Q_UNUSED(self);
Q_UNUSED(toxav);
qDebug() << "Recommended bitrate with" << friendNum << " is now " << arate << "/" << vrate
<< ", ignoring it";
}
// This is only a dummy implementation for now
void CoreAV::audioBitrateCallback(ToxAV* toxav, uint32_t friendNum, uint32_t rate, void* vSelf)
{
CoreAV* self = static_cast<CoreAV*>(vSelf);
// Run this slow path callback asynchronously on the AV thread to avoid deadlocks
if (QThread::currentThread() != self->coreavThread.get()) {
return (void)QMetaObject::invokeMethod(self, "audioBitrateCallback", Qt::QueuedConnection,
Q_ARG(ToxAV*, toxav), Q_ARG(uint32_t, friendNum),
Q_ARG(uint32_t, rate), Q_ARG(void*, vSelf));
}
Q_UNUSED(self);
Q_UNUSED(toxav);
qDebug() << "Recommended audio bitrate with" << friendNum << " is now " << rate << ", ignoring it";
}
// This is only a dummy implementation for now
void CoreAV::videoBitrateCallback(ToxAV* toxav, uint32_t friendNum, uint32_t rate, void* vSelf)
{
CoreAV* self = static_cast<CoreAV*>(vSelf);
// Run this slow path callback asynchronously on the AV thread to avoid deadlocks
if (QThread::currentThread() != self->coreavThread.get()) {
return (void)QMetaObject::invokeMethod(self, "videoBitrateCallback", Qt::QueuedConnection,
Q_ARG(ToxAV*, toxav), Q_ARG(uint32_t, friendNum),
Q_ARG(uint32_t, rate), Q_ARG(void*, vSelf));
}
Q_UNUSED(self);
Q_UNUSED(toxav);
qDebug() << "Recommended video bitrate with" << friendNum << " is now " << rate << ", ignoring it";
}
@ -863,6 +804,8 @@ void CoreAV::audioFrameCallback(ToxAV*, uint32_t friendNum, const int16_t* pcm,
uint8_t channels, uint32_t samplingRate, void* vSelf)
{
CoreAV* self = static_cast<CoreAV*>(vSelf);
QMutexLocker locker{&self->callsLock};
auto it = self->calls.find(friendNum);
if (it == self->calls.end()) {
return;
@ -882,6 +825,7 @@ void CoreAV::videoFrameCallback(ToxAV*, uint32_t friendNum, uint16_t w, uint16_t
int32_t ystride, int32_t ustride, int32_t vstride, void* vSelf)
{
auto self = static_cast<CoreAV*>(vSelf);
QMutexLocker locker{&self->callsLock};
auto it = self->calls.find(friendNum);
if (it == self->calls.end()) {

View File

@ -23,6 +23,7 @@
#include "src/core/toxcall.h"
#include <QObject>
#include <QMutex>
#include <atomic>
#include <memory>
#include <tox/toxav.h>
@ -145,7 +146,9 @@ private:
* @note Need to use STL container here, because Qt containers need a copy constructor.
*/
std::map<int, ToxGroupCallPtr> groupCalls;
std::atomic_flag threadSwitchLock;
// protect 'calls' and 'groupCalls' from being modified by ToxAV and Tox threads
mutable QMutex callsLock{QMutex::Recursive};
};
#endif // COREAV_H