streamlined multithreading a bit

portability/boost
zetaPRIME 2019-01-14 04:00:24 -05:00
parent f0311ab6dd
commit cb2aeddbde
6 changed files with 18 additions and 43 deletions

9
notes
View File

@ -45,9 +45,6 @@ project data {
TODO {
immediate frontburner {
( - can eliminate the moodycamel queue) and atomic lock on nodes now that the main queue is deduplicated
buffer underruns are being caused by some sync wonkiness between multiple workers
make the audio engine thread itself act as a worker during tick
give node helper functions to move ports (index-wise) and collapse them (eliminate numbering holes)
@ -55,8 +52,14 @@ TODO {
}
return the latency/buffer size to 100ms once multithreading is streamlined
# fix how qt5.12 broke header text (removed elide for now)
bugs to fix {
graph connections sometimes spawn in duplicated :|
buffer underruns are being caused by some sync wonkiness between multiple workers
}
misc features needed before proper release {
song metadata (title, artist, comment, default bpm)

View File

@ -94,6 +94,7 @@ void AudioEngine::initAudio(bool startNow) {
output->setCategory("Xybrid");
output->setObjectName("Xybrid"); // if Qt ever implements naming the stream this way, WE'LL BE READY
output->setBufferSize(static_cast<int>(sampleRate*4*100.0/1000.0)); // 100ms
output->setBufferSize(static_cast<int>(sampleRate*4* 110.0 /1000.0)); // a bit more if acting up
}
if (startNow) output->start();
@ -233,9 +234,6 @@ void AudioEngine::buildQueue() {
}
}
// set queue size
//wqueue = moodycamel::ConcurrentQueue<std::shared_ptr<Node>>(queue.size());
/*/{
auto dbg = qDebug() << "Queue:";
for (auto n : queue) {
@ -498,9 +496,7 @@ void AudioEngine::processNodes() {
if (workers.empty()) {
for (auto n : queue) if (!n->try_process()) qWarning() << "Dependency check failed in single threaded mode!";
} else {
//for (auto n : queue) wqueue.enqueue(n);
//wqueue.enqueue_bulk(queue.begin(), queue.size());
queueIndex.store(0);
queueIndex.store(0); // reset sync index
auto wc = static_cast<int>(workers.size());
wsem.release(wc);
for (auto w : workers) w->invoke();

View File

@ -12,8 +12,6 @@
#include <QSemaphore>
#include <QWaitCondition>
#include "inclib/concurrentqueue.h"
class QThread;
namespace Xybrid::Data {
class Project;
@ -62,7 +60,6 @@ namespace Xybrid::Audio {
std::shared_ptr<Data::Project> project;
std::vector<AudioWorker*> workers;
//moodycamel::ConcurrentQueue<std::shared_ptr<Data::Node>> wqueue;
QSemaphore wsem;
std::vector<std::shared_ptr<Data::Node>> queue;

View File

@ -24,9 +24,7 @@ Port::~Port() {
}
}
Port::Port(const Port &) : std::enable_shared_from_this<Port>() {
procLock = false;
}
Port::Port(const Port &) : std::enable_shared_from_this<Port>() { }
bool Port::canConnectTo(DataType d) const {
return d == dataType();
@ -137,12 +135,6 @@ bool Node::try_process(bool checkDependencies) {
size_t tick_this = audioEngine->curTickId();
if (tick_last == tick_this) return true; // already processed
bool lock = false;
if (!procLock.compare_exchange_strong(lock, true)) {
qDebug() << "node order dupe!!";
return true; // another worker is using this, advance to the next
}
if (checkDependencies) { // check if dependencies are done
auto checkInput = Util::yCombinator([tick_this](auto checkInput, std::shared_ptr<Port> p) -> bool {
for (auto c : p->connections) { // check each connection; if node valid...
@ -161,7 +153,7 @@ bool Node::try_process(bool checkDependencies) {
return true;
});
for (auto t : inputs) for (auto p : t.second) if (!checkInput(p.second)) { procLock.store(false); return false; }
for (auto t : inputs) for (auto p : t.second) if (!checkInput(p.second)) return false;
/*for (auto& t : inputs) {
for (auto& p : t.second) {
@ -181,14 +173,9 @@ bool Node::try_process(bool checkDependencies) {
}
/*auto qd = qDebug() << "processing" << QString::fromStdString(pluginName());
if (!name.empty()) qd << "named" << QString::fromStdString(name);
if (auto p = parent.lock(); p && !p->name.empty()) qd << "within" << QString::fromStdString(p->name);*/
process();
tick_last = tick_this;
procLock.store(false);
return true;
}

View File

@ -7,6 +7,7 @@
#include <string>
#include <atomic>
#include <QMutex>
#include <QPointer>
#include <QObject>
@ -49,7 +50,7 @@ namespace Xybrid::Data {
std::weak_ptr<Port> passthroughTo;
Type type;
uint8_t index;
std::atomic_bool procLock = false;
QMutex lock;
size_t tickUpdatedOn = static_cast<size_t>(-1);
QPointer<UI::PortObject> obj;
@ -76,7 +77,6 @@ namespace Xybrid::Data {
friend class Audio::AudioEngine;
friend class Audio::AudioWorker;
size_t tick_last = 0;
std::atomic_bool procLock = false;
bool try_process(bool checkDependencies = true);
public:
Project* project;
@ -117,7 +117,6 @@ namespace Xybrid::Data {
virtual void onParent(std::shared_ptr<Graph>) { }
virtual void onDoubleClick() { }
// something something customChrome?
};
}

View File

@ -9,12 +9,8 @@ using namespace Xybrid::Audio;
void AudioPort::pull() {
auto t = audioEngine->curTickId();
if (tickUpdatedOn == t) return;
bool lock = false;
if (!procLock.compare_exchange_strong(lock, true)) {
while (procLock.load()) ; // spinlock; this is a very fast operation
return; // and done
}
//qDebug() << "just locked";
lock.lock();
if (tickUpdatedOn == t) { lock.unlock(); return; } // someone else got here before us
size_t ts = audioEngine->curTickSize();
size_t s = sizeof(float) * ts;
@ -54,17 +50,14 @@ void AudioPort::pull() {
}
done:
tickUpdatedOn = t;
procLock.store(false);
lock.unlock();
}
void CommandPort::pull() {
auto t = audioEngine->curTickId();
if (tickUpdatedOn == t) return;
bool lock = false;
if (!procLock.compare_exchange_strong(lock, true)) {
while (procLock.load()) ; // spinlock; this is a very fast operation
return; // and done
}
lock.lock();
if (tickUpdatedOn == t) { lock.unlock(); return; } // someone else got here before us
dataSize = 0;
if (type == Input) {
@ -84,7 +77,7 @@ void CommandPort::pull() {
} // don't need an else case, size is already zero
tickUpdatedOn = t;
procLock.store(false);
lock.unlock();
}
void CommandPort::push(std::vector<uint8_t> v) {