Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded analysis #1069

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c4aab97
Initial implementation of multithreaded analysis
JosepMaJAZ Dec 3, 2016
4a2a142
Fixed end detection and updated signals to show multiple thread progress
JosepMaJAZ Dec 3, 2016
c06ab24
bugfix: one division too much
JosepMaJAZ Dec 3, 2016
3c5c680
Inform the UI to update when stopping the manager
JosepMaJAZ Dec 3, 2016
90882d8
ups. forgot to change this when fixing the division
JosepMaJAZ Dec 3, 2016
0d28354
Fix abatch waveform analysis
JosepMaJAZ Dec 4, 2016
8b450dc
documentation of methods
JosepMaJAZ Dec 10, 2016
b581f88
Preferences option to set the max number of threads for analysis
JosepMaJAZ Dec 11, 2016
711a3f9
build fixes and changes from PR https://github.com/mixxxdj/mixxx/pull…
JosepMaJAZ Dec 11, 2016
569493e
second round of fixes
JosepMaJAZ Dec 11, 2016
b478ecc
renaming of background/foreground/batch to default/priority/force, ad…
JosepMaJAZ Dec 11, 2016
4bf1152
bugfixes related to changing maxthreads
JosepMaJAZ Dec 15, 2016
1613fb6
build fix, oups!
JosepMaJAZ Dec 16, 2016
6326a69
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 18, 2017
0b06821
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 21, 2017
fe99a7d
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ Jan 21, 2017
cfb5542
leave one idle thread whenever there's a priority thread running. Thi…
JosepMaJAZ Jan 29, 2017
5410b84
merging to master
JosepMaJAZ Jan 29, 2017
bb09bb7
corrected VAMP_PATH multi-initialization. Corrected trackfinished cal…
JosepMaJAZ Jan 29, 2017
8a0e695
restart the paused default worker once all the priority workers finish
JosepMaJAZ Feb 12, 2017
2d7ab48
merge from master and bugfix on library.cpp (ouch! how did this work?…
JosepMaJAZ Apr 8, 2017
0d6cd9a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Apr 9, 2017
39dc961
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ May 10, 2017
a6a7994
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ May 27, 2017
645648d
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Jun 2, 2017
c6f138f
merge with current master
JosepMaJAZ Sep 9, 2017
24ba937
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Sep 30, 2017
82efeea
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Oct 14, 2017
982ce5a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Nov 19, 2017
7b09641
Several fixes and retouches. I believe this fixes the rare crashes th…
JosepMaJAZ Nov 19, 2017
a1c8aa1
linux compilation fixes plus removed calculation of threads from the …
JosepMaJAZ Nov 23, 2017
9fe94ff
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Nov 24, 2017
614d40f
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Dec 6, 2017
6da72ef
changes for sourceaudioapi v2 compatibility
JosepMaJAZ Dec 6, 2017
d010df7
merge remote branch and fix conflict
JosepMaJAZ Dec 14, 2017
2731050
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ Dec 21, 2017
a467944
build fix
JosepMaJAZ Dec 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions build/depends.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ def enabled_modules(build):
def enabled_imageformats(build):
qt5 = Qt.qt5_enabled(build)
qt_imageformats = [
'qgif', 'qico', 'qjpeg', 'qmng', 'qtga', 'qtiff', 'qsvg'
'qgif', 'qico', 'qjpeg', 'qtga', 'qtiff', 'qsvg'
]
if qt5:
qt_imageformats.extend(['qdds', 'qicns', 'qjp2', 'qwbmp', 'qwebp'])
qt_imageformats.extend(['qdds', 'qicns', 'qwbmp', 'qwebp'])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: These two changes were done for compatibility when building with QT5. If needed, they can be restored

return qt_imageformats

def satisfy(self):
Expand Down Expand Up @@ -737,7 +737,8 @@ def sources(self, build):
"engine/cachingreaderchunk.cpp",
"engine/cachingreaderworker.cpp",

"analyzer/analyzerqueue.cpp",
"analyzer/analyzermanager.cpp",
"analyzer/analyzerworker.cpp",
"analyzer/analyzerwaveform.cpp",
"analyzer/analyzergain.cpp",
"analyzer/analyzerebur128.cpp",
Expand Down
9 changes: 5 additions & 4 deletions src/analyzer/analyzerbeats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "track/beatutils.h"
#include "track/track.h"

AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig)
AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig, bool forceBeatDetection)
: m_pConfig(pConfig),
m_pVamp(NULL),
m_bPreferencesReanalyzeOldBpm(false),
Expand All @@ -27,7 +27,8 @@ AnalyzerBeats::AnalyzerBeats(UserSettingsPointer pConfig)
m_iSampleRate(0),
m_iTotalSamples(0),
m_iMinBpm(0),
m_iMaxBpm(9999) {
m_iMaxBpm(9999),
m_forceBeatDetection(forceBeatDetection) {
}

AnalyzerBeats::~AnalyzerBeats() {
Expand All @@ -38,10 +39,10 @@ bool AnalyzerBeats::initialize(TrackPointer tio, int sampleRate, int totalSample
return false;
}

bool bPreferencesBeatDetectionEnabled = static_cast<bool>(
bool beatDetectionEnabled = m_forceBeatDetection || static_cast<bool>(
m_pConfig->getValueString(
ConfigKey(BPM_CONFIG_KEY, BPM_DETECTION_ENABLED)).toInt());
if (!bPreferencesBeatDetectionEnabled) {
if (!beatDetectionEnabled) {
qDebug() << "Beat calculation is deactivated";
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion src/analyzer/analyzerbeats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class AnalyzerBeats: public Analyzer {
public:
AnalyzerBeats(UserSettingsPointer pConfig);
AnalyzerBeats(UserSettingsPointer pConfig, bool forceBeatDetection);
virtual ~AnalyzerBeats();

bool initialize(TrackPointer tio, int sampleRate, int totalSamples) override;
Expand All @@ -40,6 +40,7 @@ class AnalyzerBeats: public Analyzer {

int m_iSampleRate, m_iTotalSamples;
int m_iMinBpm, m_iMaxBpm;
bool m_forceBeatDetection;
};

#endif /* ANALYZER_ANALYZERBEATS_H */
296 changes: 296 additions & 0 deletions src/analyzer/analyzermanager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
#include "analyzer/analyzermanager.h"

#include <typeinfo>
#include <QThread>

#include <QtDebug>
#include <QMutexLocker>
#include <QListIterator>

#include "library/trackcollection.h"
#include "mixer/playerinfo.h"
#include "track/track.h"
#include "util/compatibility.h"
#include "util/event.h"
#include "util/timer.h"
#include "util/trace.h"

AnalyzerManager::AnalyzerManager(UserSettingsPointer pConfig) :
m_pConfig(pConfig),
m_nextWorkerId(0),
m_defaultTrackQueue(),
m_prioTrackQueue(),
m_defaultWorkers(),
m_priorityWorkers(),
m_pausedWorkers() {

int maxThreads = m_pConfig->getValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"));
int ideal = QThread::idealThreadCount();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideal can be moved into the if branch below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. yup... Initially I was using the ideal in the if, instead of the hardcoded 32, but then i had the use case where i get -1 for ideal and i wasn't sure how to handle that.
Now that I think about it, i believe i will need to change this again, because now it does not work as I've explained above (it does not prevent opening 8 threads on a 4thread CPU)

if (ideal < 1) {
if (maxThreads > 0 && maxThreads <= 32) {
qDebug() << "Cannot detect idealThreadCount. maxThreads is: " << maxThreads;
ideal = maxThreads;
}
else {
qWarning() << "Cannot detect idealThreadCount. Using the sane value of 1";
ideal = 1;
}
}
if (maxThreads <= 0 || maxThreads > ideal) {
qWarning() << "maxThreads value is incorrect. Changing it to " << ideal;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also print the invalid value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not add issue a warning if the Mixxx starts the first time with this new preference option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will print that too.
About the initial value, I don't know where first-time values are set up. Could you enlighten me?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work:
int maxThreads = m_pConfig->getValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"), ideal);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, Yup, that would work too, although I meant where is the configuration initizalized for the first time, so that I could assign the ideal value to it at that time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With exact this command. If "[Library]", "MaxAnalysisThreads" is not set ideal is returned.

//Assume the value is incorrect, so fix it.
maxThreads = ideal;
}
m_MaxThreads = maxThreads;
}

AnalyzerManager::~AnalyzerManager() {
stop(true);
}

bool AnalyzerManager::isActive() {
int total = m_priorityWorkers.size() +
m_defaultWorkers.size() + m_pausedWorkers.size();
return total > 0;
}
bool AnalyzerManager::isDefaultQueueActive() {
int total = m_defaultWorkers.size() + m_pausedWorkers.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is + here correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, sorry for the noise.

return total > 0;
}

void AnalyzerManager::stop(bool shutdown) {
m_defaultTrackQueue.clear();
QListIterator<AnalyzerWorker*> it(m_defaultWorkers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Range based for loops more compact and easier to grasp than while loops with subsequently numbered local variables.

while (it.hasNext()) {
AnalyzerWorker* worker = it.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
QListIterator<AnalyzerWorker*> it3(m_pausedWorkers);
while (it3.hasNext()) {
AnalyzerWorker* worker = it3.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
if (shutdown) {
m_prioTrackQueue.clear();
QListIterator<AnalyzerWorker*> it2(m_priorityWorkers);
while (it2.hasNext()) {
AnalyzerWorker* worker = it2.next();
worker->endProcess();
m_endingWorkers.append(worker);
}
//TODO: ensure that they are all forcibly stopped.
}
}
//Add a track to be analyzed with a priority worker. (Like those required by loading a track into a player).
void AnalyzerManager::analyseTrackNow(TrackPointer tio) {
if (m_defaultTrackQueue.contains(tio)) {
m_defaultTrackQueue.removeAll(tio);
}
//TODO: There's one scenario that we still miss: load on a deck a track that is currently
//being analyzed by the background worker. We cannot reuse the background worker, but we should discard its work.
if (!m_prioTrackQueue.contains(tio)) {
m_prioTrackQueue.append(tio);
if (m_priorityWorkers.size() < m_MaxThreads) {
createNewWorker(WorkerType::priorityWorker);
if (m_priorityWorkers.size() + m_defaultWorkers.size() > m_MaxThreads) {
AnalyzerWorker * backwork = m_defaultWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_defaultWorkers.removeAll(backwork);
}
}
}
}
// This is called from the GUI for the analysis feature of the library.
void AnalyzerManager::queueAnalyseTrack(TrackPointer tio) {
if (!m_defaultTrackQueue.contains(tio)) {
m_defaultTrackQueue.append(tio);
if (m_pausedWorkers.size() + m_defaultWorkers.size() < m_MaxThreads) {
createNewWorker(WorkerType::defaultWorker);
}
}
}

// This slot is called from the decks and samplers when the track is loaded.
void AnalyzerManager::slotAnalyseTrack(TrackPointer tio) {
analyseTrackNow(tio);
}


//slot
void AnalyzerManager::slotUpdateProgress(int workerIdx, struct AnalyzerWorker::progress_info* progressInfo) {
//Updates to wave overview and player status text comes from a signal emited from the track by calling setAnalyzerProgress.
progressInfo->current_track->setAnalyzerProgress(progressInfo->track_progress);
//These update the Analysis feature and analysis view.
emit(trackProgress(workerIdx, progressInfo->track_progress / 10));
if (progressInfo->track_progress == 1000) {
//Right now no one is listening to trackDone, but it's here just in case.
emit(trackDone(progressInfo->current_track));
//Report that a track analysis has finished, and how many are still remaining.
emit(trackFinished(m_defaultWorkers.size() + m_defaultTrackQueue.size() - 1));
}
//TODO: Which is the consequence of not calling reset?
progressInfo->current_track.reset();
progressInfo->sema.release();
}

void AnalyzerManager::slotNextTrack(AnalyzerWorker* worker) {
//TODO: The old scan checked in isLoadedTrackWaiting for pTrack->getAnalyzerProgress()
// and either tried to load a previuos scan, or discarded the track if it had already been
// analyzed. I don't fully understand the scenario and I am not doing that right now.

//This is used when the maxThreads change. Extra workers are paused until active workers end.
//Then, those are terminated and the paused workers are resumed.
TrackPointer track;
AnalyzerWorker* forepaused=nullptr;
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker is also user outside the loop. Please pick a new name to avoid confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch! 👍 Indeed, i added that when adding the refresh of the threads from the preferences, but I overlooked the names.

if (worker->isPriorized()) { forepaused=worker; break; }
}
if (!forepaused) {
if (worker->isPriorized()) {
while (!track && !m_prioTrackQueue.isEmpty()) {
track = m_prioTrackQueue.dequeue();
}
}
else {
if (m_defaultWorkers.size() + m_pausedWorkers.size() <= m_MaxThreads) {
//The while loop is done in the event that the track which was added to the queue is no
//longer available.
while (!track && !m_defaultTrackQueue.isEmpty()) {
track = m_defaultTrackQueue.dequeue();
}
}
}
}
if (track) {
worker->nextTrack(track);
}
else {
worker->endProcess();
//Removing from active lists, so that "isActive" can return the correct value.
m_defaultWorkers.removeAll(worker);
m_priorityWorkers.removeAll(worker);
m_endingWorkers.append(worker);

if (forepaused) {
forepaused->resume();
m_pausedWorkers.removeOne(forepaused);
m_priorityWorkers.append(forepaused);
}
else if (!m_pausedWorkers.isEmpty()) {
AnalyzerWorker* otherworker = m_pausedWorkers.first();
otherworker->resume();
m_pausedWorkers.removeOne(otherworker);
if (otherworker->isPriorized()) {
m_priorityWorkers.append(otherworker);
}
else {
m_defaultWorkers.append(otherworker);
}
}
}
//Check if background workers are empty.
if (!isDefaultQueueActive()) {
emit(queueEmpty());
}
}
void AnalyzerManager::slotWorkerFinished(AnalyzerWorker* worker) {
m_endingWorkers.removeAll(worker);
m_defaultWorkers.removeAll(worker);
m_priorityWorkers.removeAll(worker);
m_pausedWorkers.removeAll(worker);
if (!isDefaultQueueActive()) {
emit(queueEmpty());
}
}
void AnalyzerManager::slotPaused(AnalyzerWorker* worker) {
//No useful code to execute right now.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Q_UNUSED(worker); to eliminate a warning

}
void AnalyzerManager::slotErrorString(QString errMsg) {
//TODO: This is currently unused.
qWarning() << "Testing with :" << errMsg;
}


void AnalyzerManager::slotMaxThreadsChanged(int threads) {
// If it is Active, adapt the amount of workers. If it is not active, it will just update the variable.
if (threads < m_MaxThreads) {
//Pause workers
while (!m_defaultWorkers.isEmpty()
&& m_priorityWorkers.size() + m_defaultWorkers.size() > threads) {
AnalyzerWorker * backwork = m_defaultWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_defaultWorkers.removeAll(backwork);
}
while (m_priorityWorkers.size() > threads) {
AnalyzerWorker * backwork = m_priorityWorkers.first();
backwork->pause();
//Ideally i would have done this on the slotPaused slot, but then i cannot
//ensure i won't call pause twice for the same worker.
m_pausedWorkers.append(backwork);
m_priorityWorkers.removeAll(backwork);
}
}
else {
//resume workers
int pendingworkers=threads-m_MaxThreads;
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
if (worker->isPriorized() && pendingworkers > 0) {
worker->resume();
m_pausedWorkers.removeOne(worker);
m_priorityWorkers.append(worker);
--pendingworkers;
}
}
foreach(AnalyzerWorker* worker, m_pausedWorkers) {
if (!worker->isPriorized() && pendingworkers > 0) {
worker->resume();
m_pausedWorkers.removeOne(worker);
m_defaultWorkers.append(worker);
--pendingworkers;
}
}
//Create new workers, if tracks in queue.
pendingworkers = math_min(pendingworkers,m_defaultTrackQueue.size());
for ( ;pendingworkers > 0; --pendingworkers) {
createNewWorker(WorkerType::defaultWorker);
}
}
m_MaxThreads=threads;
}

AnalyzerWorker* AnalyzerManager::createNewWorker(WorkerType wtype) {
bool priorized = (wtype == WorkerType::priorityWorker);
QThread* thread = new QThread();
AnalyzerWorker* worker = new AnalyzerWorker(m_pConfig, ++m_nextWorkerId, priorized);
worker->moveToThread(thread);
//Auto startup and auto cleanup of worker and thread.
connect(thread, SIGNAL(started()), worker, SLOT(slotProcess()));
connect(worker, SIGNAL(finished()), thread, SLOT(quit()));
connect(worker, SIGNAL(finished()), worker, SLOT(deleteLater()));
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
//Connect with manager.
connect(worker, SIGNAL(updateProgress(int, struct AnalyzerWorker::progress_info*)), this, SLOT(slotUpdateProgress(int, struct AnalyzerWorker::progress_info*)));
connect(worker, SIGNAL(waitingForNextTrack(AnalyzerWorker*)), this, SLOT(slotNextTrack(AnalyzerWorker*)));
connect(worker, SIGNAL(paused(AnalyzerWorker*)), this, SLOT(slotPaused(AnalyzerWorker*)));
connect(worker, SIGNAL(workerFinished(AnalyzerWorker*)), this, SLOT(slotWorkerFinished(AnalyzerWorker*)));
connect(worker, SIGNAL(error(QString)), this, SLOT(slotErrorString(QString)));
thread->start(QThread::LowPriority);
if (priorized) {
m_priorityWorkers.append(worker);
}
else {
m_defaultWorkers.append(worker);
}
return worker;
}



Loading