well, I know why multithreading wonks out now...
parent
22ba773adc
commit
5a30768505
6
notes
6
notes
|
@ -45,10 +45,8 @@ project data {
|
|||
|
||||
TODO {
|
||||
immediate frontburner {
|
||||
- multithreaded audio
|
||||
- ^ audio engine invokes workers, then QThread::wait()s on them
|
||||
- figure out weirdness with things missing commands in multithreaded mode(!?)
|
||||
- it doesn't happen with a single worker, so it's something wonky with synchronization
|
||||
fix performance and occasional wonkiness with multithreading...
|
||||
probably by deduplicating the queue ahead of time and dropping the atomic lock
|
||||
|
||||
give node helper functions to move ports (index-wise) and collapse them (eliminate numbering holes)
|
||||
node event that fires on port connect/disconnect (to enable, say, automatic collapsing, and changing of behavior by how many connections there are)
|
||||
|
|
|
@ -13,6 +13,7 @@ using namespace Xybrid::Data;
|
|||
|
||||
#include <QDebug>
|
||||
#include <QThread>
|
||||
#include <QMutex>
|
||||
|
||||
// zero-initialize
|
||||
AudioEngine* Xybrid::Audio::audioEngine = nullptr;
|
||||
|
@ -53,7 +54,9 @@ void AudioEngine::postInit() {
|
|||
auto wk = new AudioWorker();
|
||||
wk->thread = new QThread(this);
|
||||
wk->moveToThread(wk->thread);
|
||||
connect(wk->thread, &QThread::started, wk, &AudioWorker::mainLoop);
|
||||
connect(wk->thread, &QThread::started, wk, &AudioWorker::processLoop);
|
||||
wk->thread->start();
|
||||
wk->thread->setPriority(QThread::TimeCriticalPriority);
|
||||
workers.push_back(wk);
|
||||
}
|
||||
}
|
||||
|
@ -484,12 +487,13 @@ void AudioEngine::processNodes() {
|
|||
if (workers.empty()) {
|
||||
for (auto n : queue) if (!n->try_process()) qWarning() << "Dependency check failed in single threaded mode!";
|
||||
} else {
|
||||
//wqueue.try_dequeue
|
||||
for (auto n : queue) wqueue.enqueue(n);
|
||||
for (auto w : workers) {
|
||||
w->thread->start();
|
||||
}
|
||||
for (auto w : workers) w->thread->wait();
|
||||
auto wc = static_cast<int>(workers.size());
|
||||
wsem.release(wc);
|
||||
for (auto w : workers) w->invoke();
|
||||
//thread->setPriority(QThread::HighPriority);
|
||||
wsem.acquire(wc);
|
||||
//thread->setPriority(QThread::TimeCriticalPriority);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -497,16 +501,28 @@ AudioWorker::AudioWorker(QObject* parent) : QObject(parent) {
|
|||
|
||||
}
|
||||
|
||||
void AudioWorker::mainLoop() {
|
||||
thread->setPriority(QThread::TimeCriticalPriority);
|
||||
auto& queue = audioEngine->wqueue;
|
||||
// process
|
||||
std::shared_ptr<Node> n;
|
||||
while (queue.try_dequeue(n)) {
|
||||
if (!n) continue;
|
||||
while (!n->try_process(true)) ;//QThread::yieldCurrentThread();
|
||||
}
|
||||
|
||||
thread->exit();
|
||||
|
||||
void AudioWorker::invoke() {
|
||||
if (!audioEngine->wsem.tryAcquire(1)) return;
|
||||
signal.release(1);
|
||||
}
|
||||
|
||||
void AudioWorker::processLoop() {
|
||||
thread->setPriority(QThread::TimeCriticalPriority);
|
||||
QMutex m;
|
||||
auto& queue = audioEngine->wqueue;
|
||||
auto& s = audioEngine->wsem;
|
||||
while (true) {
|
||||
//while (!signal.tryAcquire(1)) ;
|
||||
signal.acquire(1);
|
||||
|
||||
// process
|
||||
std::shared_ptr<Node> n;
|
||||
while (queue.try_dequeue(n)) {
|
||||
if (!n) continue;
|
||||
while (!n->try_process(true)) ;//QThread::yieldCurrentThread();
|
||||
}
|
||||
|
||||
s.release(1);
|
||||
//thread->setPriority(QThread::IdlePriority);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
|
||||
#include <QIODevice>
|
||||
#include <QAudioOutput>
|
||||
#include <QSemaphore>
|
||||
#include <QWaitCondition>
|
||||
|
||||
#include "inclib/concurrentqueue.h"
|
||||
|
||||
|
@ -61,6 +63,7 @@ namespace Xybrid::Audio {
|
|||
|
||||
std::vector<AudioWorker*> workers;
|
||||
moodycamel::ConcurrentQueue<std::shared_ptr<Data::Node>> wqueue;
|
||||
QSemaphore wsem;
|
||||
|
||||
std::deque<std::shared_ptr<Data::Node>> queue;
|
||||
bool queueValid;
|
||||
|
@ -119,8 +122,10 @@ namespace Xybrid::Audio {
|
|||
explicit AudioWorker(QObject* parent = nullptr);
|
||||
|
||||
QThread* thread;
|
||||
QSemaphore signal;
|
||||
|
||||
void mainLoop();
|
||||
void invoke();
|
||||
[[noreturn]] void processLoop();
|
||||
};
|
||||
|
||||
extern AudioEngine* audioEngine;
|
||||
|
|
|
@ -9,6 +9,8 @@ using namespace Xybrid::Data;
|
|||
#include "audio/audioengine.h"
|
||||
using namespace Xybrid::Audio;
|
||||
|
||||
#include "util/ycombinator.h"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include <QDebug>
|
||||
|
@ -139,8 +141,7 @@ bool Node::try_process(bool checkDependencies) {
|
|||
if (!procLock.compare_exchange_strong(lock, true)) return true; // another worker is using this, advance to the next
|
||||
|
||||
if (checkDependencies) { // check if dependencies are done
|
||||
// TODO: make this a y-combinator instead of a std::function https://stackoverflow.com/a/40873657
|
||||
std::function<bool(std::shared_ptr<Port>)> checkInput = [tick_this, &checkInput](std::shared_ptr<Port> p) {
|
||||
auto checkInput = Util::yCombinator([tick_this](auto checkInput, std::shared_ptr<Port> p) -> bool {
|
||||
for (auto c : p->connections) { // check each connection; if node valid...
|
||||
if (auto cp = c.lock(); cp) {
|
||||
if (auto n = cp->owner.lock(); n) {
|
||||
|
@ -155,7 +156,7 @@ bool Node::try_process(bool checkDependencies) {
|
|||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
});
|
||||
|
||||
for (auto t : inputs) for (auto p : t.second) if (!checkInput(p.second)) { procLock.store(false); return false; }
|
||||
|
||||
|
|
|
@ -78,9 +78,9 @@ void IOPort::setPort(Port::Type type, Port::DataType dataType, uint8_t index) {
|
|||
}
|
||||
|
||||
void IOPort::process() {
|
||||
//if (type == Port::Input) { // thread safety for passthroughs?
|
||||
//port(opposite(type), dataType, index)->pull();
|
||||
//}
|
||||
if (type == Port::Input) { // thread safety for passthroughs?
|
||||
//port(opposite(type), dataType, index)->pull();
|
||||
}
|
||||
}
|
||||
|
||||
void IOPort::onRename() {
|
||||
|
|
|
@ -63,6 +63,13 @@ void TestSynth::reset() {
|
|||
}
|
||||
|
||||
void TestSynth::process() {
|
||||
if (!name.empty()) {
|
||||
size_t cur = audioEngine->curTickId();
|
||||
static size_t last = 0;
|
||||
if (cur == last) qDebug() << "tick processed twice" << cur;
|
||||
last = cur;
|
||||
}
|
||||
|
||||
auto cp = std::static_pointer_cast<CommandPort>(port(Port::Input, Port::Command, 0));
|
||||
cp->pull();
|
||||
auto p = std::static_pointer_cast<AudioPort>(port(Port::Output, Port::Audio, 0));
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
|
||||
namespace Xybrid::Util {
|
||||
// example implementation from http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2016/p0200r0.html adjusted for style
|
||||
template <class F>
|
||||
class YCombinator {
|
||||
F f_; // base lambda
|
||||
public:
|
||||
template<class T> explicit YCombinator(T&& f) : f_(std::forward<T>(f)) {}
|
||||
|
||||
template<class... Args> decltype(auto) operator()(Args&&... args) {
|
||||
return f_(std::ref(*this), std::forward<Args>(args)...); // pass self into base along with args
|
||||
}
|
||||
};
|
||||
template<class F> decltype(auto) yCombinator(F&& f) { return YCombinator<std::decay_t<F>>(std::forward<F>(f)); }
|
||||
}
|
|
@ -93,7 +93,8 @@ HEADERS += \
|
|||
gadgets/gainbalance.h \
|
||||
util/pattern.h \
|
||||
gadgets/transpose.h \
|
||||
inclib/concurrentqueue.h
|
||||
inclib/concurrentqueue.h \
|
||||
util/ycombinator.h
|
||||
|
||||
FORMS += \
|
||||
mainwindow.ui
|
||||
|
|
Loading…
Reference in New Issue