Skip to content

Commit

Permalink
Merge pull request #2265 from uklotzde/cachingreader_chunks
Browse files Browse the repository at this point in the history
lp1842679: Fix caching of audio data
  • Loading branch information
daschuer authored Sep 15, 2019
2 parents e951464 + 69d2414 commit 4000c66
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 160 deletions.
183 changes: 106 additions & 77 deletions src/engine/cachingreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,21 @@ mixxx::Logger kLogger("CachingReader");
// TODO() Do we suffer cache misses if we use an audio buffer of above 23 ms?
const SINT kDefaultHintFrames = 1024;

// currently CachingReaderWorker::kCachingReaderChunkLength is 65536 (0x10000);
// For 80 chunks we need 5242880 (0x500000) bytes (5 MiB) of Memory
//static
// With CachingReaderChunk::kFrames = 8192 each chunk consumes
// 8192 frames * 2 channels/frame * 4-bytes per sample = 65 kB.
//
// 80 chunks -> 5120 KB = 5 MB
//
// Each deck (including sample decks) will use their own CachingReader.
// Consequently the total memory required for all allocated chunks depends
// on the number of decks. The amount of memory reserved for a single
// CachingReader must be multiplied by the number of decks to calculate
// the total amount!
//
// NOTE(uklotzde, 2019-09-05): Reduce this number to just few chunks
// (kNumberOfCachedChunksInMemory = 1, 2, 3, ...) for testing purposes
// to verify that the MRU/LRU cache works as expected. Even though
// massive drop outs are expected to occur Mixxx should run reliably!
const SINT kNumberOfCachedChunksInMemory = 80;

} // anonymous namespace
Expand All @@ -31,8 +43,20 @@ const SINT kNumberOfCachedChunksInMemory = 80;
CachingReader::CachingReader(QString group,
UserSettingsPointer config)
: m_pConfig(config),
m_chunkReadRequestFIFO(1024),
m_readerStatusFIFO(1024),
// Limit the number of in-flight requests to the worker. This should
// prevent to overload the worker when it is not able to fetch those
// requests from the FIFO timely. Otherwise outdated requests pile up
// in the FIFO and it would take a long time to process them, just to
// discard the results that most likely have already become obsolete.
// TODO(XXX): Ideally the request FIFO would be implemented as a ring
// buffer, where new requests replace old requests when full. Those
// old requests need to be returned immediately to the CachingReader
// that must take ownership and free them!!!
m_chunkReadRequestFIFO(kNumberOfCachedChunksInMemory / 4),
// The capacity of the back channel must be equal to the number of
// allocated chunks, because the worker use writeBlocking(). Otherwise
// the worker could get stuck in a hot loop!!!
m_readerStatusFIFO(kNumberOfCachedChunksInMemory),
m_readerStatus(INVALID),
m_mruCachingReaderChunk(nullptr),
m_lruCachingReaderChunk(nullptr),
Expand Down Expand Up @@ -73,19 +97,24 @@ CachingReader::~CachingReader() {
qDeleteAll(m_chunks);
}

void CachingReader::freeChunkFromList(CachingReaderChunkForOwner* pChunk) {
pChunk->removeFromList(
&m_mruCachingReaderChunk,
&m_lruCachingReaderChunk);
pChunk->free();
m_freeChunks.push_back(pChunk);
}

void CachingReader::freeChunk(CachingReaderChunkForOwner* pChunk) {
DEBUG_ASSERT(pChunk != nullptr);
DEBUG_ASSERT(pChunk);
DEBUG_ASSERT(pChunk->getState() != CachingReaderChunkForOwner::READ_PENDING);

const int removed = m_allocatedCachingReaderChunks.remove(pChunk->getIndex());
// We'll tolerate not being in allocatedCachingReaderChunks,
// because sometime you free a chunk right after you allocated it.
DEBUG_ASSERT(removed <= 1);

pChunk->removeFromList(
&m_mruCachingReaderChunk, &m_lruCachingReaderChunk);
pChunk->free();
m_freeChunks.push_back(pChunk);
freeChunkFromList(pChunk);
}

void CachingReader::freeAllChunks() {
Expand All @@ -97,15 +126,13 @@ void CachingReader::freeAllChunks() {
}

if (pChunk->getState() != CachingReaderChunkForOwner::FREE) {
pChunk->removeFromList(
&m_mruCachingReaderChunk, &m_lruCachingReaderChunk);
pChunk->free();
m_freeChunks.push_back(pChunk);
freeChunkFromList(pChunk);
}
}
DEBUG_ASSERT(!m_mruCachingReaderChunk);
DEBUG_ASSERT(!m_lruCachingReaderChunk);

m_allocatedCachingReaderChunks.clear();
m_mruCachingReaderChunk = nullptr;
}

