diff --git a/src/engine/cachingreader.cpp b/src/engine/cachingreader.cpp
index 10ee36fbf0a..b3673d403b4 100644
--- a/src/engine/cachingreader.cpp
+++ b/src/engine/cachingreader.cpp
@@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group,
           // The capacity of the back channel must be equal to the number of
           // allocated chunks, because the worker use writeBlocking(). Otherwise
           // the worker could get stuck in a hot loop!!!
-          m_stateFIFO(kNumberOfCachedChunksInMemory),
+          m_readerStatusUpdateFIFO(kNumberOfCachedChunksInMemory),
           m_state(State::Idle),
           m_mruCachingReaderChunk(nullptr),
           m_lruCachingReaderChunk(nullptr),
           m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory),
-          m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) {
+          m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusUpdateFIFO) {
     m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory);
     // Divide up the allocated raw memory buffer into total_chunks
@@ -78,16 +78,15 @@ CachingReader::CachingReader(QString group,
         m_freeChunks.push_back(c);
     }
 
-    // Forward signals from worker
-    connect(&m_worker, SIGNAL(trackLoading()),
-            this, SIGNAL(trackLoading()),
-            Qt::DirectConnection);
-    connect(&m_worker, SIGNAL(trackLoaded(TrackPointer, int, int)),
-            this, SIGNAL(trackLoaded(TrackPointer, int, int)),
-            Qt::DirectConnection);
-    connect(&m_worker, SIGNAL(trackLoadFailed(TrackPointer, QString)),
-            this, SIGNAL(trackLoadFailed(TrackPointer, QString)),
-            Qt::DirectConnection);
+    // Handle signals from worker thread
+    connect(&m_worker,
+            &CachingReaderWorker::trackLoaded,
+            this,
+            &CachingReader::onTrackLoaded);
+    connect(&m_worker,
+            &CachingReaderWorker::trackLoadFailed,
+            this,
+            &CachingReader::onTrackLoadFailed);
 
     m_worker.start(QThread::HighPriority);
 }
@@ -201,23 +200,54 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex
 }
 
 void CachingReader::newTrack(TrackPointer pTrack) {
+    if (m_state == State::TrackLoading) {
+        m_pNewTrack = pTrack;
+        process();
+        return;
+    }
+    m_pNewTrack.reset();
     // Feed the track to the worker as soon as possible
     // to get ready while the reader switches its internal
     // state. There are no race conditions, because the
     // reader polls the worker.
-    m_worker.newTrack(pTrack);
-    m_worker.workReady();
+    m_worker.newTrack(std::move(pTrack));
     // Don't accept any new read requests until the current
     // track has been unloaded and the new track has been
     // loaded.
     m_state = State::TrackLoading;
     // Free all chunks with sample data from the current track.
     freeAllChunks();
+    // Emit that a new track is loading, stops the current track
+    emit trackLoading();
+}
+
+void CachingReader::onTrackLoaded(
+        TrackPointer pTrack,
+        int iSampleRate,
+        int iNumSamples) {
+    // Consume all pending messages
+    process();
+    // Forward signal depending on the current state
+    if (m_state == State::TrackLoaded) {
+        emit trackLoaded(std::move(pTrack), iSampleRate, iNumSamples);
+    } else {
+        kLogger.warning()
+                << "No track loaded";
+    }
+}
+
+void CachingReader::onTrackLoadFailed(
+        TrackPointer pTrack,
+        QString reason) {
+    // Consume all pending messages
+    process();
+    // Forward signal independent of the current state
+    emit trackLoadFailed(std::move(pTrack), reason);
 }
 
 void CachingReader::process() {
     ReaderStatusUpdate update;
-    while (m_stateFIFO.read(&update, 1) == 1) {
+    while (m_readerStatusUpdateFIFO.read(&update, 1) == 1) {
         DEBUG_ASSERT(m_state != State::Idle);
         auto pChunk = update.takeFromWorker();
         if (pChunk) {
@@ -265,6 +295,11 @@ void CachingReader::process() {
             m_readableFrameIndexRange = update.readableFrameIndexRange();
         }
     }
+    // To terminate the recursion a pending new track must
+    // only be loaded if no track is currently loading!
+    if (m_pNewTrack && m_state != State::TrackLoading) {
+        newTrack(std::move(m_pNewTrack));
+    }
 }
 
 CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples, bool reverse, CSAMPLE* buffer) {
@@ -542,6 +577,7 @@ void CachingReader::hintAndMaybeWake(const HintVector& hintList) {
                         << "Requesting read of chunk" << request.chunk;
             }
+            DEBUG_ASSERT(m_state == State::TrackLoaded);
             if (m_chunkReadRequestFIFO.write(&request, 1) != 1) {
                 kLogger.warning()
                         << "Failed to submit read request for chunk"
diff --git a/src/engine/cachingreader.h b/src/engine/cachingreader.h
index 1e9ec831fbf..794832be2da 100644
--- a/src/engine/cachingreader.h
+++ b/src/engine/cachingreader.h
@@ -116,15 +116,21 @@ class CachingReader : public QObject {
     // Emitted once a new track is loaded and ready to be read from.
     void trackLoading();
     void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples);
+
+    // Forwarded from the worker
     void trackLoadFailed(TrackPointer pTrack, QString reason);
 
+  private slots:
+    void onTrackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples);
+    void onTrackLoadFailed(TrackPointer pTrack, QString reason);
+
   private:
     const UserSettingsPointer m_pConfig;
 
     // Thread-safe FIFOs for communication between the engine callback and
     // reader thread.
     FIFO<CachingReaderChunkReadRequest> m_chunkReadRequestFIFO;
-    FIFO<ReaderStatusUpdate> m_stateFIFO;
+    FIFO<ReaderStatusUpdate> m_readerStatusUpdateFIFO;
 
     // Looks for the provided chunk number in the index of in-memory chunks and
     // returns it if it is present. If not, returns nullptr. If it is present then
@@ -180,6 +186,9 @@ class CachingReader : public QObject {
     mixxx::IndexRange m_readableFrameIndexRange;
 
     CachingReaderWorker m_worker;
+
+    // Pending next track while loading a previous track
+    TrackPointer m_pNewTrack;
 };
diff --git a/src/engine/cachingreaderworker.cpp b/src/engine/cachingreaderworker.cpp
index dae4f303f3e..ac86cf729a1 100644
--- a/src/engine/cachingreaderworker.cpp
+++ b/src/engine/cachingreaderworker.cpp
@@ -25,13 +25,10 @@ CachingReaderWorker::CachingReaderWorker(
           m_tag(QString("CachingReaderWorker %1").arg(m_group)),
           m_pChunkReadRequestFIFO(pChunkReadRequestFIFO),
           m_pReaderStatusFIFO(pReaderStatusFIFO),
-          m_newTrackAvailable(false),
+          m_newTrackFifo(MpscFifoConcurrency::SingleProducer),
          m_stop(0) {
 }
 
-CachingReaderWorker::~CachingReaderWorker() {
-}
-
 ReaderStatusUpdate CachingReaderWorker::processReadRequest(
         const CachingReaderChunkReadRequest& request) {
     CachingReaderChunk* pChunk = request.chunk;
@@ -85,11 +82,12 @@ ReaderStatusUpdate CachingReaderWorker::processReadRequest(
     return result;
 }
 
-// WARNING: Always called from a different thread (GUI)
 void CachingReaderWorker::newTrack(TrackPointer pTrack) {
-    QMutexLocker locker(&m_newTrackMutex);
-    m_pNewTrack = pTrack;
-    m_newTrackAvailable = true;
+    VERIFY_OR_DEBUG_ASSERT(m_newTrackFifo.enqueue(pTrack)) {
+        kLogger.critical()
+                << "No capacity to accept a new track";
+    }
+    workReady();
 }
 
 void CachingReaderWorker::run() {
@@ -97,18 +95,12 @@ void CachingReaderWorker::run() {
     QThread::currentThread()->setObjectName(QString("CachingReaderWorker %1").arg(++id));
 
     Event::start(m_tag);
+    TrackPointer pNewTrack;
     while (!load_atomic(m_stop)) {
         // Request is initialized by reading from FIFO
         CachingReaderChunkReadRequest request;
-        if (m_newTrackAvailable) {
-            TrackPointer pLoadTrack;
-            { // locking scope
-                QMutexLocker locker(&m_newTrackMutex);
-                pLoadTrack = m_pNewTrack;
-                m_pNewTrack.reset();
-                m_newTrackAvailable = false;
-            } // implicitly unlocks the mutex
-            loadTrack(pLoadTrack);
+        if (m_newTrackFifo.dequeue(&pNewTrack)) {
+            loadTrack(std::move(pNewTrack));
         } else if (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
             // Read the requested chunk and send the result
             const ReaderStatusUpdate update(processReadRequest(request));
@@ -121,7 +113,7 @@ void CachingReaderWorker::run() {
     }
 }
 
-void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
+void CachingReaderWorker::loadTrack(TrackPointer pTrack) {
     // Discard all pending read requests
     CachingReaderChunkReadRequest request;
     while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
@@ -135,21 +127,18 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
 
     if (!pTrack) {
         // If no new track is available then we are done
-        const auto update = ReaderStatusUpdate::trackNotLoaded();
+        const auto update = ReaderStatusUpdate::trackUnloaded();
         m_pReaderStatusFIFO->writeBlocking(&update, 1);
         return;
     }
 
-    // Emit that a new track is loading, stops the current track
-    emit trackLoading();
-
     QString filename = pTrack->getLocation();
     if (filename.isEmpty() || !pTrack->exists()) {
         kLogger.warning()
                 << m_group
                 << "File not found"
                 << filename;
-        const auto update = ReaderStatusUpdate::trackNotLoaded();
+        const auto update = ReaderStatusUpdate::trackUnloaded();
         m_pReaderStatusFIFO->writeBlocking(&update, 1);
         emit trackLoadFailed(
                 pTrack, QString("The file '%1' could not be found.")
@@ -165,7 +154,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
                 << m_group
                 << "Failed to open file"
                 << filename;
-        const auto update = ReaderStatusUpdate::trackNotLoaded();
+        const auto update = ReaderStatusUpdate::trackUnloaded();
         m_pReaderStatusFIFO->writeBlocking(&update, 1);
         emit trackLoadFailed(
                 pTrack, QString("The file '%1' could not be loaded").arg(filename));
@@ -182,7 +171,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
                 << m_group
                 << "Failed to open empty file"
                 << filename;
-        const auto update = ReaderStatusUpdate::trackNotLoaded();
+        const auto update = ReaderStatusUpdate::trackUnloaded();
         m_pReaderStatusFIFO->writeBlocking(&update, 1);
         emit trackLoadFailed(
                 pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename));
diff --git a/src/engine/cachingreaderworker.h b/src/engine/cachingreaderworker.h
index a4c05a2b237..ab11c4d9397 100644
--- a/src/engine/cachingreaderworker.h
+++ b/src/engine/cachingreaderworker.h
@@ -1,17 +1,13 @@
-#ifndef ENGINE_CACHINGREADERWORKER_H
-#define ENGINE_CACHINGREADERWORKER_H
+#pragma once
 
-#include
-#include
 #include
-#include
-#include
 
 #include "engine/cachingreaderchunk.h"
-#include "track/track.h"
 #include "engine/engineworker.h"
 #include "sources/audiosource.h"
+#include "track/track.h"
 #include "util/fifo.h"
+#include "util/mpscfifo.h"
 
 // POD with trivial ctor/dtor/copy for passing through FIFO
@@ -69,7 +65,7 @@ typedef struct ReaderStatusUpdate {
         return update;
     }
 
-    static ReaderStatusUpdate trackNotLoaded() {
+    static ReaderStatusUpdate trackUnloaded() {
         ReaderStatusUpdate update;
         update.init(TRACK_UNLOADED, nullptr, mixxx::IndexRange());
         return update;
@@ -101,20 +97,20 @@ class CachingReaderWorker : public EngineWorker {
     CachingReaderWorker(QString group,
            FIFO<CachingReaderChunkReadRequest>* pChunkReadRequestFIFO,
            FIFO<ReaderStatusUpdate>* pReaderStatusFIFO);
-    virtual ~CachingReaderWorker();
+    ~CachingReaderWorker() override = default;
 
-    // Request to load a new track. wake() must be called afterwards.
-    virtual void newTrack(TrackPointer pTrack);
+    // Request to load a new track
+    // Invoked from a different but always the same, single thread!!
+    void newTrack(TrackPointer pTrack);
 
     // Run upkeep operations like loading tracks and reading from file. Run by a
     // thread pool via the EngineWorkerScheduler.
-    virtual void run();
+    void run() override;
 
     void quitWait();
 
   signals:
     // Emitted once a new track is loaded and ready to be read from.
-    void trackLoading();
     void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples);
     void trackLoadFailed(TrackPointer pTrack, QString reason);
 
@@ -127,14 +123,10 @@ class CachingReaderWorker : public EngineWorker {
     FIFO<CachingReaderChunkReadRequest>* m_pChunkReadRequestFIFO;
     FIFO<ReaderStatusUpdate>* m_pReaderStatusFIFO;
 
-    // Queue of Tracks to load, and the corresponding lock. Must acquire the
-    // lock to touch.
-    QMutex m_newTrackMutex;
-    bool m_newTrackAvailable;
-    TrackPointer m_pNewTrack;
+    MpscFifo<TrackPointer, 1> m_newTrackFifo;
 
     // Internal method to load a track. Emits trackLoaded when finished.
-    void loadTrack(const TrackPointer& pTrack);
+    void loadTrack(TrackPointer pTrack);
 
     ReaderStatusUpdate processReadRequest(
             const CachingReaderChunkReadRequest& request);
@@ -155,6 +147,3 @@ class CachingReaderWorker : public EngineWorker {
 
     QAtomicInt m_stop;
 };
-
-
-#endif /* ENGINE_CACHINGREADERWORKER_H */
diff --git a/src/test/mpscfifotest.cpp b/src/test/mpscfifotest.cpp
new file mode 100644
index 00000000000..05516d271cb
--- /dev/null
+++ b/src/test/mpscfifotest.cpp
@@ -0,0 +1,170 @@
+#include <gtest/gtest.h>
+
+#include <QAtomicInt>
+#include <QSet>
+#include <QThread>
+
+#include "util/mpscfifo.h"
+
+namespace {
+
+TEST(MpscFifoTest, CapacityOne) {
+    MpscFifo<int, 1> fifo;
+    int dequeued = -1;
+
+    EXPECT_TRUE(fifo.enqueue(1));
+    EXPECT_FALSE(fifo.enqueue(2));
+    EXPECT_FALSE(fifo.enqueue(2));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(1, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(2));
+    EXPECT_FALSE(fifo.enqueue(3));
+    EXPECT_FALSE(fifo.enqueue(3));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(2, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(3));
+    EXPECT_FALSE(fifo.enqueue(4));
+    EXPECT_FALSE(fifo.enqueue(4));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(3, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+}
+
+TEST(MpscFifoTest, CapacityTwo) {
+    MpscFifo<int, 2> fifo;
+    int dequeued = -1;
+
+    EXPECT_TRUE(fifo.enqueue(1));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(1, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(2));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(2, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(3));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(3, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(4));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(4, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(5));
+    EXPECT_TRUE(fifo.enqueue(6));
+    EXPECT_FALSE(fifo.enqueue(7));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(5, dequeued);
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(6, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    EXPECT_TRUE(fifo.enqueue(7));
+    EXPECT_TRUE(fifo.enqueue(8));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(7, dequeued);
+    EXPECT_TRUE(fifo.enqueue(9));
+    EXPECT_FALSE(fifo.enqueue(10));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(8, dequeued);
+    EXPECT_TRUE(fifo.enqueue(10));
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(9, dequeued);
+    EXPECT_TRUE(fifo.dequeue(&dequeued));
+    EXPECT_EQ(10, dequeued);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+}
+
+template<typename T, int capacity>
+class FifoWriter: public QThread {
+  public:
+    explicit FifoWriter(MpscFifo<T, capacity>* fifo, QAtomicInt* nextValue, int maxValue)
+            : m_fifo(fifo),
+              m_nextValue(nextValue),
+              m_maxValue(maxValue) {
+    }
+    virtual ~FifoWriter() = default;
+
+    void run() override {
+        for (;;) {
+            int nextEnqueued = m_nextValue->fetchAndAddOrdered(1);
+            ASSERT_GE(nextEnqueued, 0);
+            if (nextEnqueued >= m_maxValue) {
+                return;
+            }
+            while (!m_fifo->enqueue(nextEnqueued)) {
+                QThread::usleep(10);
+                // repeat and try again
+            }
+        }
+    }
+
+  private:
+    MpscFifo<T, capacity>* const m_fifo;
+    QAtomicInt* const m_nextValue;
+    const int m_maxValue;
+};
+
+// Using 500k values/rounds this test should take no longer
+// than 200 ms. If the test doesn't finish at all this will
+// also indicate a bug.
+TEST(MpscFifoTest, ConcurrentWriters) {
+    const int kCapacity = 20;
+    const int kMaxValue = 500000;
+
+    MpscFifo<int, kCapacity> fifo;
+
+    QAtomicInt nextEnqueueValue(0);
+    FifoWriter<int, kCapacity> writer1(&fifo, &nextEnqueueValue, kMaxValue);
+    FifoWriter<int, kCapacity> writer2(&fifo, &nextEnqueueValue, kMaxValue);
+    FifoWriter<int, kCapacity> writer3(&fifo, &nextEnqueueValue, kMaxValue);
+
+    writer1.start();
+    writer2.start();
+    writer3.start();
+
+    // Values are enqueued slightly out-of-order and must be
+    // buffered and reordered by the reader to check their
+    // validity.
+    QSet<int> dequeuedBuffer;
+    int minValue = 0;
+    while (minValue < kMaxValue) {
+        int dequeued;
+        if (fifo.dequeue(&dequeued)) {
+            // Check that we haven't dequeued the same value twice!
+            ASSERT_GE(dequeued, minValue);
+            ASSERT_FALSE(dequeuedBuffer.contains(dequeued));
+            if (dequeued == minValue) {
+                ++minValue;
+                while (dequeuedBuffer.remove(minValue)) {
+                    ++minValue;
+                }
+            } else {
+                dequeuedBuffer.insert(dequeued);
+            }
+        }
+    }
+    int dequeued;
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+
+    writer1.wait();
+    writer2.wait();
+    writer3.wait();
+
+    EXPECT_TRUE(nextEnqueueValue.load() >= kMaxValue);
+    EXPECT_FALSE(fifo.dequeue(&dequeued));
+}
+
+} // namespace
diff --git a/src/test/signalpathtest.h b/src/test/signalpathtest.h
index f5b9adc9f01..05c31135ac3 100644
--- a/src/test/signalpathtest.h
+++ b/src/test/signalpathtest.h
@@ -129,7 +129,7 @@ class BaseSignalPathTest : public MixxxTest {
         ProcessBuffer();
         EngineDeck* pEngineDeck = pDeck->getEngineDeck();
         while (!pEngineDeck->getEngineBuffer()->isTrackLoaded()) {
-            QTest::qSleep(1); // millis
+            QCoreApplication::processEvents();
         }
     }
diff --git a/src/util/mpscfifo.h b/src/util/mpscfifo.h
new file mode 100644
index 00000000000..b1afa3f13b7
--- /dev/null
+++ b/src/util/mpscfifo.h
@@ -0,0 +1,96 @@
+#pragma once
+
+#include <QAtomicInt>
+#include <QMutex>
+
+#include "util/assert.h"
+#include "util/memory.h"
+
+enum class MpscFifoConcurrency {
+    SingleProducer,
+    MultipleProducers,
+};
+
+// FIFO for multiple producers/writers and a single consumer/reader. Reading
+// is lock-free while concurrent writers are synchronized by a mutex. The
+// mutex can be disabled by explicitly creating a single producer instance.
+template<typename T, int capacity>
+class MpscFifo {
+  public:
+    explicit MpscFifo(
+            MpscFifoConcurrency concurrency = MpscFifoConcurrency::MultipleProducers)
+            : m_writeCount(0),
+              // Initially no items have been written, so all (= capacity)
+              // available items have been read and none are available.
+              m_readCount(capacity),
+              m_writeIndex(0),
+              m_readIndex(0) {
+        static_assert(capacity >= 1, "capacity too low");
+        static_assert((capacity + 1) > 0, "capacity too high");
+        if (concurrency == MpscFifoConcurrency::MultipleProducers) {
+            m_writeMutex = std::make_unique<QMutex>();
+        }
+    }
+
+    // Writers from multiple threads may enqueue items concurrently.
+    // The argument is passed by value, because it is consumed by
+    // this operation on success. The situation that the queue is
+    // full and the operation fails by returning false is not expected
+    // to happen frequently.
+    bool enqueue(T value) {
+        if (m_writeCount.fetchAndAddAcquire(1) >= capacity) {
+            // No slots available for writing -> Undo changes and abort
+            m_writeCount.fetchAndAddRelease(-1);
+            return false;
+        }
+        {
+            QMutexLocker locked(m_writeMutex.get());
+            DEBUG_ASSERT(m_writeIndex >= 0);
+            DEBUG_ASSERT(m_writeIndex <= capacity);
+            m_buffer[m_writeIndex] = std::move(value);
+            m_writeIndex = nextIndex(m_writeIndex);
+        }
+        // Finally allow the reader to access the enqueued buffer slot
+        m_readCount.fetchAndAddRelease(-1);
+        return true;
+    }
+
+    // Only a single reader at a time is allowed to dequeue items.
+    // TODO(C++17): Use std::optional as the return value
+    bool dequeue(T* value) {
+        if (m_readCount.fetchAndAddAcquire(1) >= capacity) {
+            // No slots available for reading -> Undo changes and abort
+            m_readCount.fetchAndAddRelease(-1);
+            return false;
+        }
+        DEBUG_ASSERT(m_readIndex >= 0);
+        DEBUG_ASSERT(m_readIndex <= capacity);
+        *value = std::move(m_buffer[m_readIndex]);
+        m_readIndex = nextIndex(m_readIndex);
+        // Finally allow writers to overwrite the dequeued buffer slot
+        m_writeCount.fetchAndAddRelease(-1);
+        return true;
+    }
+
+  private:
+    static int nextIndex(int index) {
+        return (index + 1) % (capacity + 1);
+    }
+
+    // One additional slot is needed to decouple writers from the single reader
+    T m_buffer[capacity + 1];
+
+    // Both writers and the reader have a different view on the utilization of
+    // the queue. The writers and readers respectively use acquire or release
+    // memory ordering semantics according to their role for lock-free
+    // coordination.
+    QAtomicInt m_writeCount;
+    QAtomicInt m_readCount;
+
+    // Only a single writer is allowed at a time. Otherwise stale reads may
+    // occur if a writer is delayed while accessing m_buffer!
+    std::unique_ptr<QMutex> m_writeMutex;
+
+    int m_writeIndex;
+    int m_readIndex;
+};
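
For reference, a minimal usage sketch of the MpscFifo template introduced above. Only enqueue(), dequeue(), and the MpscFifoConcurrency constructor flag come from the new util/mpscfifo.h; the element type, capacity, function name, and call pattern below are illustrative assumptions, not code from the patch.

    #include "util/mpscfifo.h"

    // Hypothetical example: a queue of ints shared between several producer
    // threads and exactly one consumer thread.
    void mpscFifoUsageSketch() {
        MpscFifo<int, 4> fifo; // default: MpscFifoConcurrency::MultipleProducers

        // Producer side: enqueue() consumes its argument and returns false
        // while all slots are occupied, so callers must handle rejection.
        if (!fifo.enqueue(42)) {
            // Queue full -> retry later or drop the value
        }

        // Consumer side (always the same single thread): dequeue() returns
        // false once the queue is empty.
        int value;
        while (fifo.dequeue(&value)) {
            // ... process value ...
        }

        // A producer that is known to be a single thread can skip the write
        // mutex, as CachingReaderWorker does for its new-track queue.
        MpscFifo<int, 1> singleProducerFifo(MpscFifoConcurrency::SingleProducer);
        singleProducerFifo.enqueue(1);
    }

The sketch relies on the contract documented in mpscfifo.h: concurrent writers synchronize via the optional mutex, while only a single thread may ever call dequeue().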