Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions between caching reader and worker (again) #2308

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions src/engine/cachingreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ CachingReader::CachingReader(QString group,
// The capacity of the back channel must be equal to the number of
// allocated chunks, because the worker use writeBlocking(). Otherwise
// the worker could get stuck in a hot loop!!!
m_stateFIFO(kNumberOfCachedChunksInMemory),
m_readerStatusUpdateFIFO(kNumberOfCachedChunksInMemory),
m_state(State::Idle),
m_mruCachingReaderChunk(nullptr),
m_lruCachingReaderChunk(nullptr),
m_sampleBuffer(CachingReaderChunk::kSamples * kNumberOfCachedChunksInMemory),
m_worker(group, &m_chunkReadRequestFIFO, &m_stateFIFO) {
m_worker(group, &m_chunkReadRequestFIFO, &m_readerStatusUpdateFIFO) {

m_allocatedCachingReaderChunks.reserve(kNumberOfCachedChunksInMemory);
// Divide up the allocated raw memory buffer into total_chunks
Expand All @@ -78,16 +78,15 @@ CachingReader::CachingReader(QString group,
m_freeChunks.push_back(c);
}

// Forward signals from worker
connect(&m_worker, SIGNAL(trackLoading()),
this, SIGNAL(trackLoading()),
Qt::DirectConnection);
connect(&m_worker, SIGNAL(trackLoaded(TrackPointer, int, int)),
this, SIGNAL(trackLoaded(TrackPointer, int, int)),
Qt::DirectConnection);
connect(&m_worker, SIGNAL(trackLoadFailed(TrackPointer, QString)),
this, SIGNAL(trackLoadFailed(TrackPointer, QString)),
Qt::DirectConnection);
// Handle signals from worker thread
connect(&m_worker,
&CachingReaderWorker::trackLoaded,
this,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work without a direct connection?
Do we have a Qt Main Loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have an event loop then we must not use signals. Period.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

&CachingReader::onTrackLoaded);
connect(&m_worker,
&CachingReaderWorker::trackLoadFailed,
this,
&CachingReader::onTrackLoadFailed);

m_worker.start(QThread::HighPriority);
}
Expand Down Expand Up @@ -201,23 +200,54 @@ CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex
}

void CachingReader::newTrack(TrackPointer pTrack) {
if (m_state == State::TrackLoading) {
m_pNewTrack = pTrack;
process();
return;
}
m_pNewTrack.reset();
// Feed the track to the worker as soon as possible
// to get ready while the reader switches its internal
// state. There are no race conditions, because the
// reader polls the worker.
m_worker.newTrack(pTrack);
m_worker.workReady();
m_worker.newTrack(std::move(pTrack));
// Don't accept any new read requests until the current
// track has been unloaded and the new track has been
// loaded.
m_state = State::TrackLoading;
// Free all chunks with sample data from the current track.
freeAllChunks();
// Emit that a new track is loading, stops the current track
emit trackLoading();
}

void CachingReader::onTrackLoaded(
TrackPointer pTrack,
int iSampleRate,
int iNumSamples) {
// Consume all pending messages
process();
// Forward signal depending on the current state
if (m_state == State::TrackLoaded) {
emit trackLoaded(std::move(pTrack), iSampleRate, iNumSamples);
} else {
kLogger.warning()
<< "No track loaded";
}
}

void CachingReader::onTrackLoadFailed(
TrackPointer pTrack,
QString reason) {
// Consume all pending messages
process();
// Forward signal independent of the current state
emit trackLoadFailed(std::move(pTrack), reason);
}

void CachingReader::process() {
ReaderStatusUpdate update;
while (m_stateFIFO.read(&update, 1) == 1) {
while (m_readerStatusUpdateFIFO.read(&update, 1) == 1) {
DEBUG_ASSERT(m_state != State::Idle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion may occur after ejecting a track and loading a new one.
Was this the one you have experienced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments to the wrong/closed PR: #2305 (review)

This is the debug assertion that helped me discover the race condition. I refuse to remove it because it is correct unless someone provides a counterexample.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have checked the old version in 3e9e2ae before the fixes and I still don't see a reason why this debug assertion is not valid?

auto pChunk = update.takeFromWorker();
if (pChunk) {
Expand Down Expand Up @@ -265,6 +295,11 @@ void CachingReader::process() {
m_readableFrameIndexRange = update.readableFrameIndexRange();
}
}
// To terminate the recursion a pending new track must
// only be loaded if no track is currently loading!
if (m_pNewTrack && m_state != State::TrackLoading) {
newTrack(std::move(m_pNewTrack));
}
}

CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples, bool reverse, CSAMPLE* buffer) {
Expand Down Expand Up @@ -542,6 +577,7 @@ void CachingReader::hintAndMaybeWake(const HintVector& hintList) {
<< "Requesting read of chunk"
<< request.chunk;
}
DEBUG_ASSERT(m_state == State::TrackLoaded);
if (m_chunkReadRequestFIFO.write(&request, 1) != 1) {
kLogger.warning()
<< "Failed to submit read request for chunk"
Expand Down
11 changes: 10 additions & 1 deletion src/engine/cachingreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,21 @@ class CachingReader : public QObject {
// Emitted once a new track is loaded and ready to be read from.
void trackLoading();
void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples);

// Forwarded from the worker
void trackLoadFailed(TrackPointer pTrack, QString reason);

private slots:
void onTrackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples);
void onTrackLoadFailed(TrackPointer pTrack, QString reason);

private:
const UserSettingsPointer m_pConfig;

// Thread-safe FIFOs for communication between the engine callback and
// reader thread.
FIFO<CachingReaderChunkReadRequest> m_chunkReadRequestFIFO;
FIFO<ReaderStatusUpdate> m_stateFIFO;
FIFO<ReaderStatusUpdate> m_readerStatusUpdateFIFO;

// Looks for the provided chunk number in the index of in-memory chunks and
// returns it if it is present. If not, returns nullptr. If it is present then
Expand Down Expand Up @@ -180,6 +186,9 @@ class CachingReader : public QObject {
mixxx::IndexRange m_readableFrameIndexRange;

CachingReaderWorker m_worker;

// Pending next track while loading a previous track
TrackPointer m_pNewTrack;
};


Expand Down
39 changes: 14 additions & 25 deletions src/engine/cachingreaderworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ CachingReaderWorker::CachingReaderWorker(
m_tag(QString("CachingReaderWorker %1").arg(m_group)),
m_pChunkReadRequestFIFO(pChunkReadRequestFIFO),
m_pReaderStatusFIFO(pReaderStatusFIFO),
m_newTrackAvailable(false),
m_newTrackFifo(MpscFifoConcurrency::SingleProducer),
m_stop(0) {
}

CachingReaderWorker::~CachingReaderWorker() {
}

ReaderStatusUpdate CachingReaderWorker::processReadRequest(
const CachingReaderChunkReadRequest& request) {
CachingReaderChunk* pChunk = request.chunk;
Expand Down Expand Up @@ -85,30 +82,25 @@ ReaderStatusUpdate CachingReaderWorker::processReadRequest(
return result;
}

// WARNING: Always called from a different thread (GUI)
void CachingReaderWorker::newTrack(TrackPointer pTrack) {
QMutexLocker locker(&m_newTrackMutex);
m_pNewTrack = pTrack;
m_newTrackAvailable = true;
VERIFY_OR_DEBUG_ASSERT(m_newTrackFifo.enqueue(pTrack)) {
kLogger.critical()
<< "No capacity to accept a new track";
}
workReady();
}

void CachingReaderWorker::run() {
unsigned static id = 0; //the id of this thread, for debugging purposes
QThread::currentThread()->setObjectName(QString("CachingReaderWorker %1").arg(++id));

Event::start(m_tag);
TrackPointer pNewTrack;
while (!load_atomic(m_stop)) {
// Request is initialized by reading from FIFO
CachingReaderChunkReadRequest request;
if (m_newTrackAvailable) {
TrackPointer pLoadTrack;
{ // locking scope
QMutexLocker locker(&m_newTrackMutex);
pLoadTrack = m_pNewTrack;
m_pNewTrack.reset();
m_newTrackAvailable = false;
} // implicitly unlocks the mutex
loadTrack(pLoadTrack);
if (m_newTrackFifo.dequeue(&pNewTrack)) {
loadTrack(std::move(pNewTrack));
} else if (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
// Read the requested chunk and send the result
const ReaderStatusUpdate update(processReadRequest(request));
Expand All @@ -121,7 +113,7 @@ void CachingReaderWorker::run() {
}
}

void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
void CachingReaderWorker::loadTrack(TrackPointer pTrack) {
// Discard all pending read requests
CachingReaderChunkReadRequest request;
while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) {
Expand All @@ -135,21 +127,18 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {

if (!pTrack) {
// If no new track is available then we are done
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
return;
}

// Emit that a new track is loading, stops the current track
emit trackLoading();

QString filename = pTrack->getLocation();
if (filename.isEmpty() || !pTrack->exists()) {
kLogger.warning()
<< m_group
<< "File not found"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be found.")
Expand All @@ -165,7 +154,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
<< m_group
<< "Failed to open file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' could not be loaded").arg(filename));
Expand All @@ -182,7 +171,7 @@ void CachingReaderWorker::loadTrack(const TrackPointer& pTrack) {
<< m_group
<< "Failed to open empty file"
<< filename;
const auto update = ReaderStatusUpdate::trackNotLoaded();
const auto update = ReaderStatusUpdate::trackUnloaded();
m_pReaderStatusFIFO->writeBlocking(&update, 1);
emit trackLoadFailed(
pTrack, QString("The file '%1' is empty and could not be loaded").arg(filename));
Expand Down
33 changes: 11 additions & 22 deletions src/engine/cachingreaderworker.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
#ifndef ENGINE_CACHINGREADERWORKER_H
#define ENGINE_CACHINGREADERWORKER_H
#pragma once

#include <QtDebug>
#include <QMutex>
#include <QSemaphore>
#include <QThread>
#include <QString>

#include "engine/cachingreaderchunk.h"
#include "track/track.h"
#include "engine/engineworker.h"
#include "sources/audiosource.h"
#include "track/track.h"
#include "util/fifo.h"
#include "util/mpscfifo.h"


// POD with trivial ctor/dtor/copy for passing through FIFO
Expand Down Expand Up @@ -69,7 +65,7 @@ typedef struct ReaderStatusUpdate {
return update;
}

static ReaderStatusUpdate trackNotLoaded() {
static ReaderStatusUpdate trackUnloaded() {
ReaderStatusUpdate update;
update.init(TRACK_UNLOADED, nullptr, mixxx::IndexRange());
return update;
Expand Down Expand Up @@ -101,20 +97,20 @@ class CachingReaderWorker : public EngineWorker {
CachingReaderWorker(QString group,
FIFO<CachingReaderChunkReadRequest>* pChunkReadRequestFIFO,
FIFO<ReaderStatusUpdate>* pReaderStatusFIFO);
virtual ~CachingReaderWorker();
~CachingReaderWorker() override = default;

// Request to load a new track. wake() must be called afterwards.
virtual void newTrack(TrackPointer pTrack);
// Request to load a new track
// Invoked from a different but always the same, single thread!!
void newTrack(TrackPointer pTrack);

// Run upkeep operations like loading tracks and reading from file. Run by a
// thread pool via the EngineWorkerScheduler.
virtual void run();
void run() override;

void quitWait();

signals:
// Emitted once a new track is loaded and ready to be read from.
void trackLoading();
void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples);
void trackLoadFailed(TrackPointer pTrack, QString reason);

Expand All @@ -127,14 +123,10 @@ class CachingReaderWorker : public EngineWorker {
FIFO<CachingReaderChunkReadRequest>* m_pChunkReadRequestFIFO;
FIFO<ReaderStatusUpdate>* m_pReaderStatusFIFO;

// Queue of Tracks to load, and the corresponding lock. Must acquire the
// lock to touch.
QMutex m_newTrackMutex;
bool m_newTrackAvailable;
TrackPointer m_pNewTrack;
MpscFifo<TrackPointer, 1> m_newTrackFifo;

// Internal method to load a track. Emits trackLoaded when finished.
void loadTrack(const TrackPointer& pTrack);
void loadTrack(TrackPointer pTrack);

ReaderStatusUpdate processReadRequest(
const CachingReaderChunkReadRequest& request);
Expand All @@ -155,6 +147,3 @@ class CachingReaderWorker : public EngineWorker {

QAtomicInt m_stop;
};


#endif /* ENGINE_CACHINGREADERWORKER_H */
Loading