CachingReaderChunkForOwner* CachingReader::allocateChunk(SINT chunkIndex) {
Expand All @@ -115,62 +142,59 @@ CachingReaderChunkForOwner* CachingReader::allocateChunk(SINT chunkIndex) {
CachingReaderChunkForOwner* pChunk = m_freeChunks.takeFirst();
pChunk->init(chunkIndex);

//kLogger.debug() << "Allocating chunk" << pChunk << pChunk->getIndex();
m_allocatedCachingReaderChunks.insert(chunkIndex, pChunk);

return pChunk;
}

CachingReaderChunkForOwner* CachingReader::allocateChunkExpireLRU(SINT chunkIndex) {
CachingReaderChunkForOwner* pChunk = allocateChunk(chunkIndex);
auto pChunk = allocateChunk(chunkIndex);
if (!pChunk) {
if (m_lruCachingReaderChunk == nullptr) {
kLogger.warning() << "ERROR: No LRU chunk to free in allocateChunkExpireLRU.";
return nullptr;
if (m_lruCachingReaderChunk) {
freeChunk(m_lruCachingReaderChunk);
pChunk = allocateChunk(chunkIndex);
} else {
kLogger.warning() << "No cached LRU chunk available for freeing";
}
freeChunk(m_lruCachingReaderChunk);
pChunk = allocateChunk(chunkIndex);
}
//kLogger.debug() << "allocateChunkExpireLRU" << chunk << pChunk;
if (kLogger.traceEnabled()) {
kLogger.trace() << "allocateChunkExpireLRU" << chunkIndex << pChunk;
}
return pChunk;
}

CachingReaderChunkForOwner* CachingReader::lookupChunk(SINT chunkIndex) {
// Defaults to nullptr if it's not in the hash.
CachingReaderChunkForOwner* chunk = m_allocatedCachingReaderChunks.value(chunkIndex, nullptr);

// Make sure the allocated number matches the indexed chunk number.
DEBUG_ASSERT(chunk == nullptr || chunkIndex == chunk->getIndex());

return chunk;
auto pChunk = m_allocatedCachingReaderChunks.value(chunkIndex, nullptr);
DEBUG_ASSERT(!pChunk || pChunk->getIndex() == chunkIndex);
return pChunk;
}

void CachingReader::freshenChunk(CachingReaderChunkForOwner* pChunk) {
DEBUG_ASSERT(pChunk != nullptr);
DEBUG_ASSERT(pChunk->getState() != CachingReaderChunkForOwner::READ_PENDING);

// Remove the chunk from the LRU list
pChunk->removeFromList(&m_mruCachingReaderChunk, &m_lruCachingReaderChunk);

// Adjust the least-recently-used item before inserting the
// chunk as the new most-recently-used item.
if (m_lruCachingReaderChunk == nullptr) {
if (m_mruCachingReaderChunk == nullptr) {
m_lruCachingReaderChunk = pChunk;
} else {
m_lruCachingReaderChunk = m_mruCachingReaderChunk;
}
DEBUG_ASSERT(pChunk);
DEBUG_ASSERT(pChunk->getState() == CachingReaderChunkForOwner::READY);
if (kLogger.traceEnabled()) {
kLogger.trace()
<< "freshenChunk()"
<< pChunk->getIndex()
<< pChunk;
}

// Insert the chunk as the new most-recently-used item.
pChunk->insertIntoListBefore(m_mruCachingReaderChunk);
m_mruCachingReaderChunk = pChunk;
// Remove the chunk from the MRU/LRU list
pChunk->removeFromList(
&m_mruCachingReaderChunk,
&m_lruCachingReaderChunk);

// Reinsert has new head of MRU list
pChunk->insertIntoListBefore(
&m_mruCachingReaderChunk,
&m_lruCachingReaderChunk,
m_mruCachingReaderChunk);
}

CachingReaderChunkForOwner* CachingReader::lookupChunkAndFreshen(SINT chunkIndex) {
CachingReaderChunkForOwner* pChunk = lookupChunk(chunkIndex);
if (pChunk &&
(pChunk->getState() != CachingReaderChunkForOwner::READ_PENDING)) {
auto pChunk = lookupChunk(chunkIndex);
if (pChunk && (pChunk->getState() == CachingReaderChunkForOwner::READY)) {
freshenChunk(pChunk);
}
return pChunk;
Expand All @@ -182,15 +206,11 @@ void CachingReader::newTrack(TrackPointer pTrack) {
}

void CachingReader::process() {
ReaderStatusUpdate status;
while (m_readerStatusFIFO.read(&status, 1) == 1) {
CachingReaderChunkForOwner* pChunk = static_cast<CachingReaderChunkForOwner*>(status.chunk);
ReaderStatusUpdate update;
while (m_readerStatusFIFO.read(&update, 1) == 1) {
auto pChunk = update.takeFromWorker();
if (pChunk) {
// Take over control of the chunk from the worker.
// This has to be done before freeing all chunks
// after a new track has been loaded (see below)!
pChunk->takeFromWorker();
if (status.status == CHUNK_READ_SUCCESS) {
if (update.status == CHUNK_READ_SUCCESS) {
// Insert or freshen the chunk in the MRU/LRU list after
// obtaining ownership from the worker.
freshenChunk(pChunk);
Expand All @@ -199,20 +219,20 @@ void CachingReader::process() {
freeChunk(pChunk);
}
}
if (status.status == TRACK_NOT_LOADED) {
m_readerStatus = status.status;
} else if (status.status == TRACK_LOADED) {
m_readerStatus = status.status;
if (update.status == TRACK_NOT_LOADED) {
m_readerStatus = update.status;
} else if (update.status == TRACK_LOADED) {
m_readerStatus = update.status;
// Reset the max. readable frame index
m_readableFrameIndexRange = status.readableFrameIndexRange();
m_readableFrameIndexRange = update.readableFrameIndexRange();
// Free all chunks with sample data from a previous track
freeAllChunks();
}
if (m_readerStatus == TRACK_LOADED) {
// Adjust the readable frame index range after loading or reading
m_readableFrameIndexRange = intersect(
m_readableFrameIndexRange,
status.readableFrameIndexRange());
update.readableFrameIndexRange());
} else {
// Reset the readable frame index range
m_readableFrameIndexRange = mixxx::IndexRange();
Expand All @@ -228,9 +248,10 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,
// Refuse to read from an invalid number of samples
(numSamples % CachingReaderChunk::kChannels == 0) && (numSamples >= 0)) {
kLogger.critical()
<< "read() invalid arguments:"
<< "Invalid arguments for read():"
<< "startSample =" << startSample
<< "numSamples =" << numSamples;
<< "numSamples =" << numSamples
<< "reverse =" << reverse;
return ReadResult::UNAVAILABLE;
}
VERIFY_OR_DEBUG_ASSERT(buffer) {
Expand Down Expand Up @@ -281,10 +302,12 @@ CachingReader::ReadResult CachingReader::read(SINT startSample, SINT numSamples,
remainingFrameIndexRange.start(),
m_readableFrameIndexRange.start());
DEBUG_ASSERT(prerollFrameIndexRange.length() <= remainingFrameIndexRange.length());
kLogger.debug()
<< "Prepending"
<< prerollFrameIndexRange.length()
<< "frames of silence";
if (kLogger.traceEnabled()) {
kLogger.trace()
<< "Prepending"
<< prerollFrameIndexRange.length()
<< "frames of silence";
}
const SINT prerollFrames = prerollFrameIndexRange.length();
const SINT prerollSamples = CachingReaderChunk::frames2samples(prerollFrames);
DEBUG_ASSERT(samplesRemaining >= prerollSamples);
Expand Down Expand Up @@ -473,27 +496,33 @@ void CachingReader::hintAndMaybeWake(const HintVector& hintList) {
const int lastChunkIndex = CachingReaderChunk::indexForFrame(readableFrameIndexRange.end() - 1);
for (int chunkIndex = firstChunkIndex; chunkIndex <= lastChunkIndex; ++chunkIndex) {
CachingReaderChunkForOwner* pChunk = lookupChunk(chunkIndex);
if (pChunk == nullptr) {
if (!pChunk) {
shouldWake = true;
pChunk = allocateChunkExpireLRU(chunkIndex);
if (pChunk == nullptr) {
kLogger.warning() << "ERROR: Couldn't allocate spare CachingReaderChunk to make CachingReaderChunkReadRequest.";
if (!pChunk) {
kLogger.warning()
<< "Failed to allocate chunk"
<< chunkIndex
<< "for read request";
continue;
}
// Do not insert the allocated chunk into the MRU/LRU list,
// because it will be handed over to the worker immediately
CachingReaderChunkReadRequest request;
request.giveToWorker(pChunk);
// kLogger.debug() << "Requesting read of chunk" << current << "into" << pChunk;
// kLogger.debug() << "Requesting read into " << request.chunk->data;
if (kLogger.traceEnabled()) {
kLogger.trace()
<< "Requesting read of chunk"
<< request.chunk;
}
if (m_chunkReadRequestFIFO.write(&request, 1) != 1) {
kLogger.warning() << "ERROR: Could not submit read request for "
<< chunkIndex;
kLogger.warning()
<< "Failed to submit read request for chunk"
<< chunkIndex;
// Revoke the chunk from the worker and free it
pChunk->takeFromWorker();
freeChunk(pChunk);
}
//kLogger.debug() << "Checking chunk " << current << " shouldWake:" << shouldWake << " chunksToRead" << m_chunksToRead.size();
} else if (pChunk->getState() == CachingReaderChunkForOwner::READY) {
// This will cause the chunk to be 'freshened' in the cache. The
// chunk will be moved to the end of the LRU list.
Expand Down
1 change: 1 addition & 0 deletions src/engine/cachingreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class CachingReader : public QObject {

// Returns a CachingReaderChunk to the free list
void freeChunk(CachingReaderChunkForOwner* pChunk);
void freeChunkFromList(CachingReaderChunkForOwner* pChunk);

// Returns all allocated chunks to the free list
void freeAllChunks();
Expand Down
Loading

0 comments on commit 4000c66

Please sign in to comment.