Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded analysis #1069

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c4aab97
Initial implementation of multithreaded analysis
JosepMaJAZ Dec 3, 2016
4a2a142
Fixed end detection and updated signals to show multiple thread progress
JosepMaJAZ Dec 3, 2016
c06ab24
bugfix: one division too much
JosepMaJAZ Dec 3, 2016
3c5c680
Inform the UI to update when stopping the manager
JosepMaJAZ Dec 3, 2016
90882d8
ups. forgot to change this when fixing the division
JosepMaJAZ Dec 3, 2016
0d28354
Fix abatch waveform analysis
JosepMaJAZ Dec 4, 2016
8b450dc
documentation of methods
JosepMaJAZ Dec 10, 2016
b581f88
Preferences option to set the max number of threads for analysis
JosepMaJAZ Dec 11, 2016
711a3f9
build fixes and changes from PR https://github.com/mixxxdj/mixxx/pull…
JosepMaJAZ Dec 11, 2016
569493e
second round of fixes
JosepMaJAZ Dec 11, 2016
b478ecc
renaming of background/foreground/batch to default/priority/force, ad…
JosepMaJAZ Dec 11, 2016
4bf1152
bugfixes related to changing maxthreads
JosepMaJAZ Dec 15, 2016
1613fb6
build fix, oups!
JosepMaJAZ Dec 16, 2016
6326a69
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 18, 2017
0b06821
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 21, 2017
fe99a7d
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 21, 2017
cfb5542
leave one idle thread whenever there's a priority thread running. Thi…
JosepMaJAZ Jan 29, 2017
5410b84
merging to master
JosepMaJAZ Jan 29, 2017
bb09bb7
corrected VAMP_PATH multi-initialization. Corrected trackfinished cal…
JosepMaJAZ Jan 29, 2017
8a0e695
restart the paused default worker once all the priority workers finish
JosepMaJAZ Feb 12, 2017
2d7ab48
merge from master and bugfix on library.cpp (ouch! how did this work?…
JosepMaJAZ Apr 8, 2017
0d6cd9a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Apr 9, 2017
39dc961
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ May 10, 2017
a6a7994
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ May 27, 2017
645648d
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Jun 2, 2017
c6f138f
merge with current master
JosepMaJAZ Sep 9, 2017
24ba937
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Sep 30, 2017
82efeea
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Oct 14, 2017
982ce5a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Nov 19, 2017
7b09641
Several fixes and retouches. I believe this fixes the rare crashes th…
JosepMaJAZ Nov 19, 2017
a1c8aa1
linux compilation fixes plus removed calculation of threads from the …
JosepMaJAZ Nov 23, 2017
9fe94ff
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Nov 24, 2017
614d40f
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Dec 6, 2017
6da72ef
changes for sourceaudioapi v2 compatibility
JosepMaJAZ Dec 6, 2017
d010df7
merge remote branch and fix conflict
JosepMaJAZ Dec 14, 2017
2731050
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Dec 21, 2017
a467944
build fix
JosepMaJAZ Dec 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build/depends.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ def sources(self, build):
"engine/cachingreaderchunk.cpp",
"engine/cachingreaderworker.cpp",

"analyzer/analyzerqueue.cpp",
"analyzer/analyzermanager.cpp",
"analyzer/analyzerworker.cpp",
"analyzer/analyzerwaveform.cpp",
"analyzer/analyzergain.cpp",
"analyzer/analyzerebur128.cpp",
Expand Down
9 changes: 5 additions & 4 deletions src/analyzer/analyzerbeats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "track/beatutils.h"
#include "track/track.h"

AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig)
AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig, bool forceBeatDetection)
: m_pConfig(pConfig),
m_pVamp(NULL),
m_bPreferencesReanalyzeOldBpm(false),
Expand All @@ -27,7 +27,8 @@ AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig)
m_iSampleRate(0),
m_iTotalSamples(0),
m_iMinBpm(0),
m_iMaxBpm(9999) {
m_iMaxBpm(9999),
m_forceBeatDetection(forceBeatDetection) {
}

AnalyzerBeats::~AnalyzerBeats() {
Expand All @@ -38,9 +39,9 @@ bool AnalyzerBeats::initialize(TrackPointer tio, int sampleRate, int totalSample
return false;
}

bool bPreferencesBeatDetectionEnabled = m_pConfig->getValue<bool>(
bool beatDetectionEnabled = m_forceBeatDetection || m_pConfig->getValue<bool>(
ConfigKey(BPM_CONFIG_KEY, BPM_DETECTION_ENABLED));
if (!bPreferencesBeatDetectionEnabled) {
if (!beatDetectionEnabled) {
qDebug() << "Beat calculation is deactivated";
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion src/analyzer/analyzerbeats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class AnalyzerBeats: public Analyzer {
public:
AnalyzerBeats(UserSettingsPointer pConfig);
AnalyzerBeats(UserSettingsPointer pConfig, bool forceBeatDetection);
virtual ~AnalyzerBeats();

bool initialize(TrackPointer tio, int sampleRate, int totalSamples) override;
Expand All @@ -40,6 +40,7 @@ class AnalyzerBeats: public Analyzer {

int m_iSampleRate, m_iTotalSamples;
int m_iMinBpm, m_iMaxBpm;
bool m_forceBeatDetection;
};

#endif /* ANALYZER_ANALYZERBEATS_H */
315 changes: 315 additions & 0 deletions src/analyzer/analyzermanager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
#include "analyzer/analyzermanager.h"

#include <typeinfo>
#include <QThread>

#include <QtDebug>
#include <QMutexLocker>
#include <QListIterator>

#include "library/trackcollection.h"
#include "mixer/playerinfo.h"
#include "track/track.h"
#include "util/compatibility.h"
#include "util/event.h"
#include "util/timer.h"
#include "util/trace.h"

AnalyzerManager::AnalyzerManager(UserSettingsPointer pConfig,
mixxx::DbConnectionPoolPtr pDbConnectionPool) :
m_pConfig(pConfig),
m_nextWorkerId(0),
m_defaultTrackQueue(),
m_prioTrackQueue(),
m_defaultWorkers(),
m_priorityWorkers(),
m_pausedWorkers(),
m_pDbConnectionPool(std::move(pDbConnectionPool)) {

int ideal = QThread::idealThreadCount();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideal can be moved into the if branch below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. yup... Initially I was using the ideal in the if, instead of the hardcoded 32, but then i had the use case where i get -1 for ideal and i wasn't sure how to handle that.
Now that I think about it, i believe i will need to change this again, because now it does not work as I've explained above (it does not prevent opening 8 threads on a 4thread CPU)

int maxThreads = m_pConfig->getValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"), ideal);
if (ideal < 1) {
if (maxThreads > 0 && maxThreads <= 32) {
qDebug() << "Cannot detect idealThreadCount. maxThreads is: " << maxThreads;
ideal = maxThreads;
}
else {
qWarning() << "Cannot detect idealThreadCount and maxThreads is incorrect: " << maxThreads <<". Using the sane value of 1";
ideal = 1;
}
}
if (maxThreads <= 0 || maxThreads > ideal) {
qWarning() << "maxThreads value is incorrect: " << maxThreads << ". Changing it to " << ideal;
//Assume the value is incorrect, so fix it.
maxThreads = ideal;
}
m_MaxThreads = maxThreads;
}

AnalyzerManager::~AnalyzerManager() {
stop(true);
}

bool AnalyzerManager::isActive() {
int total = m_priorityWorkers.size() +
m_defaultWorkers.size() + m_pausedWorkers.size();
return total > 0;
}
bool AnalyzerManager::isDefaultQueueActive() {
int total = m_defaultWorkers.size() + m_pausedWorkers.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is + here correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, sorry for the noise.

return total > 0;
}

void AnalyzerManager::stop(bool shutdown) {
m_defaultTrackQueue.clear();
QListIterator<AnalyzerWorker*> it(m_defaultWorkers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Range based for loops more compact and easier to grasp than while loops with subsequently numbered local variables.

while (it.hasNext()) {
AnalyzerWorker* worker = it.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
QListIterator<AnalyzerWorker*> it3(m_pausedWorkers);
while (it3.hasNext()) {
AnalyzerWorker* worker = it3.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
if (shutdown) {
m_prioTrackQueue.clear();
QListIterator<AnalyzerWorker*> it2(m_priorityWorkers);
while (it2.hasNext()) {
AnalyzerWorker* worker = it2.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
//TODO: ensure that they are all forcibly stopped.
}
}
//Add a track to be analyzed with a priority worker. (Like those required by loading a track into a player).
void AnalyzerManager::analyseTrackNow(TrackPointer tio) {
if (m_defaultTrackQueue.contains(tio)) {
m_defaultTrackQueue.removeAll(tio);
}
//TODO: There's one scenario that we still miss: load on a deck a track that is currently
//being analyzed by the background worker. We cannot reuse the background worker, but we should discard its work.
if (!m_prioTrackQueue.contains(tio)) {
m_prioTrackQueue.append(tio);
if (m_priorityWorkers.isEmpty() && m_defaultWorkers.size() > 0) {
//In order to keep the application responsive, and ensure that a priority worker is
//not slowed down by default workers (because they have the same OS thread priority),
//we stop one additional default worker
AnalyzerWorker * backwork = m_defaultWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_defaultWorkers.removeAll(backwork);
}
if (m_priorityWorkers.size() < m_MaxThreads-1) {
createNewWorker(WorkerType::priorityWorker);
if (m_priorityWorkers.size() + m_defaultWorkers.size() > m_MaxThreads-1) {
AnalyzerWorker * backwork = m_defaultWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_defaultWorkers.removeAll(backwork);
}
}
}
}
// This is called from the GUI for the analysis feature of the library.
void AnalyzerManager::queueAnalyseTrack(TrackPointer tio) {
//See notes on analyseTrackNow of why we reduce the number of threads if there are priority workers.
int maxDefThreads = (m_priorityWorkers.isEmpty()) ? m_MaxThreads : m_MaxThreads-1;
if (!m_defaultTrackQueue.contains(tio)) {
m_defaultTrackQueue.append(tio);
if (m_pausedWorkers.size() + m_defaultWorkers.size() < maxDefThreads) {
createNewWorker(WorkerType::defaultWorker);
}
}
}

// This slot is called from the decks and samplers when the track is loaded.
void AnalyzerManager::slotAnalyseTrack(TrackPointer tio) {
analyseTrackNow(tio);
}


//slot
void AnalyzerManager::slotUpdateProgress(int workerIdx, struct AnalyzerWorker::progress_info* progressInfo) {
//Updates to wave overview and player status text comes from a signal emited from the track by calling setAnalyzerProgress.
progressInfo->current_track->setAnalyzerProgress(progressInfo->track_progress);
//These update the Analysis feature and analysis view.
emit(trackProgress(workerIdx, progressInfo->track_progress / 10));
if (progressInfo->track_progress == 1000) {
//Right now no one is listening to trackDone, but it's here just in case.
emit(trackDone(progressInfo->current_track));
//Report that a track analysis has finished, and how many are still remaining.
emit(trackFinished(m_defaultWorkers.size() + m_defaultTrackQueue.size() - 1));
}
//TODO: Which is the consequence of not calling reset?
progressInfo->current_track.reset();
progressInfo->sema.release();
}

void AnalyzerManager::slotNextTrack(AnalyzerWorker* worker) {
//TODO: The old scan checked in isLoadedTrackWaiting for pTrack->getAnalyzerProgress()
// and either tried to load a previuos scan, or discarded the track if it had already been
// analyzed. I don't fully understand the scenario and I am not doing that right now.

//This is used when the maxThreads change. Extra workers are paused until active workers end.
//Then, those are terminated and the paused workers are resumed.
TrackPointer track;
AnalyzerWorker* forepaused=nullptr;
foreach(AnalyzerWorker* theworker, m_pausedWorkers) {
if (theworker->isPriorized()) { forepaused=theworker; break; }
}
if (!forepaused) {
if (worker->isPriorized()) {
while (!track && !m_prioTrackQueue.isEmpty()) {
track = m_prioTrackQueue.dequeue();
}
}
else {
if (m_defaultWorkers.size() + m_pausedWorkers.size() <= m_MaxThreads) {
//The while loop is done in the event that the track which was added to the queue is no
//longer available.
while (!track && !m_defaultTrackQueue.isEmpty()) {
track = m_defaultTrackQueue.dequeue();
}
}
}
}
if (track) {
worker->nextTrack(track);
}
else {
worker->endProcess();
//Removing from active lists, so that "isActive" can return the correct value.
m_defaultWorkers.removeAll(worker);
m_priorityWorkers.removeAll(worker);
m_endingWorkers.append(worker);

if (forepaused) {
forepaused->resume();
m_pausedWorkers.removeOne(forepaused);
m_priorityWorkers.append(forepaused);
}
else if (!m_pausedWorkers.isEmpty()) {
AnalyzerWorker* otherworker = m_pausedWorkers.first();
otherworker->resume();
m_pausedWorkers.removeOne(otherworker);
m_defaultWorkers.append(otherworker);
if (m_priorityWorkers.isEmpty() && !m_pausedWorkers.isEmpty()) {
//Once the priority workers have ended, restart the extra paused default worker.
otherworker = m_pausedWorkers.first();
otherworker->resume();
m_pausedWorkers.removeOne(otherworker);
m_defaultWorkers.append(otherworker);
}
}
}
}
void AnalyzerManager::slotWorkerFinished(AnalyzerWorker* worker) {
m_endingWorkers.removeAll(worker);
m_defaultWorkers.removeAll(worker);
m_priorityWorkers.removeAll(worker);
m_pausedWorkers.removeAll(worker);
if (!isDefaultQueueActive()) {
emit(queueEmpty());
}
}
void AnalyzerManager::slotPaused(AnalyzerWorker* worker) {
//No useful code to execute right now.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Q_UNUSED(worker); to eliminate a warning

Q_UNUSED(worker);
}
void AnalyzerManager::slotErrorString(QString errMsg) {
//TODO: This is currently unused.
qWarning() << "Testing with :" << errMsg;
}


void AnalyzerManager::slotMaxThreadsChanged(int threads) {
//See notes on analyseTrackNow of why we reduce the number of threads if there are priority workers.
int maxDefThreads = (m_priorityWorkers.isEmpty()) ? threads : threads-1;
// If it is Active, adapt the amount of workers. If it is not active, it will just update the variable.
if (threads < m_MaxThreads) {
//Pause workers
while (!m_defaultWorkers.isEmpty()
&& m_priorityWorkers.size() + m_defaultWorkers.size() > maxDefThreads) {
AnalyzerWorker * backwork = m_defaultWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_defaultWorkers.removeAll(backwork);
}
while (m_priorityWorkers.size() > threads) {
AnalyzerWorker * backwork = m_priorityWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_priorityWorkers.removeAll(backwork);
}
}
else {
//resume workers
int pendingworkers=threads-m_MaxThreads;
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
if (worker->isPriorized() && pendingworkers > 0) {
worker->resume();
m_pausedWorkers.removeOne(worker);
m_priorityWorkers.append(worker);
--pendingworkers;
}
}
if (!m_priorityWorkers.isEmpty() && pendingworkers > 0) {
pendingworkers--;
}
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
if (!worker->isPriorized() && pendingworkers > 0) {
worker->resume();
m_pausedWorkers.removeOne(worker);
m_defaultWorkers.append(worker);
--pendingworkers;
}
}
//Create new workers, if tracks in queue.
pendingworkers = math_min(pendingworkers,m_defaultTrackQueue.size());
for ( ;pendingworkers > 0; --pendingworkers) {
createNewWorker(WorkerType::defaultWorker);
}
}
m_MaxThreads=threads;
}

AnalyzerWorker* AnalyzerManager::createNewWorker(WorkerType wtype) {
bool priorized = (wtype == WorkerType::priorityWorker);
QThread* thread = new QThread();
AnalyzerWorker* worker = new AnalyzerWorker(m_pConfig, m_pDbConnectionPool, ++m_nextWorkerId, priorized);
worker->moveToThread(thread);
//Auto startup and auto cleanup of worker and thread.
connect(thread, SIGNAL(started()), worker, SLOT(slotProcess()));
connect(worker, SIGNAL(finished()), thread, SLOT(quit()));
connect(worker, SIGNAL(finished()), worker, SLOT(deleteLater()));
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
//Connect with manager.
connect(worker, SIGNAL(updateProgress(int, struct AnalyzerWorker::progress_info*)), this, SLOT(slotUpdateProgress(int, struct AnalyzerWorker::progress_info*)));
connect(worker, SIGNAL(waitingForNextTrack(AnalyzerWorker*)), this, SLOT(slotNextTrack(AnalyzerWorker*)));
connect(worker, SIGNAL(paused(AnalyzerWorker*)), this, SLOT(slotPaused(AnalyzerWorker*)));
connect(worker, SIGNAL(workerFinished(AnalyzerWorker*)), this, SLOT(slotWorkerFinished(AnalyzerWorker*)));
connect(worker, SIGNAL(error(QString)), this, SLOT(slotErrorString(QString)));
thread->start(QThread::LowPriority);
if (priorized) {
m_priorityWorkers.append(worker);
}
else {
m_defaultWorkers.append(worker);
}
return worker;
}



Loading