diff --git a/build/depends.py b/build/depends.py index cdbdc826bfe..50c46890b1e 100644 --- a/build/depends.py +++ b/build/depends.py @@ -500,6 +500,7 @@ def sources(self, build): "engine/clockcontrol.cpp", "engine/readaheadmanager.cpp", "cachingreader.cpp", + "cachingreaderworker.cpp", "analyserrg.cpp", "analyserqueue.cpp", diff --git a/src/cachingreader.cpp b/src/cachingreader.cpp index cf3f6880814..8f77cde2330 100644 --- a/src/cachingreader.cpp +++ b/src/cachingreader.cpp @@ -11,72 +11,30 @@ #include "trackinfoobject.h" #include "soundsourceproxy.h" #include "sampleutil.h" - - -// There's a little math to this, but not much: 48khz stereo audio is 384kb/sec -// if using float samples. We want the chunk size to be a power of 2 so it's -// easier to memory align, and roughly 1/2 - 1/4th of a second of audio. 2**17 -// and 2**16 are nice candidates. 2**16 is 170ms of audio, which is well above -// (hopefully) the latencies people are seeing. at 10ms latency, one chunk is -// enough for 17 callbacks. We may need to tweak this later. - -// Must be divisible by 8, 4, and 2. Just pick a power of 2. -#define CHUNK_LENGTH 65536 -//#define CHUNK_LENGTH 524288 - -const int CachingReader::kChunkLength = CHUNK_LENGTH; -const int CachingReader::kSamplesPerChunk = CHUNK_LENGTH / sizeof(CSAMPLE); - -CachingReader::CachingReader(const char* _group, - ConfigObject* _config) : - m_pGroup(_group), - m_pConfig(_config), - m_chunkReadRequestFIFO(1024), - m_readerStatusFIFO(1024), - m_readerStatus(INVALID), - m_mruChunk(NULL), - m_lruChunk(NULL), - m_pRawMemoryBuffer(NULL), - m_pCurrentSoundSource(NULL), - m_iTrackSampleRate(0), - m_iTrackNumSamples(0), - m_iTrackNumSamplesCallbackSafe(0), - m_pSample(NULL) { - initialize(); -} - -CachingReader::~CachingReader() { - m_freeChunks.clear(); - m_allocatedChunks.clear(); - m_lruChunk = m_mruChunk = NULL; - - for (int i=0; i < m_chunks.size(); i++) { - Chunk* c = m_chunks[i]; - delete c; - } - - delete [] m_pSample; - - delete [] m_pRawMemoryBuffer; - m_pRawMemoryBuffer = NULL; - - delete m_pCurrentSoundSource; -} - -void CachingReader::initialize() { +#include "util/counter.h" + + +CachingReader::CachingReader(const char* group, + ConfigObject* config) + : m_pConfig(config), + m_chunkReadRequestFIFO(1024), + m_readerStatusFIFO(1024), + m_readerStatus(INVALID), + m_mruChunk(NULL), + m_lruChunk(NULL), + m_pRawMemoryBuffer(NULL), + m_iTrackNumSamplesCallbackSafe(0) { int memory_to_use = 5000000; // 5mb, TODO - Q_ASSERT(memory_to_use >= kChunkLength); + Q_ASSERT(memory_to_use >= CachingReaderWorker::kChunkLength); // Only allocate as many bytes as we will actually use. - memory_to_use -= (memory_to_use % kChunkLength); + memory_to_use -= (memory_to_use % CachingReaderWorker::kChunkLength); - m_pSample = new SAMPLE[kSamplesPerChunk]; - - int total_chunks = memory_to_use / kChunkLength; + int total_chunks = memory_to_use / CachingReaderWorker::kChunkLength; //qDebug() << "CachingReader using" << memory_to_use << "bytes."; - int rawMemoryBufferLength = kSamplesPerChunk * total_chunks; + int rawMemoryBufferLength = CachingReaderWorker::kSamplesPerChunk * total_chunks; m_pRawMemoryBuffer = new CSAMPLE[rawMemoryBufferLength]; m_allocatedChunks.reserve(total_chunks); @@ -97,8 +55,39 @@ void CachingReader::initialize() { m_chunks.push_back(c); m_freeChunks.push_back(c); - bufferStart += kSamplesPerChunk; + bufferStart += CachingReaderWorker::kSamplesPerChunk; } + + m_pWorker = new CachingReaderWorker(group, + &m_chunkReadRequestFIFO, + &m_readerStatusFIFO); + + // Forward signals from worker + connect(m_pWorker, SIGNAL(trackLoading()), + this, SIGNAL(trackLoading()), + Qt::DirectConnection); + connect(m_pWorker, SIGNAL(trackLoaded(TrackPointer, int, int)), + this, SIGNAL(trackLoaded(TrackPointer, int, int)), + Qt::DirectConnection); + connect(m_pWorker, SIGNAL(trackLoadFailed(TrackPointer, QString)), + this, SIGNAL(trackLoadFailed(TrackPointer, QString)), + Qt::DirectConnection); +} + + +CachingReader::~CachingReader() { + delete m_pWorker; + m_freeChunks.clear(); + m_allocatedChunks.clear(); + m_lruChunk = m_mruChunk = NULL; + + for (int i=0; i < m_chunks.size(); i++) { + Chunk* c = m_chunks[i]; + delete c; + } + + delete [] m_pRawMemoryBuffer; + m_pRawMemoryBuffer = NULL; } // static @@ -239,54 +228,9 @@ Chunk* CachingReader::lookupChunk(int chunk_number) { return chunk; } -void CachingReader::processChunkReadRequest(ChunkReadRequest* request, - ReaderStatusUpdate* update) { - int chunk_number = request->chunk->chunk_number; - //qDebug() << "Processing ChunkReadRequest for" << chunk_number; - update->chunk = request->chunk; - update->chunk->length = 0; - - if (m_pCurrentSoundSource == NULL || chunk_number < 0) { - update->status = CHUNK_READ_INVALID; - return; - } - - // Stereo samples - int sample_position = sampleForChunk(chunk_number); - int samples_remaining = m_iTrackNumSamples - sample_position; - int samples_to_read = math_min(kSamplesPerChunk, samples_remaining); - - // Bogus chunk number - if (samples_to_read <= 0) { - update->status = CHUNK_READ_EOF; - return; - } - - m_pCurrentSoundSource->seek(sample_position); - int samples_read = m_pCurrentSoundSource->read(samples_to_read, - m_pSample); - - // If we've run out of music, the SoundSource can return 0 samples. - // Remember that SoundSourc->getLength() (which is m_iTrackNumSamples) can - // lie to us about the length of the song! - if (samples_read <= 0) { - update->status = CHUNK_READ_EOF; - return; - } - - // TODO(XXX) This loop can't be done with a memcpy, but could be done with - // SSE. - CSAMPLE* buffer = request->chunk->data; - //qDebug() << "Reading into " << buffer; - SampleUtil::convert(buffer, m_pSample, samples_read); - update->status = CHUNK_READ_SUCCESS; - update->chunk->length = samples_read; -} - void CachingReader::newTrack(TrackPointer pTrack) { - m_newTrackMutex.lock(); - m_newTrack = pTrack; - m_newTrackMutex.unlock(); + m_pWorker->newTrack(pTrack); + m_pWorker->wake(); } void CachingReader::process() { @@ -422,15 +366,16 @@ int CachingReader::read(int sample, int num_samples, CSAMPLE* buffer) { // If the chunk is not in cache, then we must return an error. if (current == NULL) { // qDebug() << "Couldn't get chunk " << chunk_num - // << " in read() of [" << sample << "," << sample+num_samples + // << " in read() of [" << sample << "," << sample + num_samples // << "] chunks " << start_chunk << "-" << end_chunk; // Something is wrong. Break out of the loop, that should fill the // samples requested with zeroes. + Counter("CachingReader::read cache miss")++; break; } - int chunk_start_sample = sampleForChunk(chunk_num); + int chunk_start_sample = CachingReaderWorker::sampleForChunk(chunk_num); int chunk_offset = current_sample - chunk_start_sample; int chunk_remaining_samples = current->length - chunk_offset; @@ -506,13 +451,13 @@ int CachingReader::read(int sample, int num_samples, CSAMPLE* buffer) { return zerosWritten + num_samples - samples_remaining; } -void CachingReader::hintAndMaybeWake(QList& hintList) { +void CachingReader::hintAndMaybeWake(const QVector& hintList) { // If no file is loaded, skip. if (m_readerStatus != TRACK_LOADED) { return; } - QListIterator iterator(hintList); + QVectorIterator iterator(hintList); // To prevent every bit of code having to guess how many samples // forward it makes sense to keep in memory, the hinter can provide @@ -544,10 +489,10 @@ void CachingReader::hintAndMaybeWake(QList& hintList) { continue; } int start_sample = math_max(0, math_min( - m_iTrackNumSamplesCallbackSafe, hint.sample)); + m_iTrackNumSamplesCallbackSafe, hint.sample)); int start_chunk = chunkForSample(start_sample); int end_sample = math_max(0, math_min( - m_iTrackNumSamplesCallbackSafe, hint.sample + hint.length - 1)); + m_iTrackNumSamplesCallbackSafe, hint.sample + hint.length - 1)); int end_chunk = chunkForSample(end_sample); for (int current = start_chunk; current <= end_chunk; ++current) { @@ -587,100 +532,6 @@ void CachingReader::hintAndMaybeWake(QList& hintList) { // If there are chunks to be read, wake up. if (shouldWake) { - wake(); - } -} - -void CachingReader::run() { - TrackPointer pLoadTrack; - - m_newTrackMutex.lock(); - if (m_newTrack) { - pLoadTrack = m_newTrack; - m_newTrack = TrackPointer(); - } - m_newTrackMutex.unlock(); - - if (pLoadTrack) { - loadTrack(pLoadTrack); - } else { - // Read the requested chunks. - ChunkReadRequest request; - ReaderStatusUpdate status; - while (m_chunkReadRequestFIFO.read(&request, 1) == 1) { - processChunkReadRequest(&request, &status); - m_readerStatusFIFO.writeBlocking(&status, 1); - } + m_pWorker->wake(); } - - // Notify the EngineWorkerScheduler that the work we did is done. - setActive(false); -} - -void CachingReader::wake() { - //qDebug() << m_pGroup << "CachingReader::wake()"; - workReady(); -} - -void CachingReader::loadTrack(TrackPointer pTrack) { - //qDebug() << m_pGroup << "CachingReader::loadTrack() lock acquired for load."; - - // Emit that a new track is loading, stops the current track - emit(trackLoading()); - - ReaderStatusUpdate status; - status.status = TRACK_LOADED; - status.chunk = NULL; - status.trackNumSamples = 0; - - if (m_pCurrentSoundSource != NULL) { - delete m_pCurrentSoundSource; - m_pCurrentSoundSource = NULL; - } - m_iTrackSampleRate = 0; - m_iTrackNumSamples = 0; - - QString filename = pTrack->getLocation(); - - if (filename.isEmpty() || !pTrack->exists()) { - // Must unlock before emitting to avoid deadlock - qDebug() << m_pGroup << "CachingReader::loadTrack() load failed for\"" - << filename << "\", unlocked reader lock"; - status.status = TRACK_NOT_LOADED; - m_readerStatusFIFO.writeBlocking(&status, 1); - emit(trackLoadFailed( - pTrack, QString("The file '%1' could not be found.").arg(filename))); - return; - } - - m_pCurrentSoundSource = new SoundSourceProxy(pTrack); - bool openSucceeded = (m_pCurrentSoundSource->open() == OK); //Open the song for reading - m_iTrackSampleRate = m_pCurrentSoundSource->getSampleRate(); - m_iTrackNumSamples = status.trackNumSamples = - m_pCurrentSoundSource->length(); - - if (!openSucceeded || m_iTrackNumSamples == 0 || m_iTrackSampleRate == 0) { - // Must unlock before emitting to avoid deadlock - qDebug() << m_pGroup << "CachingReader::loadTrack() load failed for\"" - << filename << "\", file invalid, unlocked reader lock"; - status.status = TRACK_NOT_LOADED; - m_readerStatusFIFO.writeBlocking(&status, 1); - emit(trackLoadFailed( - pTrack, QString("The file '%1' could not be loaded.").arg(filename))); - return; - } - - m_readerStatusFIFO.writeBlocking(&status, 1); - - // Clear the chunks to read list. - ChunkReadRequest request; - while (m_chunkReadRequestFIFO.read(&request, 1) == 1) { - qDebug() << "Skipping read request for " << request.chunk->chunk_number; - status.status = CHUNK_READ_INVALID; - status.chunk = request.chunk; - m_readerStatusFIFO.writeBlocking(&status, 1); - } - - // Emit that the track is loaded. - emit(trackLoaded(pTrack, m_iTrackSampleRate, m_iTrackNumSamples)); } diff --git a/src/cachingreader.h b/src/cachingreader.h index 03c6e898d3d..4f473220cd7 100644 --- a/src/cachingreader.h +++ b/src/cachingreader.h @@ -5,25 +5,17 @@ #define CACHINGREADER_H #include -#include -#include -#include #include #include #include #include -#include #include "defs.h" #include "configobject.h" #include "trackinfoobject.h" #include "engine/engineworker.h" #include "util/fifo.h" - -class ControlObjectThread; -namespace Mixxx { - class SoundSource; -} +#include "cachingreaderworker.h" // A Hint is an indication to the CachingReader that a certain section of a // SoundSource will be used 'soon' and so it should be brought into memory by @@ -42,42 +34,6 @@ typedef struct Hint { int priority; } Hint; -// A Chunk is a section of audio that is being cached. The chunk_number can be -// used to figure out the sample number of the first sample in data by using -// sampleForChunk() -typedef struct Chunk { - int chunk_number; - int length; - CSAMPLE* data; - Chunk* prev_lru; - Chunk* next_lru; -} Chunk; - -typedef struct ChunkReadRequest { - Chunk* chunk; - - ChunkReadRequest() { chunk = NULL; } -} ChunkReadRequest; - -enum ReaderStatus { - INVALID, - TRACK_NOT_LOADED, - TRACK_LOADED, - CHUNK_READ_SUCCESS, - CHUNK_READ_EOF, - CHUNK_READ_INVALID -}; - -typedef struct ReaderStatusUpdate { - ReaderStatus status; - Chunk* chunk; - int trackNumSamples; - ReaderStatusUpdate() { - status = INVALID; - chunk = NULL; - } -} ReaderStatusUpdate; - // CachingReader provides a layer on top of a SoundSource for reading samples // from a file. A cache is provided so that repeated reads to a certain section // of a song do not cause disk seeks or unnecessary SoundSource @@ -85,7 +41,7 @@ typedef struct ReaderStatusUpdate { // cache so that areas of a file that will soon be read are present in memory // once they are needed. This can be accomplished by issueing 'hints' to the // reader of areas of a SoundSource that will be read soon. -class CachingReader : public EngineWorker { +class CachingReader : public QObject { Q_OBJECT public: @@ -104,23 +60,16 @@ class CachingReader : public EngineWorker { // that is not in the cache. If any hints do request a chunk not in cache, // then wake the reader so that it can process them. Must only be called // from the engine callback. - virtual void hintAndMaybeWake(QList& hintList); + virtual void hintAndMaybeWake(const QVector& hintList); // Request that the CachingReader load a new track. These requests are // processed in the work thread, so the reader must be woken up via wake() // for this to take effect. virtual void newTrack(TrackPointer pTrack); - // Wake the reader up so that it will process newTrack requests and hints. - virtual void wake(); - - // Run upkeep operations like loading tracks and reading from file. Run by a - // thread pool via the EngineWorkerScheduler. - virtual void run(); - - // A Chunk is a memory-resident section of audio that has been cached. Each - // chunk holds a fixed number of samples given by kSamplesPerChunk. - const static int kChunkLength, kSamplesPerChunk; + void setScheduler(EngineWorkerScheduler* pScheduler) { + m_pWorker->setScheduler(pScheduler); + } signals: // Emitted once a new track is loaded and ready to be read from. @@ -136,19 +85,9 @@ class CachingReader : public EngineWorker { // Given a sample number, return the chunk number corresponding to it. inline static int chunkForSample(int sample_number) { - return sample_number / kSamplesPerChunk; + return sample_number / CachingReaderWorker::kSamplesPerChunk; } - // Given a chunk number, return the start sample number for the chunk. - inline static int sampleForChunk(int chunk_number) { - return chunk_number * kSamplesPerChunk; - } - - // Initialize the reader by creating all the chunks from the RAM provided to - // the CachingReader. - void initialize(); - - const char* m_pGroup; const ConfigObject* m_pConfig; // Thread-safe FIFOs for communication between the engine callback and @@ -156,16 +95,7 @@ class CachingReader : public EngineWorker { FIFO m_chunkReadRequestFIFO; FIFO m_readerStatusFIFO; - // Queue of Tracks to load, and the corresponding lock. Must acquire the - // lock to touch. - QMutex m_newTrackMutex; - TrackPointer m_newTrack; - - //////////////////////////////////////////////////////////////////////////// - // The following may /only/ be called within the engine callback - //////////////////////////////////////////////////////////////////////////// - - // Looks for the provided chunk number in the index of in-memory chunks and + // Looks for the provided chunk number in the index of in-memory chunks and // returns it if it is present. If not, returns NULL. Chunk* lookupChunk(int chunk_number); @@ -201,26 +131,9 @@ class CachingReader : public EngineWorker { // The raw memory buffer which is divided up into chunks. CSAMPLE* m_pRawMemoryBuffer; - //////////////////////////////////////////////////////////////////////////// - // The following may /only/ be called within the reader thread - //////////////////////////////////////////////////////////////////////////// - - // Internal method to load a track. Emits trackLoaded when finished. - void loadTrack(TrackPointer pTrack); - - // Read the given chunk_number from the file into pChunk's data - // buffer. Fills length/sample information about Chunk* as well. - void processChunkReadRequest(ChunkReadRequest* request, - ReaderStatusUpdate* update); - - // The current sound source of the track loaded - Mixxx::SoundSource* m_pCurrentSoundSource; - int m_iTrackSampleRate; - int m_iTrackNumSamples; int m_iTrackNumSamplesCallbackSafe; - // Temporary buffer for reading from SoundSources - SAMPLE* m_pSample; + CachingReaderWorker* m_pWorker; }; diff --git a/src/cachingreaderworker.cpp b/src/cachingreaderworker.cpp new file mode 100644 index 00000000000..873e7a34c57 --- /dev/null +++ b/src/cachingreaderworker.cpp @@ -0,0 +1,208 @@ + +#include + +#include +#include + +#include "controlobject.h" +#include "controlobjectthread.h" + +#include "cachingreaderworker.h" +#include "trackinfoobject.h" +#include "soundsourceproxy.h" +#include "sampleutil.h" + + +// There's a little math to this, but not much: 48khz stereo audio is 384kb/sec +// if using float samples. We want the chunk size to be a power of 2 so it's +// easier to memory align, and roughly 1/2 - 1/4th of a second of audio. 2**17 +// and 2**16 are nice candidates. 2**16 is 170ms of audio, which is well above +// (hopefully) the latencies people are seeing. at 10ms latency, one chunk is +// enough for 17 callbacks. We may need to tweak this later. + +// Must be divisible by 8, 4, and 2. Just pick a power of 2. +#define CHUNK_LENGTH 65536 +//#define CHUNK_LENGTH 524288 + +const int CachingReaderWorker::kChunkLength = CHUNK_LENGTH; +const int CachingReaderWorker::kSamplesPerChunk = CHUNK_LENGTH / sizeof(CSAMPLE); + + + +CachingReaderWorker::CachingReaderWorker(const char* group, + FIFO* pChunkReadRequestFIFO, + FIFO* pReaderStatusFIFO) + : m_pGroup(group), + m_pChunkReadRequestFIFO(pChunkReadRequestFIFO), + m_pReaderStatusFIFO(pReaderStatusFIFO), + m_pCurrentSoundSource(NULL), + m_iTrackSampleRate(0), + m_iTrackNumSamples(0), + m_pSample(NULL) { + int memory_to_use = 5000000; // 5mb, TODO + Q_ASSERT(memory_to_use >= kChunkLength); + + // Only allocate as many bytes as we will actually use. + memory_to_use -= (memory_to_use % kChunkLength); + + m_pSample = new SAMPLE[kSamplesPerChunk]; +} + +CachingReaderWorker::~CachingReaderWorker() { + m_mutexRun.lock(); // do not delete if a run is still scheduled; + // no need for unlock, because object is gone after + + delete [] m_pSample; + + delete m_pCurrentSoundSource; +} + +void CachingReaderWorker::processChunkReadRequest(ChunkReadRequest* request, + ReaderStatusUpdate* update) { + int chunk_number = request->chunk->chunk_number; + //qDebug() << "Processing ChunkReadRequest for" << chunk_number; + update->chunk = request->chunk; + update->chunk->length = 0; + + if (m_pCurrentSoundSource == NULL || chunk_number < 0) { + update->status = CHUNK_READ_INVALID; + return; + } + + // Stereo samples + int sample_position = sampleForChunk(chunk_number); + int samples_remaining = m_iTrackNumSamples - sample_position; + int samples_to_read = math_min(kSamplesPerChunk, samples_remaining); + + // Bogus chunk number + if (samples_to_read <= 0) { + update->status = CHUNK_READ_EOF; + return; + } + + m_pCurrentSoundSource->seek(sample_position); + int samples_read = m_pCurrentSoundSource->read(samples_to_read, + m_pSample); + + // If we've run out of music, the SoundSource can return 0 samples. + // Remember that SoundSourc->getLength() (which is m_iTrackNumSamples) can + // lie to us about the length of the song! + if (samples_read <= 0) { + update->status = CHUNK_READ_EOF; + return; + } + + // TODO(XXX) This loop can't be done with a memcpy, but could be done with + // SSE. + CSAMPLE* buffer = request->chunk->data; + //qDebug() << "Reading into " << buffer; + SampleUtil::convert(buffer, m_pSample, samples_read); + update->status = CHUNK_READ_SUCCESS; + update->chunk->length = samples_read; +} + +// WARNING: Always called from the other thread +void CachingReaderWorker::newTrack(TrackPointer pTrack) { + m_newTrackMutex.lock(); + m_newTrack = pTrack; + m_newTrackMutex.unlock(); +} + +void CachingReaderWorker::run() { + TrackPointer pLoadTrack; + + m_newTrackMutex.lock(); + if (m_newTrack) { + pLoadTrack = m_newTrack; + m_newTrack = TrackPointer(); + } + m_newTrackMutex.unlock(); + + if (pLoadTrack) { + loadTrack(pLoadTrack); + } else { + // Read the requested chunks. + ChunkReadRequest request; + ReaderStatusUpdate status; + while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { + processChunkReadRequest(&request, &status); + m_pReaderStatusFIFO->writeBlocking(&status, 1); + } + } + m_mutexRun.unlock(); // Notify that the work we did is done. +} + +// WARNING: Always called from other thread +void CachingReaderWorker::wake() { + //qDebug() << m_pGroup << "CachingReaderWorker::wake()"; + if(m_mutexRun.tryLock()) { + // tryLock succeeds if not already scheduled and not under destruction + if(!workReady()){ + // not scheduled + m_mutexRun.unlock(); + } + } +} + +void CachingReaderWorker::loadTrack(TrackPointer pTrack) { + //qDebug() << m_pGroup << "CachingReaderWorker::loadTrack() lock acquired for load."; + + // Emit that a new track is loading, stops the current track + emit(trackLoading()); + + ReaderStatusUpdate status; + status.status = TRACK_LOADED; + status.chunk = NULL; + status.trackNumSamples = 0; + + if (m_pCurrentSoundSource != NULL) { + delete m_pCurrentSoundSource; + m_pCurrentSoundSource = NULL; + } + m_iTrackSampleRate = 0; + m_iTrackNumSamples = 0; + + QString filename = pTrack->getLocation(); + + if (filename.isEmpty() || !pTrack->exists()) { + // Must unlock before emitting to avoid deadlock + qDebug() << m_pGroup << "CachingReaderWorker::loadTrack() load failed for\"" + << filename << "\", unlocked reader lock"; + status.status = TRACK_NOT_LOADED; + m_pReaderStatusFIFO->writeBlocking(&status, 1); + emit(trackLoadFailed( + pTrack, QString("The file '%1' could not be found.").arg(filename))); + return; + } + + m_pCurrentSoundSource = new SoundSourceProxy(pTrack); + bool openSucceeded = (m_pCurrentSoundSource->open() == OK); //Open the song for reading + m_iTrackSampleRate = m_pCurrentSoundSource->getSampleRate(); + m_iTrackNumSamples = status.trackNumSamples = + m_pCurrentSoundSource->length(); + + if (!openSucceeded || m_iTrackNumSamples == 0 || m_iTrackSampleRate == 0) { + // Must unlock before emitting to avoid deadlock + qDebug() << m_pGroup << "CachingReaderWorker::loadTrack() load failed for\"" + << filename << "\", file invalid, unlocked reader lock"; + status.status = TRACK_NOT_LOADED; + m_pReaderStatusFIFO->writeBlocking(&status, 1); + emit(trackLoadFailed( + pTrack, QString("The file '%1' could not be loaded.").arg(filename))); + return; + } + + m_pReaderStatusFIFO->writeBlocking(&status, 1); + + // Clear the chunks to read list. + ChunkReadRequest request; + while (m_pChunkReadRequestFIFO->read(&request, 1) == 1) { + qDebug() << "Skipping read request for " << request.chunk->chunk_number; + status.status = CHUNK_READ_INVALID; + status.chunk = request.chunk; + m_pReaderStatusFIFO->writeBlocking(&status, 1); + } + + // Emit that the track is loaded. + emit(trackLoaded(pTrack, m_iTrackSampleRate, m_iTrackNumSamples)); +} diff --git a/src/cachingreaderworker.h b/src/cachingreaderworker.h new file mode 100644 index 00000000000..a5fb70c621b --- /dev/null +++ b/src/cachingreaderworker.h @@ -0,0 +1,121 @@ +#ifndef CACHINGREADERWORKER_H +#define CACHINGREADERWORKER_H + +#include +#include +#include +#include + +#include "trackinfoobject.h" +#include "engine/engineworker.h" +#include "util/fifo.h" + +namespace Mixxx { + class SoundSource; +} + +// A Chunk is a section of audio that is being cached. The chunk_number can be +// used to figure out the sample number of the first sample in data by using +// sampleForChunk() +typedef struct Chunk { + int chunk_number; + int length; + CSAMPLE* data; + Chunk* prev_lru; + Chunk* next_lru; +} Chunk; + +typedef struct ChunkReadRequest { + Chunk* chunk; + + ChunkReadRequest() { chunk = NULL; } +} ChunkReadRequest; + +enum ReaderStatus { + INVALID, + TRACK_NOT_LOADED, + TRACK_LOADED, + CHUNK_READ_SUCCESS, + CHUNK_READ_EOF, + CHUNK_READ_INVALID +}; + +typedef struct ReaderStatusUpdate { + ReaderStatus status; + Chunk* chunk; + int trackNumSamples; + ReaderStatusUpdate() { + status = INVALID; + chunk = NULL; + } +} ReaderStatusUpdate; + +class CachingReaderWorker : public EngineWorker { + Q_OBJECT + + public: + // Construct a CachingReader with the given group. + CachingReaderWorker(const char* group, + FIFO* pChunkReadRequestFIFO, + FIFO* pReaderStatusFIFO); + virtual ~CachingReaderWorker(); + + // Request to load a new track. wake() must be called afer wards. + virtual void newTrack(TrackPointer pTrack); + + // Schedule the Worker + virtual void wake(); + + // Run upkeep operations like loading tracks and reading from file. Run by a + // thread pool via the EngineWorkerScheduler. + virtual void run(); + + // A Chunk is a memory-resident section of audio that has been cached. Each + // chunk holds a fixed number of samples given by kSamplesPerChunk. + const static int kChunkLength, kSamplesPerChunk; + + // Given a chunk number, return the start sample number for the chunk. + inline static int sampleForChunk(int chunk_number) { + return chunk_number * kSamplesPerChunk; + } + + signals: + // Emitted once a new track is loaded and ready to be read from. + void trackLoading(); + void trackLoaded(TrackPointer pTrack, int iSampleRate, int iNumSamples); + void trackLoadFailed(TrackPointer pTrack, QString reason); + + private: + + const char* m_pGroup; + + // Thread-safe FIFOs for communication between the engine callback and + // reader thread. + FIFO* m_pChunkReadRequestFIFO; + FIFO* m_pReaderStatusFIFO; + + // Queue of Tracks to load, and the corresponding lock. Must acquire the + // lock to touch. + QMutex m_newTrackMutex; + TrackPointer m_newTrack; + + // Internal method to load a track. Emits trackLoaded when finished. + void loadTrack(TrackPointer pTrack); + + // Read the given chunk_number from the file into pChunk's data + // buffer. Fills length/sample information about Chunk* as well. + void processChunkReadRequest(ChunkReadRequest* request, + ReaderStatusUpdate* update); + + // The current sound source of the track loaded + Mixxx::SoundSource* m_pCurrentSoundSource; + int m_iTrackSampleRate; + int m_iTrackNumSamples; + + // Temporary buffer for reading from SoundSources + SAMPLE* m_pSample; + QMutex m_mutexRun; +}; + + +#endif /* CACHINGREADERWORKER_H */ diff --git a/src/engine/cuecontrol.cpp b/src/engine/cuecontrol.cpp index 1c43081e6d6..3ecf9c0739c 100644 --- a/src/engine/cuecontrol.cpp +++ b/src/engine/cuecontrol.cpp @@ -534,7 +534,7 @@ void CueControl::hotcuePositionChanged(HotcueControl* pControl, double newPositi } } -void CueControl::hintReader(QList& hintList) { +void CueControl::hintReader(QVector* pHintList) { QMutexLocker lock(&m_mutex); Hint cue_hint; @@ -543,7 +543,7 @@ void CueControl::hintReader(QList& hintList) { cue_hint.sample = m_pCuePoint->get(); cue_hint.length = 0; cue_hint.priority = 10; - hintList.append(cue_hint); + pHintList->append(cue_hint); } for (int i = 0; i < m_iNumHotCues; ++i) { @@ -557,7 +557,7 @@ void CueControl::hintReader(QList& hintList) { cue_hint.sample--; cue_hint.length = 0; cue_hint.priority = 10; - hintList.push_back(cue_hint); + pHintList->push_back(cue_hint); } } } diff --git a/src/engine/cuecontrol.h b/src/engine/cuecontrol.h index 71021b78cf8..4beee976943 100644 --- a/src/engine/cuecontrol.h +++ b/src/engine/cuecontrol.h @@ -86,7 +86,7 @@ class CueControl : public EngineControl { ConfigObject * _config); virtual ~CueControl(); - virtual void hintReader(QList& hintList); + virtual void hintReader(QVector* pHintList); public slots: void trackLoaded(TrackPointer pTrack); diff --git a/src/engine/enginebuffer.cpp b/src/engine/enginebuffer.cpp index 69d76aed34d..354bc081d28 100644 --- a/src/engine/enginebuffer.cpp +++ b/src/engine/enginebuffer.cpp @@ -231,6 +231,9 @@ EngineBuffer::EngineBuffer(const char * _group, ConfigObject * _con df.open(QIODevice::WriteOnly | QIODevice::Text); writer.setDevice(&df); #endif + + + m_hintList.reserve(16); // Avoid reallocation } EngineBuffer::~EngineBuffer() @@ -652,7 +655,7 @@ void EngineBuffer::process(const CSAMPLE *, const CSAMPLE * pOut, const int iBuf m_filepos_play--; } - // Perform scaling of Reader buffer into buffer. +//-> // Perform scaling of Reader buffer into buffer. CSAMPLE* output = m_pScale->getScaled(iBufferSize); double samplesRead = m_pScale->getSamplesRead(); @@ -735,9 +738,11 @@ void EngineBuffer::process(const CSAMPLE *, const CSAMPLE * pOut, const int iBuf bCurBufferPaused = true; } - // Give the Reader hints as to which chunks of the current song we - // really care about. It will try very hard to keep these in memory - hintReader(rate); + if (!bTrackLoading) { + // Give the Reader hints as to which chunks of the current song we + // really care about. It will try very hard to keep these in memory + hintReader(rate); + } const double kSmallRate = 0.005; if (m_bLastBufferPaused && !bCurBufferPaused) { @@ -847,7 +852,7 @@ void EngineBuffer::hintReader(const double dRate) { m_engineLock.lock(); m_hintList.clear(); - m_pReadAheadManager->hintReader(dRate, m_hintList); + m_pReadAheadManager->hintReader(dRate, &m_hintList); //if slipping, hint about virtual position so we're ready for it if (m_bSlipEnabled) { @@ -861,7 +866,7 @@ void EngineBuffer::hintReader(const double dRate) { QListIterator it(m_engineControls); while (it.hasNext()) { EngineControl* pControl = it.next(); - pControl->hintReader(m_hintList); + pControl->hintReader(&m_hintList); } m_pReader->hintAndMaybeWake(m_hintList); @@ -874,7 +879,6 @@ void EngineBuffer::slotLoadTrack(TrackPointer pTrack, bool play) { // trackLoading and then either with trackLoaded or trackLoadFailed signals. m_bPlayAfterLoading = play; m_pReader->newTrack(pTrack); - m_pReader->wake(); } void EngineBuffer::addControl(EngineControl* pControl) { @@ -892,7 +896,7 @@ void EngineBuffer::addControl(EngineControl* pControl) { } void EngineBuffer::bindWorkers(EngineWorkerScheduler* pWorkerScheduler) { - pWorkerScheduler->bindWorker(m_pReader); + m_pReader->setScheduler(pWorkerScheduler); } bool EngineBuffer::isTrackLoaded() { @@ -908,6 +912,7 @@ void EngineBuffer::slotEjectTrack(double v) { } } +/* void EngineBuffer::setReader(CachingReader* pReader) { disconnect(m_pReader, 0, this, 0); delete m_pReader; @@ -923,3 +928,4 @@ void EngineBuffer::setReader(CachingReader* pReader) { this, SLOT(slotTrackLoadFailed(TrackPointer, QString)), Qt::DirectConnection); } +*/ diff --git a/src/engine/enginebuffer.h b/src/engine/enginebuffer.h index 18d8dc74013..0085418707c 100644 --- a/src/engine/enginebuffer.h +++ b/src/engine/enginebuffer.h @@ -116,7 +116,7 @@ class EngineBuffer : public EngineObject TrackPointer getLoadedTrack() const; // For dependency injection of readers. - void setReader(CachingReader* pReader); + //void setReader(CachingReader* pReader); public slots: void slotControlPlay(double); @@ -186,7 +186,7 @@ class EngineBuffer : public EngineObject CachingReader* m_pReader; // List of hints to provide to the CachingReader - QList m_hintList; + QVector m_hintList; /** The current sample to play in the file. */ double m_filepos_play; diff --git a/src/engine/enginecontrol.cpp b/src/engine/enginecontrol.cpp index 2dbb66471bf..cca2b64aa85 100644 --- a/src/engine/enginecontrol.cpp +++ b/src/engine/enginecontrol.cpp @@ -47,7 +47,7 @@ void EngineControl::trackLoaded(TrackPointer) { void EngineControl::trackUnloaded(TrackPointer) { } -void EngineControl::hintReader(QList&) { +void EngineControl::hintReader(QVector*) { } void EngineControl::setEngineMaster(EngineMaster* pEngineMaster) { diff --git a/src/engine/enginecontrol.h b/src/engine/enginecontrol.h index 61f4fa2b601..9f7cd8d4cc6 100644 --- a/src/engine/enginecontrol.h +++ b/src/engine/enginecontrol.h @@ -60,7 +60,7 @@ class EngineControl : public QObject { // hintReader allows the EngineControl to provide hints to the reader to // indicate that the given portion of a song is a potential imminent seek // target. - virtual void hintReader(QList& hintList); + virtual void hintReader(QVector* pHintList); void setEngineMaster(EngineMaster* pEngineMaster); void setEngineBuffer(EngineBuffer* pEngineBuffer); diff --git a/src/engine/engineworker.cpp b/src/engine/engineworker.cpp index f3f1c9c5785..8104d700d0a 100644 --- a/src/engine/engineworker.cpp +++ b/src/engine/engineworker.cpp @@ -4,7 +4,8 @@ #include "engine/engineworker.h" #include "engine/engineworkerscheduler.h" -EngineWorker::EngineWorker() { +EngineWorker::EngineWorker() + : m_pScheduler(NULL) { setAutoDelete(false); } @@ -18,8 +19,10 @@ void EngineWorker::setScheduler(EngineWorkerScheduler* pScheduler) { m_pScheduler = pScheduler; } -void EngineWorker::workReady() { +bool EngineWorker::workReady() { if (m_pScheduler) { m_pScheduler->workerReady(this); + return true; } + return false; } diff --git a/src/engine/engineworker.h b/src/engine/engineworker.h index 95847a6111e..025f4faf138 100644 --- a/src/engine/engineworker.h +++ b/src/engine/engineworker.h @@ -23,22 +23,11 @@ class EngineWorker : public QObject, public QRunnable { virtual void run(); - // Thread-safe, sets whether this EngineWorker is active. - inline void setActive(bool bActive) { - m_isActive = bActive; - } - - // Thread-safe, returns true if this EngineWorker is active. - inline bool isActive() const { - return m_isActive > 0; - } - void setScheduler(EngineWorkerScheduler* pScheduler); - void workReady(); + bool workReady(); private: EngineWorkerScheduler* m_pScheduler; - QAtomicInt m_isActive; }; #endif /* ENGINEWORKER_H */ diff --git a/src/engine/engineworkerscheduler.cpp b/src/engine/engineworkerscheduler.cpp index e3b7ec42dbd..8eff2989750 100644 --- a/src/engine/engineworkerscheduler.cpp +++ b/src/engine/engineworkerscheduler.cpp @@ -22,10 +22,6 @@ EngineWorkerScheduler::~EngineWorkerScheduler() { wait(); } -void EngineWorkerScheduler::bindWorker(EngineWorker* pWorker) { - pWorker->setScheduler(this); -} - void EngineWorkerScheduler::workerReady(EngineWorker* pWorker) { if (pWorker) { // If the write fails, we really can't do much since we should not block @@ -43,8 +39,7 @@ void EngineWorkerScheduler::run() { while (!m_bQuit) { EngineWorker* pWorker = NULL; while (m_scheduleFIFO.read(&pWorker, 1) == 1) { - if (pWorker && !pWorker->isActive()) { - pWorker->setActive(true); + if (pWorker) { m_workerThreadPool.start(pWorker); } } diff --git a/src/engine/engineworkerscheduler.h b/src/engine/engineworkerscheduler.h index 5c165b3c4f7..2cfba5b913c 100644 --- a/src/engine/engineworkerscheduler.h +++ b/src/engine/engineworkerscheduler.h @@ -24,7 +24,6 @@ class EngineWorkerScheduler : public QThread { EngineWorkerScheduler(QObject* pParent=NULL); virtual ~EngineWorkerScheduler(); - void bindWorker(EngineWorker* pWorker); void runWorkers(); void workerReady(EngineWorker* worker); diff --git a/src/engine/loopingcontrol.cpp b/src/engine/loopingcontrol.cpp index 1c61019d7f6..ec498cdf162 100644 --- a/src/engine/loopingcontrol.cpp +++ b/src/engine/loopingcontrol.cpp @@ -273,7 +273,7 @@ double LoopingControl::getTrigger(const double dRate, return kNoTrigger; } -void LoopingControl::hintReader(QList& hintList) { +void LoopingControl::hintReader(QVector* pHintList) { Hint loop_hint; // If the loop is enabled, then this is high priority because we will loop // sometime potentially very soon! The current audio itself is priority 1, @@ -287,20 +287,20 @@ void LoopingControl::hintReader(QList& hintList) { loop_hint.priority = 2; loop_hint.sample = m_iLoopStartSample; loop_hint.length = 0; // Let it issue the default length - hintList.append(loop_hint); + pHintList->append(loop_hint); } if (m_iLoopEndSample >= 0) { loop_hint.priority = 10; loop_hint.sample = m_iLoopEndSample; loop_hint.length = -1; // Let it issue the default (backwards) length - hintList.append(loop_hint); + pHintList->append(loop_hint); } } else { if (m_iLoopStartSample >= 0) { loop_hint.priority = 10; loop_hint.sample = m_iLoopStartSample; loop_hint.length = 0; // Let it issue the default length - hintList.append(loop_hint); + pHintList->append(loop_hint); } } } diff --git a/src/engine/loopingcontrol.h b/src/engine/loopingcontrol.h index 82b53b1b7f3..d5bbc9239ff 100644 --- a/src/engine/loopingcontrol.h +++ b/src/engine/loopingcontrol.h @@ -49,7 +49,7 @@ class LoopingControl : public EngineControl { // hintReader will add to hintList hints both the loop in and loop out // sample, if set. - void hintReader(QList& hintList); + void hintReader(QVector* pHintList); void notifySeek(double dNewPlaypos); diff --git a/src/engine/readaheadmanager.cpp b/src/engine/readaheadmanager.cpp index 44fc71bcb25..b8df9a72aac 100644 --- a/src/engine/readaheadmanager.cpp +++ b/src/engine/readaheadmanager.cpp @@ -153,13 +153,13 @@ void ReadAheadManager::notifySeek(int iSeekPosition) { // } } -void ReadAheadManager::hintReader(double dRate, QList& hintList) { +void ReadAheadManager::hintReader(double dRate, QVector* pHintList) { bool in_reverse = dRate < 0; Hint current_position; // SoundTouch can read up to 2 chunks ahead. Always keep 2 chunks ahead in // cache. - int length_to_cache = 2*CachingReader::kSamplesPerChunk; + int length_to_cache = 2 * CachingReaderWorker::kSamplesPerChunk; current_position.length = length_to_cache; current_position.sample = in_reverse ? @@ -174,7 +174,7 @@ void ReadAheadManager::hintReader(double dRate, QList& hintList) { // top priority, we need to read this data immediately current_position.priority = 1; - hintList.append(current_position); + pHintList->append(current_position); } void ReadAheadManager::addReadLogEntry(double virtualPlaypositionStart, diff --git a/src/engine/readaheadmanager.h b/src/engine/readaheadmanager.h index c852fd57c82..356749f43a7 100644 --- a/src/engine/readaheadmanager.h +++ b/src/engine/readaheadmanager.h @@ -53,7 +53,7 @@ class ReadAheadManager { // hintReader allows the ReadAheadManager to provide hints to the reader to // indicate that the given portion of a song is about to be read. - virtual void hintReader(double dRate, QList& hintList); + virtual void hintReader(double dRate, QVector* hintList); virtual int getEffectiveVirtualPlaypositionFromLog(double currentVirtualPlayposition, double numConsumedSamples); diff --git a/src/soundsourcemp3.cpp b/src/soundsourcemp3.cpp index 20c56915ff0..9dd16eaee6f 100644 --- a/src/soundsourcemp3.cpp +++ b/src/soundsourcemp3.cpp @@ -207,12 +207,11 @@ long SoundSourceMp3::seek(long filepos) { return 0; } -// qDebug() << "SEEK " << filepos; + //qDebug() << "SEEK " << filepos; MadSeekFrameType* cur = NULL; - if (filepos==0) - { + if (filepos == 0) { // Seek to beginning of file // Re-init buffer: @@ -227,9 +226,7 @@ long SoundSourceMp3::seek(long filepos) { m_currentSeekFrameIndex = 0; cur = getSeekFrame(0); //frameIterator.toFront(); //Might not need to do this -- Albert June 19/2010 (during Qt3 purge) - } - else - { + } else { //qDebug() << "seek precise"; // Perform precise seek accomplished by using a frame in the seek list @@ -245,8 +242,7 @@ long SoundSourceMp3::seek(long filepos) { */ int framePos = findFrame(filepos); - if (framePos==0 || framePos>filepos || m_currentSeekFrameIndex < 5) - { + if (framePos == 0 || framePos > filepos || m_currentSeekFrameIndex < 5) { //qDebug() << "Problem finding good seek frame (wanted " << filepos << ", got " << framePos << "), starting from 0"; // Re-init buffer: @@ -259,12 +255,10 @@ long SoundSourceMp3::seek(long filepos) { rest = -1; m_currentSeekFrameIndex = 0; cur = getSeekFrame(m_currentSeekFrameIndex); - } - else - { -// qDebug() << "frame pos " << cur->pos; + } else { + //qDebug() << "frame pos " << cur->pos; - // Start four frame before wanted frame to get in sync... + // Start four frames before wanted frame to get in sync... m_currentSeekFrameIndex -= 4; cur = getSeekFrame(m_currentSeekFrameIndex); if (cur != NULL) { @@ -283,10 +277,10 @@ long SoundSourceMp3::seek(long filepos) { mad_frame_mute(Frame); // Decode the three frames before - mad_frame_decode(Frame,Stream); - mad_frame_decode(Frame,Stream); - mad_frame_decode(Frame,Stream); - mad_frame_decode(Frame,Stream); + mad_frame_decode(Frame, Stream); + mad_frame_decode(Frame, Stream); + mad_frame_decode(Frame, Stream); + mad_frame_decode(Frame, Stream); // this is also explained in the above mad-dev post mad_synth_frame(Synth, Frame); @@ -347,7 +341,6 @@ long SoundSourceMp3::seek(long filepos) { // Unfortunately we don't know the exact fileposition. The returned position is thus an // approximation only: return filepos; - } inline long unsigned SoundSourceMp3::length() { @@ -404,16 +397,16 @@ unsigned long SoundSourceMp3::discard(unsigned long samples_wanted) { Total_samples_decoded += 2*(Synth->pcm.length-rest); while (Total_samples_decoded < samples_wanted) { - if (mad_frame_decode(Frame,Stream)) { + if (mad_frame_decode(Frame, Stream)) { if (MAD_RECOVERABLE(Stream->error)) { continue; - } else if(Stream->error==MAD_ERROR_BUFLEN) { + } else if(Stream->error == MAD_ERROR_BUFLEN) { break; } else { break; } } - mad_synth_frame(Synth,Frame); + mad_synth_frame(Synth, Frame); no = math_min(Synth->pcm.length,(samples_wanted-Total_samples_decoded)/2); Total_samples_decoded += 2*no; } diff --git a/src/util/counter.h b/src/util/counter.h index bea6462fa2b..61feabaf183 100644 --- a/src/util/counter.h +++ b/src/util/counter.h @@ -13,6 +13,15 @@ class Counter { Stat::COUNT | Stat::SUM | Stat::AVERAGE | Stat::SAMPLE_VARIANCE | Stat::MIN | Stat::MAX, by); } + Counter& operator+=(int by) { + this->increment(by); + return *this; + } + inline Counter operator++(int) { // postfix + Counter result = *this; + increment(1); + return result; + } private: QString m_tag; };