Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded analysis #1069

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c4aab97
Initial implementation of multithreaded analysis
JosepMaJAZ Dec 3, 2016
4a2a142
Fixed end detection and updated signals to show multiple thread progress
JosepMaJAZ Dec 3, 2016
c06ab24
bugfix: one division too much
JosepMaJAZ Dec 3, 2016
3c5c680
Inform the UI to update when stopping the manager
JosepMaJAZ Dec 3, 2016
90882d8
ups. forgot to change this when fixing the division
JosepMaJAZ Dec 3, 2016
0d28354
Fix abatch waveform analysis
JosepMaJAZ Dec 4, 2016
8b450dc
documentation of methods
JosepMaJAZ Dec 10, 2016
b581f88
Preferences option to set the max number of threads for analysis
JosepMaJAZ Dec 11, 2016
711a3f9
build fixes and changes from PR https://github.com/mixxxdj/mixxx/pull…
JosepMaJAZ Dec 11, 2016
569493e
second round of fixes
JosepMaJAZ Dec 11, 2016
b478ecc
renaming of background/foreground/batch to default/priority/force, ad…
JosepMaJAZ Dec 11, 2016
4bf1152
bugfixes related to changing maxthreads
JosepMaJAZ Dec 15, 2016
1613fb6
build fix, oups!
JosepMaJAZ Dec 16, 2016
6326a69
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 18, 2017
0b06821
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 21, 2017
fe99a7d
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 21, 2017
cfb5542
leave one idle thread whenever there's a priority thread running. Thi…
JosepMaJAZ Jan 29, 2017
5410b84
merging to master
JosepMaJAZ Jan 29, 2017
bb09bb7
corrected VAMP_PATH multi-initialization. Corrected trackfinished cal…
JosepMaJAZ Jan 29, 2017
8a0e695
restart the paused default worker once all the priority workers finish
JosepMaJAZ Feb 12, 2017
2d7ab48
merge from master and bugfix on library.cpp (ouch! how did this work?…
JosepMaJAZ Apr 8, 2017
0d6cd9a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Apr 9, 2017
39dc961
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ May 10, 2017
a6a7994
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ May 27, 2017
645648d
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Jun 2, 2017
c6f138f
merge with current master
JosepMaJAZ Sep 9, 2017
24ba937
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Sep 30, 2017
82efeea
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Oct 14, 2017
982ce5a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Nov 19, 2017
7b09641
Several fixes and retouches. I believe this fixes the rare crashes th…
JosepMaJAZ Nov 19, 2017
a1c8aa1
linux compilation fixes plus removed calculation of threads from the …
JosepMaJAZ Nov 23, 2017
9fe94ff
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Nov 24, 2017
614d40f
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Dec 6, 2017
6da72ef
changes for sourceaudioapi v2 compatibility
JosepMaJAZ Dec 6, 2017
d010df7
merge remote branch and fix conflict
JosepMaJAZ Dec 14, 2017
2731050
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Dec 21, 2017
a467944
build fix
JosepMaJAZ Dec 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions build/depends.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ def enabled_modules(build):
def enabled_imageformats(build):
qt5 = Qt.qt5_enabled(build)
qt_imageformats = [
'qgif', 'qico', 'qjpeg', 'qmng', 'qtga', 'qtiff', 'qsvg'
'qgif', 'qico', 'qjpeg', 'qtga', 'qtiff', 'qsvg'
]
if qt5:
qt_imageformats.extend(['qdds', 'qicns', 'qjp2', 'qwbmp', 'qwebp'])
qt_imageformats.extend(['qdds', 'qicns', 'qwbmp', 'qwebp'])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: These two changes were done for compatibility when building with QT5. If needed, they can be restored

return qt_imageformats

def satisfy(self):
Expand Down Expand Up @@ -737,7 +737,8 @@ def sources(self, build):
"engine/cachingreaderchunk.cpp",
"engine/cachingreaderworker.cpp",

"analyzer/analyzerqueue.cpp",
"analyzer/analyzermanager.cpp",
"analyzer/analyzerworker.cpp",
"analyzer/analyzerwaveform.cpp",
"analyzer/analyzergain.cpp",
"analyzer/analyzerebur128.cpp",
Expand Down
7 changes: 4 additions & 3 deletions src/analyzer/analyzerbeats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "track/beatutils.h"
#include "track/track.h"

AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig)
AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig, bool batch)
: m_pConfig(pConfig),
m_pVamp(NULL),
m_bPreferencesReanalyzeOldBpm(false),
Expand All @@ -27,7 +27,8 @@ AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig)
m_iSampleRate(0),
m_iTotalSamples(0),
m_iMinBpm(0),
m_iMaxBpm(9999) {
m_iMaxBpm(9999),
m_batch(batch) {
}

AnalyzerBeats::~AnalyzerBeats() {
Expand All @@ -38,7 +39,7 @@ bool AnalyzerBeats::initialize(TrackPointer tio, int sampleRate, int totalSample
return false;
}

bool bPreferencesBeatDetectionEnabled = static_cast<bool>(
bool bPreferencesBeatDetectionEnabled = m_batch || static_cast<bool>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does "m_batch" overrides the preferences option?
Could it be renamed to "forceBeatDetectionEnabled"?

The name bPreferencesBeatDetectionEnabled does not fit anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know either why such option existed. I simply changed it to a constructor parameter rather than changing it on analysisfeature. I am open to talk about the need or meaning of it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok than I think m_batch should become m_forceBeatDetection and bPreferencesBeatDetectionEnabled should become beatDetectionEnabled.

m_pConfig->getValueString(
ConfigKey(BPM_CONFIG_KEY, BPM_DETECTION_ENABLED)).toInt());
if (!bPreferencesBeatDetectionEnabled) {
Expand Down
3 changes: 2 additions & 1 deletion src/analyzer/analyzerbeats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class AnalyzerBeats: public Analyzer {
public:
AnalyzerBeats(UserSettingsPointer pConfig);
AnalyzerBeats(UserSettingsPointer pConfig, bool batch);
virtual ~AnalyzerBeats();

bool initialize(TrackPointer tio, int sampleRate, int totalSamples) override;
Expand All @@ -40,6 +40,7 @@ class AnalyzerBeats: public Analyzer {

int m_iSampleRate, m_iTotalSamples;
int m_iMinBpm, m_iMaxBpm;
bool m_batch;
};

#endif /* ANALYZER_ANALYZERBEATS_H */
299 changes: 299 additions & 0 deletions src/analyzer/analyzermanager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
#include "analyzer/analyzermanager.h"

#include <typeinfo>
#include <QThread>

#include <QtDebug>
#include <QMutexLocker>
#include <QListIterator>

#include "library/trackcollection.h"
#include "mixer/playerinfo.h"
#include "track/track.h"
#include "util/compatibility.h"
#include "util/event.h"
#include "util/timer.h"
#include "util/trace.h"

AnalyzerManager::AnalyzerManager(UserSettingsPointer pConfig) :
m_pConfig(pConfig),
m_nextWorkerId(0),
m_batchTrackQueue(),
m_prioTrackQueue(),
m_backgroundWorkers(),
m_foregroundWorkers(),
m_pausedWorkers() {

int maxThreads = m_pConfig->getValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"));
int ideal = QThread::idealThreadCount();
if (QThread::idealThreadCount() < 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have test data, that the QT idealThreadCount actually is ideal in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If idealthreadcount is -1 (or generally, less than 1), then the program can do two things:
a)Accept what the user configured in the configuration
b)Use only one thread.

I do "b" if the configured value does not seem sane, and do a if it seems sane.

On the preferences dialog, there is a code similar to this, but with the difference that, if idealthreadcount is -1 ( < 1), the user can select between 1 and 8, and the default value is 1.

if (maxThreads > 0 && maxThreads <= 32) {
qDebug() << "Cannot detect idealThreadCount. maxThreads is: " << maxThreads;
ideal = maxThreads;
}
else {
qWarning() << "Cannot detect idealThreadCount. Using the sane value of 1";
ideal = 1;
}
}
if (maxThreads <= 0 || maxThreads > ideal) {
qWarning() << "maxThreads value is incorrect. Changing it to " << ideal;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also print the invalid value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not add issue a warning if the Mixxx starts the first time with this new preference option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will print that too.
About the initial value, I don't know where first-time values are set up. Could you enlighten me?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work:
int maxThreads = m_pConfig->getValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"), ideal);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, Yup, that would work too, although I meant where is the configuration initizalized for the first time, so that I could assign the ideal value to it at that time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With exact this command. If "[Library]", "MaxAnalysisThreads" is not set ideal is returned.

//Assume the value is incorrect, so fix it.
maxThreads = ideal;
}
m_MaxThreads = maxThreads;
}

AnalyzerManager::~AnalyzerManager() {
stop(true);
}

bool AnalyzerManager::isActive() {
int total = m_foregroundWorkers.size() +
m_backgroundWorkers.size() + m_pausedWorkers.size();
return total > 0;
}
bool AnalyzerManager::isBackgroundWorkerActive() {
int total = m_backgroundWorkers.size() + m_pausedWorkers.size();
return total > 0;
}

void AnalyzerManager::stop(bool shutdown) {
m_batchTrackQueue.clear();
QListIterator<AnalyzerWorker*> it(m_backgroundWorkers);
while (it.hasNext()) {
AnalyzerWorker* worker = it.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
QListIterator<AnalyzerWorker*> it3(m_pausedWorkers);
while (it3.hasNext()) {
AnalyzerWorker* worker = it3.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
if (shutdown) {
m_prioTrackQueue.clear();
QListIterator<AnalyzerWorker*> it2(m_foregroundWorkers);
while (it2.hasNext()) {
AnalyzerWorker* worker = it2.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
//TODO: ensure that they are all forcibly stopped.
}
}
// Analyze it with a foreground worker. (foreground as in interactive, i.e. not a batch worker).
void AnalyzerManager::analyseTrackNow(TrackPointer tio) {
if (m_batchTrackQueue.contains(tio)) {
m_batchTrackQueue.removeAll(tio);
}
//TODO: There's one scenario that we still miss: load on a deck a track that is currently
//being analyzed by the background worker. We cannot reuse the background worker, but we should discard its work.
if (!m_prioTrackQueue.contains(tio)) {
m_prioTrackQueue.append(tio);
if (m_foregroundWorkers.size() < m_MaxThreads) {
createNewWorker(false);
if (m_foregroundWorkers.size() + m_backgroundWorkers.size() > m_MaxThreads) {
AnalyzerWorker * backwork = m_backgroundWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_backgroundWorkers.removeAll(backwork);
}
}
}
}
// This is called from the GUI for batch analysis.
void AnalyzerManager::queueAnalyseTrack(TrackPointer tio) {
if (!m_batchTrackQueue.contains(tio)) {
m_batchTrackQueue.append(tio);
if (m_pausedWorkers.size() + m_backgroundWorkers.size() < m_MaxThreads) {
createNewWorker(true);
}
}
}

// This slot is called from the decks and samplers when the track is loaded.
void AnalyzerManager::slotAnalyseTrack(TrackPointer tio) {
analyseTrackNow(tio);
}


//slot
void AnalyzerManager::slotUpdateProgress(int workerIdx, struct AnalyzerWorker::progress_info* progressInfo) {
//Updates to wave overview and player status text comes from a signal emited from the track by calling setAnalyzerProgress.
progressInfo->current_track->setAnalyzerProgress(progressInfo->track_progress);
//These update the Analysis feature and analysis view.
emit(trackProgress(workerIdx, progressInfo->track_progress / 10));
if (progressInfo->track_progress == 1000) {
//Right now no one is listening to trackDone, but it's here just in case.
emit(trackDone(progressInfo->current_track));
//Report that a track analysis has finished, and how many are still remaining.
emit(trackFinished(m_backgroundWorkers.size() + m_batchTrackQueue.size() - 1));
}
//TODO: Which is the consequence of not calling clear?
#if QT_VERSION < QT_VERSION_CHECK(5, 0, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use reset(). This conditional compilation is no longer necessary since we introduced a dedicated class for TrackPointer that handles the differences between QSharedPointer and std::shared_pointer internally.

progressInfo->current_track.clear();
#else
progressInfo->current_track.reset();
#endif
progressInfo->sema.release();
}

void AnalyzerManager::slotNextTrack(AnalyzerWorker* worker) {
//TODO: The old scan checked in isLoadedTrackWaiting for pTrack->getAnalyzerProgress()
// and either tried to load a previuos scan, or discarded the track if it had already been
// analyzed. I don't fully understand the scenario and I am not doing that right now.

//This is used when the maxThreads change. Extra workers are paused until active workers end.
//Then, those are terminated and the paused workers are resumed.
TrackPointer track = TrackPointer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" = TrackPointer();" is redundant.

AnalyzerWorker* forepaused=nullptr;
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker is also user outside the loop. Please pick a new name to avoid confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch! 👍 Indeed, i added that when adding the refresh of the threads from the preferences, but I overlooked the names.

if (!worker->isBatch()) { forepaused=worker; break; }
}
if (!forepaused) {
if (worker->isBatch()) {
if (m_backgroundWorkers.size() + m_pausedWorkers.size() <= m_MaxThreads) {
//The while loop is done in the event that the track which was added to the queue is no
//longer available.
while (!track && !m_batchTrackQueue.isEmpty()) {
track = m_batchTrackQueue.dequeue();
}
}
}
else {
while (!track && !m_prioTrackQueue.isEmpty()) {
track = m_prioTrackQueue.dequeue();
}
}
}
if (track) {
worker->nextTrack(track);
}
else {
worker->endProcess();
//Removing from active lists, so that "isActive" can return the correct value.
m_backgroundWorkers.removeAll(worker);
m_foregroundWorkers.removeAll(worker);
m_endingWorkers.append(worker);

if (forepaused) {
forepaused->resume();
m_pausedWorkers.removeOne(forepaused);
m_foregroundWorkers.append(forepaused);
}
else if (!m_pausedWorkers.isEmpty()) {
AnalyzerWorker* otherworker = m_pausedWorkers.first();
otherworker->resume();
m_pausedWorkers.removeOne(otherworker);
if (otherworker->isBatch()) {
m_backgroundWorkers.append(otherworker);
}
else {
m_foregroundWorkers.append(otherworker);
}
}
}
//Check if background workers are empty.
if (!isBackgroundWorkerActive()) {
emit(queueEmpty());
}
}
void AnalyzerManager::slotWorkerFinished(AnalyzerWorker* worker) {
m_endingWorkers.removeAll(worker);
m_backgroundWorkers.removeAll(worker);
m_foregroundWorkers.removeAll(worker);
m_pausedWorkers.removeAll(worker);
if (!isBackgroundWorkerActive()) {
emit(queueEmpty());
}
}
void AnalyzerManager::slotPaused(AnalyzerWorker* worker) {
//No useful code to execute right now.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Q_UNUSED(worker); to eliminate a warning

}
void AnalyzerManager::slotErrorString(QString errMsg) {
//TODO: This is currently unused.
qWarning() << "Testing with :" << errMsg;
}


void AnalyzerManager::slotMaxThreadsChanged(int threads) {
// If it is Active, adapt the amount of workers. If it is not active, it will just update the variable.
if (threads < m_MaxThreads) {
//Pause workers
while (!m_backgroundWorkers.isEmpty()
&& m_foregroundWorkers.size() + m_backgroundWorkers.size() > threads) {
AnalyzerWorker * backwork = m_backgroundWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_backgroundWorkers.removeAll(backwork);
}
while (m_foregroundWorkers.size() > threads) {
AnalyzerWorker * backwork = m_foregroundWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_foregroundWorkers.removeAll(backwork);
}
}
else {
//resume workers
int pendingworkers=threads-m_MaxThreads;
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
if (!worker->isBatch() && pendingworkers > 0) {
worker->resume();
m_pausedWorkers.removeOne(worker);
m_foregroundWorkers.append(worker);
--pendingworkers;
}
}
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
if (worker->isBatch() && pendingworkers > 0) {
worker->resume();
m_pausedWorkers.removeOne(worker);
m_backgroundWorkers.append(worker);
--pendingworkers;
}
}
//Create new workers, if tracks in queue.
pendingworkers = math_min(pendingworkers,m_batchTrackQueue.size());
for ( ;pendingworkers > 0; --pendingworkers) {
createNewWorker(true);
}
}
m_MaxThreads=threads;
}

AnalyzerWorker* AnalyzerManager::createNewWorker(bool batchJob) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this: bool priority

QThread* thread = new QThread();
AnalyzerWorker* worker = new AnalyzerWorker(m_pConfig, ++m_nextWorkerId, batchJob);
worker->moveToThread(thread);
//Auto startup and auto cleanup of worker and thread.
connect(thread, SIGNAL(started()), worker, SLOT(slotProcess()));
connect(worker, SIGNAL(finished()), thread, SLOT(quit()));
connect(worker, SIGNAL(finished()), worker, SLOT(deleteLater()));
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
//Connect with manager.
connect(worker, SIGNAL(updateProgress(int, struct AnalyzerWorker::progress_info*)), this, SLOT(slotUpdateProgress(int, struct AnalyzerWorker::progress_info*)));
connect(worker, SIGNAL(waitingForNextTrack(AnalyzerWorker*)), this, SLOT(slotNextTrack(AnalyzerWorker*)));
connect(worker, SIGNAL(paused(AnalyzerWorker*)), this, SLOT(slotPaused(AnalyzerWorker*)));
connect(worker, SIGNAL(workerFinished(AnalyzerWorker*)), this, SLOT(slotWorkerFinished(AnalyzerWorker*)));
connect(worker, SIGNAL(error(QString)), this, SLOT(slotErrorString(QString)));
thread->start(QThread::LowPriority);
if (batchJob) {
m_backgroundWorkers.append(worker);
}
else {
m_foregroundWorkers.append(worker);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What makes a forground worker a "forground" worker?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is "priorityWorkers"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bad naming from my side, but I couldn't think of a better one at that time.
Strictly speaking, everything is run in background (i.e. the user does not need to wait for any of them to stop).
background, in this case, means that they are used for the job of the analysis queue (analysis feature of the library), while foreground means that they run an analysis by an immediate action of the user (just loaded a track).

Maybe I should have used worker and priorityworker. Take in mind that the priority is not the only difference, since there are some Mixxx settings that affect them in a different way. (namely the waveform creation and the bpm analysis)

}
return worker;
}



Loading