From e6f1a131ee0af268498253b3e3374a9836f97fca Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Mon, 30 Sep 2019 22:40:27 +0200 Subject: [PATCH 1/6] Revert renaming of member --- src/engine/cachingreader.cpp | 6 +++--- src/engine/cachingreader.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/engine/cachingreader.cpp b/src/engine/cachingreader.cpp index 10ee36fbf0a..15a8ba190dc 100644 --- a/src/engine/cachingreader.cpp +++ b/src/engine/cachingreader.cpp @@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group, // The capacity of the back channel must be equal to the number of // allocated chunks, because the worker use writeBlocking(). Otherwise // the worker could get stuck in a hot loop!!! - m_stateFIFO(kNumberOfCachedChunksInMemory), + m_readerStatusUpdateFIFO(kNumberOfCachedChunksInMemory), m_state(State::Idle), m_mruCachingReaderChunk(nullptr), m_lruCachingReaderChunk(nullptr), m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory), - m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) { + m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusUpdateFIFO) { m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory); // Divide up the allocated raw memory buffer into total_chunks @@ -217,7 +217,7 @@ void CachingReader::newTrack(TrackPointer pTrack) { void CachingReader::process() { ReaderStatusUpdate update; - while (m_stateFIFO.read(&update, 1) == 1) { + while (m_readerStatusUpdateFIFO.read(&update, 1) == 1) { DEBUG_ASSERT(m_state != State::Idle); auto pChunk = update.takeFromWorker(); if (pChunk) { diff --git a/src/engine/cachingreader.h b/src/engine/cachingreader.h index 1e9ec831fbf..4636006c7e3 100644 --- a/src/engine/cachingreader.h +++ b/src/engine/cachingreader.h @@ -124,7 +124,7 @@ class CachingReader : public QObject { // Thread-safe FIFOs for communication between the engine callback and // reader thread. FIFO m_chunkReadRequestFIFO; - FIFO m_stateFIFO; + FIFO m_readerStatusUpdateFIFO; // Looks for the provided chunk number in the index of in-memory chunks and // returns it if it is present. If not, returns nullptr. If it is present then From 3e9e2ae2e138f75a49626a1608d3d1664ee5a079 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Mon, 30 Sep 2019 22:43:47 +0200 Subject: [PATCH 2/6] Rename factory function --- src/engine/cachingreaderworker.cpp | 8 ++++---- src/engine/cachingreaderworker.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/engine/cachingreaderworker.cpp b/src/engine/cachingreaderworker.cpp index dae4f303f3e..493a4402536 100644 --- a/src/engine/cachingreaderworker.cpp +++ b/src/engine/cachingreaderworker.cpp @@ -135,7 +135,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { if (!pTrack) { // If no new track is available then we are done - const auto update = ReaderStatusUpdate::trackNotLoaded(); + const auto update = ReaderStatusUpdate::trackUnloaded(); m_pReaderStatusFIFO->writeBlocking(&update, 1); return; } @@ -149,7 +149,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { << m_group << "File not found" << filename; - const auto update = ReaderStatusUpdate::trackNotLoaded(); + const auto update = ReaderStatusUpdate::trackUnloaded(); m_pReaderStatusFIFO->writeBlocking(&update, 1); emit trackLoadFailed( pTrack, QString("The file '%1' could not be found.") @@ -165,7 +165,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { << m_group << "Failed to open file" << filename; - const auto update = ReaderStatusUpdate::trackNotLoaded(); + const auto update = ReaderStatusUpdate::trackUnloaded(); m_pReaderStatusFIFO->writeBlocking(&update, 1); emit trackLoadFailed( pTrack, QString("The file '%1' could not be loaded").arg(filename)); @@ -182,7 +182,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { << m_group << "Failed to open empty file" << filename; - const auto update = ReaderStatusUpdate::trackNotLoaded(); + const auto update = ReaderStatusUpdate::trackUnloaded(); m_pReaderStatusFIFO->writeBlocking(&update, 1); emit trackLoadFailed( pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename)); diff --git a/src/engine/cachingreaderworker.h b/src/engine/cachingreaderworker.h index a4c05a2b237..bf1fa8ded75 100644 --- a/src/engine/cachingreaderworker.h +++ b/src/engine/cachingreaderworker.h @@ -69,7 +69,7 @@ typedef struct ReaderStatusUpdate { return update; } - static ReaderStatusUpdate trackNotLoaded() { + static ReaderStatusUpdate trackUnloaded() { ReaderStatusUpdate update; update.init(TRACK_UNLOADED, nullptr, mixxx::IndexRange()); return update; From da5875db04e828755fc5e9630ddb2213be15a4a2 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 1 Oct 2019 00:04:58 +0200 Subject: [PATCH 3/6] Fix race conditions between caching reader and worker --- src/engine/cachingreader.cpp | 54 +++++++++++++++++++++++------- src/engine/cachingreader.h | 6 ++++ src/engine/cachingreaderworker.cpp | 23 ++++++------- src/engine/cachingreaderworker.h | 13 ++++--- 4 files changed, 64 insertions(+), 32 deletions(-) diff --git a/src/engine/cachingreader.cpp b/src/engine/cachingreader.cpp index 15a8ba190dc..2ab562f3596 100644 --- a/src/engine/cachingreader.cpp +++ b/src/engine/cachingreader.cpp @@ -78,16 +78,15 @@ CachingReader::CachingReader(QString group, m_freeChunks.push_back(c); } - // Forward signals from worker - connect(&m_worker, SIGNAL(trackLoading()), - this, SIGNAL(trackLoading()), - Qt::DirectConnection); - connect(&m_worker, SIGNAL(trackLoaded(TrackPointer, int, int)), - this, SIGNAL(trackLoaded(TrackPointer, int, int)), - Qt::DirectConnection); - connect(&m_worker, SIGNAL(trackLoadFailed(TrackPointer, QString)), - this, SIGNAL(trackLoadFailed(TrackPointer, QString)), - Qt::DirectConnection); + // Handle signals from worker thread + connect(&m_worker, + &CachingReaderWorker::trackLoaded, + this, + &CachingReader::onTrackLoaded); + connect(&m_worker, + &CachingReaderWorker::trackLoadFailed, + this, + &CachingReader::onTrackLoadFailed); m_worker.start(QThread::HighPriority); } @@ -201,18 +200,48 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex } void CachingReader::newTrack(TrackPointer pTrack) { + while (m_state == State::TrackLoading) { + // Spin until the previous track has been either + // loaded or unloaded by the worker + process(); + } // Feed the track to the worker as soon as possible // to get ready while the reader switches its internal // state. There are no race conditions, because the // reader polls the worker. - m_worker.newTrack(pTrack); - m_worker.workReady(); + m_worker.newTrack(std::move(pTrack)); // Don't accept any new read requests until the current // track has been unloaded and the new track has been // loaded. m_state = State::TrackLoading; // Free all chunks with sample data from the current track. freeAllChunks(); + // Emit that a new track is loading, stops the current track + emit trackLoading(); +} + +void CachingReader::onTrackLoaded( + TrackPointer pTrack, + int iSampleRate, + int iNumSamples) { + // Consume all pending messages + process(); + // Forward signal depending on the current state + if (m_state == State::TrackLoaded) { + emit trackLoaded(std::move(pTrack), iSampleRate, iNumSamples); + } else { + kLogger.warning() + << "No track loaded"; + } +} + +void CachingReader::onTrackLoadFailed( + TrackPointer pTrack, + QString reason) { + // Consume all pending messages + process(); + // Forward signal independent of the current state + emit trackLoadFailed(std::move(pTrack), reason); } void CachingReader::process() { @@ -542,6 +571,7 @@ void CachingReader::hintAndMaybeWake(const HintVector& hintList) { << "Requesting read of chunk" << request.chunk; } + DEBUG_ASSERT(m_state == State::TrackLoaded); if (m_chunkReadRequestFIFO.write(&request, 1) != 1) { kLogger.warning() << "Failed to submit read request for chunk" diff --git a/src/engine/cachingreader.h b/src/engine/cachingreader.h index 4636006c7e3..eba7738ea0f 100644 --- a/src/engine/cachingreader.h +++ b/src/engine/cachingreader.h @@ -116,8 +116,14 @@ class CachingReader : public QObject { // Emitted once a new track is loaded and ready to be read from. void trackLoading(); void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples); + + // Forwarded from the worker void trackLoadFailed(TrackPointer pTrack, QString reason); + private slots: + void onTrackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples); + void onTrackLoadFailed(TrackPointer pTrack, QString reason); + private: const UserSettingsPointer m_pConfig; diff --git a/src/engine/cachingreaderworker.cpp b/src/engine/cachingreaderworker.cpp index 493a4402536..1ed53206278 100644 --- a/src/engine/cachingreaderworker.cpp +++ b/src/engine/cachingreaderworker.cpp @@ -29,9 +29,6 @@ CachingReaderWorker::CachingReaderWorker( m_stop(0) { } -CachingReaderWorker::~CachingReaderWorker() { -} - ReaderStatusUpdate CachingReaderWorker::processReadRequest( const CachingReaderChunkReadRequest& request) { CachingReaderChunk* pChunk = request.chunk; @@ -87,9 +84,12 @@ ReaderStatusUpdate CachingReaderWorker::processReadRequest( // WARNING: Always called from a different thread (GUI) void CachingReaderWorker::newTrack(TrackPointer pTrack) { - QMutexLocker locker(&m_newTrackMutex); - m_pNewTrack = pTrack; - m_newTrackAvailable = true; + { + QMutexLocker locker(&m_newTrackMutex); + m_pNewTrack = std::move(pTrack); + m_newTrackAvailable = true; + } + workReady(); } void CachingReaderWorker::run() { @@ -104,11 +104,11 @@ void CachingReaderWorker::run() { TrackPointer pLoadTrack; { // locking scope QMutexLocker locker(&m_newTrackMutex); - pLoadTrack = m_pNewTrack; - m_pNewTrack.reset(); + pLoadTrack = std::move(m_pNewTrack); + DEBUG_ASSERT(!m_pNewTrack); m_newTrackAvailable = false; } // implicitly unlocks the mutex - loadTrack(pLoadTrack); + loadTrack(std::move(pLoadTrack)); } else if (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { // Read the requested chunk and send the result const ReaderStatusUpdate update(processReadRequest(request)); @@ -121,7 +121,7 @@ void CachingReaderWorker::run() { } } -void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { +void CachingReaderWorker::loadTrack(TrackPointer pTrack) { // Discard all pending read requests CachingReaderChunkReadRequest request; while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { @@ -140,9 +140,6 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) { return; } - // Emit that a new track is loading, stops the current track - emit trackLoading(); - QString filename = pTrack->getLocation(); if (filename.isEmpty() || !pTrack->exists()) { kLogger.warning() diff --git a/src/engine/cachingreaderworker.h b/src/engine/cachingreaderworker.h index bf1fa8ded75..6f4059696d4 100644 --- a/src/engine/cachingreaderworker.h +++ b/src/engine/cachingreaderworker.h @@ -101,20 +101,19 @@ class CachingReaderWorker : public EngineWorker { CachingReaderWorker(QString group, FIFO* pChunkReadRequestFIFO, FIFO* pReaderStatusFIFO); - virtual ~CachingReaderWorker(); + ~CachingReaderWorker() override = default; - // Request to load a new track. wake() must be called afterwards. - virtual void newTrack(TrackPointer pTrack); + // Request to load a new track + void newTrack(TrackPointer pTrack); // Run upkeep operations like loading tracks and reading from file. Run by a // thread pool via the EngineWorkerScheduler. - virtual void run(); + void run() override; void quitWait(); signals: // Emitted once a new track is loaded and ready to be read from. - void trackLoading(); void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples); void trackLoadFailed(TrackPointer pTrack, QString reason); @@ -130,11 +129,11 @@ class CachingReaderWorker : public EngineWorker { // Queue of Tracks to load, and the corresponding lock. Must acquire the // lock to touch. QMutex m_newTrackMutex; - bool m_newTrackAvailable; TrackPointer m_pNewTrack; + bool m_newTrackAvailable; // Internal method to load a track. Emits trackLoaded when finished. - void loadTrack(const TrackPointer& pTrack); + void loadTrack(TrackPointer pTrack); ReaderStatusUpdate processReadRequest( const CachingReaderChunkReadRequest& request); From 322b44472ded95a47bb2e00112c3d9955c6933d2 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 1 Oct 2019 08:59:00 +0200 Subject: [PATCH 4/6] Fix BaseSignalPathTest --- src/test/signalpathtest.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/signalpathtest.h b/src/test/signalpathtest.h index f5b9adc9f01..05c31135ac3 100644 --- a/src/test/signalpathtest.h +++ b/src/test/signalpathtest.h @@ -129,7 +129,7 @@ class BaseSignalPathTest : public MixxxTest { ProcessBuffer(); EngineDeck* pEngineDeck = pDeck->getEngineDeck(); while (!pEngineDeck->getEngineBuffer()->isTrackLoaded()) { - QTest::qSleep(1); // millis + QCoreApplication::processEvents(); } } From 69485cf5015cf267cce1ba7ad7ae8d0591e9ccac Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 1 Oct 2019 11:49:22 +0200 Subject: [PATCH 5/6] Replace hot loop with buffering and event handler --- src/engine/cachingreader.cpp | 12 +++++++++--- src/engine/cachingreader.h | 3 +++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/engine/cachingreader.cpp b/src/engine/cachingreader.cpp index 2ab562f3596..b3673d403b4 100644 --- a/src/engine/cachingreader.cpp +++ b/src/engine/cachingreader.cpp @@ -200,11 +200,12 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex } void CachingReader::newTrack(TrackPointer pTrack) { - while (m_state == State::TrackLoading) { - // Spin until the previous track has been either - // loaded or unloaded by the worker + if (m_state == State::TrackLoading) { + m_pNewTrack = pTrack; process(); + return; } + m_pNewTrack.reset(); // Feed the track to the worker as soon as possible // to get ready while the reader switches its internal // state. There are no race conditions, because the @@ -294,6 +295,11 @@ void CachingReader::process() { m_readableFrameIndexRange = update.readableFrameIndexRange(); } } + // To terminate the recursion a pending new track must + // only be loaded if no track is currently loading! + if (m_pNewTrack && m_state != State::TrackLoading) { + newTrack(std::move(m_pNewTrack)); + } } CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples, bool reverse, CSAMPLE* buffer) { diff --git a/src/engine/cachingreader.h b/src/engine/cachingreader.h index eba7738ea0f..794832be2da 100644 --- a/src/engine/cachingreader.h +++ b/src/engine/cachingreader.h @@ -186,6 +186,9 @@ class CachingReader : public QObject { mixxx::IndexRange m_readableFrameIndexRange; CachingReaderWorker m_worker; + + // Pending next track while loading a previous track + TrackPointer m_pNewTrack; }; From cd71c6b6aae933db1334b5b0118217afefdfd079 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 2 Oct 2019 09:03:02 +0200 Subject: [PATCH 6/6] Remove QMutex lock between caching reader and worker --- src/engine/cachingreaderworker.cpp | 22 ++-- src/engine/cachingreaderworker.h | 20 +--- src/test/mpscfifotest.cpp | 170 +++++++++++++++++++++++++++++ src/util/mpscfifo.h | 96 ++++++++++++++++ 4 files changed, 278 insertions(+), 30 deletions(-) create mode 100644 src/test/mpscfifotest.cpp create mode 100644 src/util/mpscfifo.h diff --git a/src/engine/cachingreaderworker.cpp b/src/engine/cachingreaderworker.cpp index 1ed53206278..ac86cf729a1 100644 --- a/src/engine/cachingreaderworker.cpp +++ b/src/engine/cachingreaderworker.cpp @@ -25,7 +25,7 @@ CachingReaderWorker::CachingReaderWorker( m_tag(QString("CachingReaderWorker %1").arg(m_group)), m_pChunkReadRequestFIFO(pChunkReadRequestFIFO), m_pReaderStatusFIFO(pReaderStatusFIFO), - m_newTrackAvailable(false), + m_newTrackFifo(MpscFifoConcurrency::SingleProducer), m_stop(0) { } @@ -82,12 +82,10 @@ ReaderStatusUpdate CachingReaderWorker::processReadRequest( return result; } -// WARNING: Always called from a different thread (GUI) void CachingReaderWorker::newTrack(TrackPointer pTrack) { - { - QMutexLocker locker(&m_newTrackMutex); - m_pNewTrack = std::move(pTrack); - m_newTrackAvailable = true; + VERIFY_OR_DEBUG_ASSERT(m_newTrackFifo.enqueue(pTrack)) { + kLogger.critical() + << "No capacity to accept a new track"; } workReady(); } @@ -97,18 +95,12 @@ void CachingReaderWorker::run() { QThread::currentThread()->setObjectName(QString("CachingReaderWorker %1").arg(++id)); Event::start(m_tag); + TrackPointer pNewTrack; while (!load_atomic(m_stop)) { // Request is initialized by reading from FIFO CachingReaderChunkReadRequest request; - if (m_newTrackAvailable) { - TrackPointer pLoadTrack; - { // locking scope - QMutexLocker locker(&m_newTrackMutex); - pLoadTrack = std::move(m_pNewTrack); - DEBUG_ASSERT(!m_pNewTrack); - m_newTrackAvailable = false; - } // implicitly unlocks the mutex - loadTrack(std::move(pLoadTrack)); + if (m_newTrackFifo.dequeue(&pNewTrack)) { + loadTrack(std::move(pNewTrack)); } else if (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { // Read the requested chunk and send the result const ReaderStatusUpdate update(processReadRequest(request)); diff --git a/src/engine/cachingreaderworker.h b/src/engine/cachingreaderworker.h index 6f4059696d4..ab11c4d9397 100644 --- a/src/engine/cachingreaderworker.h +++ b/src/engine/cachingreaderworker.h @@ -1,17 +1,13 @@ -#ifndef ENGINE_CACHINGREADERWORKER_H -#define ENGINE_CACHINGREADERWORKER_H +#pragma once -#include -#include #include -#include -#include #include "engine/cachingreaderchunk.h" -#include "track/track.h" #include "engine/engineworker.h" #include "sources/audiosource.h" +#include "track/track.h" #include "util/fifo.h" +#include "util/mpscfifo.h" // POD with trivial ctor/dtor/copy for passing through FIFO @@ -104,6 +100,7 @@ class CachingReaderWorker : public EngineWorker { ~CachingReaderWorker() override = default; // Request to load a new track + // Invoked from a different but always the same, single thread!! void newTrack(TrackPointer pTrack); // Run upkeep operations like loading tracks and reading from file. Run by a @@ -126,11 +123,7 @@ class CachingReaderWorker : public EngineWorker { FIFO* m_pChunkReadRequestFIFO; FIFO* m_pReaderStatusFIFO; - // Queue of Tracks to load, and the corresponding lock. Must acquire the - // lock to touch. - QMutex m_newTrackMutex; - TrackPointer m_pNewTrack; - bool m_newTrackAvailable; + MpscFifo m_newTrackFifo; // Internal method to load a track. Emits trackLoaded when finished. void loadTrack(TrackPointer pTrack); @@ -154,6 +147,3 @@ class CachingReaderWorker : public EngineWorker { QAtomicInt m_stop; }; - - -#endif /* ENGINE_CACHINGREADERWORKER_H */ diff --git a/src/test/mpscfifotest.cpp b/src/test/mpscfifotest.cpp new file mode 100644 index 00000000000..05516d271cb --- /dev/null +++ b/src/test/mpscfifotest.cpp @@ -0,0 +1,170 @@ +#include + +#include +#include +#include +#include + +#include "util/mpscfifo.h" + +namespace { + +TEST(MpscFifoTest, CapacityOne) { + MpscFifo fifo; + int dequeued = -1; + + EXPECT_TRUE(fifo.enqueue(1)); + EXPECT_FALSE(fifo.enqueue(2)); + EXPECT_FALSE(fifo.enqueue(2)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(1, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(2)); + EXPECT_FALSE(fifo.enqueue(3)); + EXPECT_FALSE(fifo.enqueue(3)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(2, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(3)); + EXPECT_FALSE(fifo.enqueue(4)); + EXPECT_FALSE(fifo.enqueue(4)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(3, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + EXPECT_FALSE(fifo.dequeue(&dequeued)); +} + +TEST(MpscFifoTest, CapacityTwo) { + MpscFifo fifo; + int dequeued = -1; + + EXPECT_TRUE(fifo.enqueue(1)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(1, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(2)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(2, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(3)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(3, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(4)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(4, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(5)); + EXPECT_TRUE(fifo.enqueue(6)); + EXPECT_FALSE(fifo.enqueue(7)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(5, dequeued); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(6, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + EXPECT_TRUE(fifo.enqueue(7)); + EXPECT_TRUE(fifo.enqueue(8)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(7, dequeued); + EXPECT_TRUE(fifo.enqueue(9)); + EXPECT_FALSE(fifo.enqueue(10)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(8, dequeued); + EXPECT_TRUE(fifo.enqueue(10)); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(9, dequeued); + EXPECT_TRUE(fifo.dequeue(&dequeued)); + EXPECT_EQ(10, dequeued); + EXPECT_FALSE(fifo.dequeue(&dequeued)); +} + +template +class FifoWriter: public QThread { + public: + explicit FifoWriter(MpscFifo* fifo, QAtomicInt* nextValue, int maxValue) + : m_fifo(fifo), + m_nextValue(nextValue), + m_maxValue(maxValue) { + } + virtual ~FifoWriter() = default; + + void run() override { + for (;;) { + int nextEnqueued = m_nextValue->fetchAndAddOrdered(1); + ASSERT_GE(nextEnqueued, 0); + if (nextEnqueued >= m_maxValue) { + return; + } + while (!m_fifo->enqueue(nextEnqueued)) { + QThread::usleep(10); + // repeat and try again + } + } + } + + private: + MpscFifo* const m_fifo; + QAtomicInt* const m_nextValue; + const int m_maxValue; +}; + +// Using 500k values/rounds this test should take no longer +// than 200 ms. If the test doesn't finish at all this will +// also indicate a bug. +TEST(MpscFifoTest, ConcurrentWriters) { + const int kCapacity = 20; + const int kMaxValue = 500000; + + MpscFifo fifo; + + QAtomicInt nextEnqueueValue(0); + FifoWriter writer1(&fifo, &nextEnqueueValue, kMaxValue); + FifoWriter writer2(&fifo, &nextEnqueueValue, kMaxValue); + FifoWriter writer3(&fifo, &nextEnqueueValue, kMaxValue); + + writer1.start(); + writer2.start(); + writer3.start(); + + // Values are enqueued slightly out-of-order and must be + // buffered and reordered by the reader to check their + // validity. + QSet dequeuedBuffer; + int minValue = 0; + while (minValue < kMaxValue) { + int dequeued; + if (fifo.dequeue(&dequeued)) { + // Check that we haven't dequeued the same value twice! + ASSERT_GE(dequeued, minValue); + ASSERT_FALSE(dequeuedBuffer.contains(dequeued)); + if (dequeued == minValue) { + ++minValue; + while (dequeuedBuffer.remove(minValue)) { + ++minValue; + } + } else { + dequeuedBuffer.insert(dequeued); + } + } + } + int dequeued; + EXPECT_FALSE(fifo.dequeue(&dequeued)); + + writer1.wait(); + writer2.wait(); + writer3.wait(); + + EXPECT_TRUE(nextEnqueueValue.load() >= kMaxValue); + EXPECT_FALSE(fifo.dequeue(&dequeued)); +} + +} // namespace diff --git a/src/util/mpscfifo.h b/src/util/mpscfifo.h new file mode 100644 index 00000000000..b1afa3f13b7 --- /dev/null +++ b/src/util/mpscfifo.h @@ -0,0 +1,96 @@ +#pragma once + +#include +#include + +#include "util/assert.h" +#include "util/memory.h" + +enum class MpscFifoConcurrency { + SingleProducer, + MultipleProducers, +}; + +// FIFO for multiple producers/writers and a single consumer/reader. Reading +// is lock-free while concurrent writers are synchronized by a mutex. The +// mutex can be disabled by explicitly creating a single producer instance. +template +class MpscFifo { + public: + explicit MpscFifo( + MpscFifoConcurrency concurrency = MpscFifoConcurrency::MultipleProducers) + : m_writeCount(0), + // Initially no items have been written, so all (= capacity) + // available items have been read and none are available. + m_readCount(capacity), + m_writeIndex(0), + m_readIndex(0) { + static_assert(capacity >= 1, "capacity too low"); + static_assert((capacity + 1) > 0, "capacity too high"); + if (concurrency == MpscFifoConcurrency::MultipleProducers) { + m_writeMutex = std::make_unique(); + } + } + + // Writers from multiple threads may enqueue items concurrently. + // The argument is passed by value, because it is consumed by + // this operation on success. The situation that the queue is + // full and the operation fails by returning false is not expected + // to happen frequently. + bool enqueue(T value) { + if (m_writeCount.fetchAndAddAcquire(1) >= capacity) { + // No slots available for writing -> Undo changes and abort + m_writeCount.fetchAndAddRelease(-1); + return false; + } + { + QMutexLocker locked(m_writeMutex.get()); + DEBUG_ASSERT(m_writeIndex >= 0); + DEBUG_ASSERT(m_writeIndex <= capacity); + m_buffer[m_writeIndex] = std::move(value); + m_writeIndex = nextIndex(m_writeIndex); + } + // Finally allow the reader to access the enqueued buffer slot + m_readCount.fetchAndAddRelease(-1); + return true; + } + + // Only a single reader at a time is allowed to dequeue items. + // TODO(C++17): Use std::optional as the return value + bool dequeue(T* value) { + if (m_readCount.fetchAndAddAcquire(1) >= capacity) { + // No slots available for reading -> Undo changes and abort + m_readCount.fetchAndAddRelease(-1); + return false; + } + DEBUG_ASSERT(m_readIndex >= 0); + DEBUG_ASSERT(m_readIndex <= capacity); + *value = std::move(m_buffer[m_readIndex]); + m_readIndex = nextIndex(m_readIndex); + // Finally allow writers to overwrite the dequeued buffer slot + m_writeCount.fetchAndAddRelease(-1); + return true; + } + + private: + static int nextIndex(int index) { + return (index + 1) % (capacity + 1); + } + + // One additional slot is needed to decouple writers from the single reader + T m_buffer[capacity + 1]; + + // Both writers and the reader have a different view on the utilization of + // the queue. The writers and readers respectively use acquire or release + // memory ordering semantics according to their role for lock-free + // coordination. + QAtomicInt m_writeCount; + QAtomicInt m_readCount; + + // Only a single writer is allowed at a time. Otherwise stale reads may + // occur if a writer is delayed while accessing m_buffer! + std::unique_ptr m_writeMutex; + + int m_writeIndex; + int m_readIndex; +};