-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multithreaded analysis #1069
Closed
Closed
Multithreaded analysis #1069
Changes from all commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
c4aab97
Initial implementation of multithreaded analysis
JosepMaJAZ 4a2a142
Fixed end detection and updated signals to show multiple thread progress
JosepMaJAZ c06ab24
bugfix: one division too much
JosepMaJAZ 3c5c680
Inform the UI to update when stopping the manager
JosepMaJAZ 90882d8
ups. forgot to change this when fixing the division
JosepMaJAZ 0d28354
Fix abatch waveform analysis
JosepMaJAZ 8b450dc
documentation of methods
JosepMaJAZ b581f88
Preferences option to set the max number of threads for analysis
JosepMaJAZ 711a3f9
build fixes and changes from PR https://github.com/mixxxdj/mixxx/pull…
JosepMaJAZ 569493e
second round of fixes
JosepMaJAZ b478ecc
renaming of background/foreground/batch to default/priority/force, ad…
JosepMaJAZ 4bf1152
bugfixes related to changing maxthreads
JosepMaJAZ 1613fb6
build fix, oups!
JosepMaJAZ 6326a69
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ 0b06821
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ fe99a7d
Merge branch 'master' into multithreaded-analysis
JosepMaJAZ cfb5542
leave one idle thread whenever there's a priority thread running. Thi…
JosepMaJAZ 5410b84
merging to master
JosepMaJAZ bb09bb7
corrected VAMP_PATH multi-initialization. Corrected trackfinished cal…
JosepMaJAZ 8a0e695
restart the paused default worker once all the priority workers finish
JosepMaJAZ 2d7ab48
merge from master and bugfix on library.cpp (ouch! how did this work?…
JosepMaJAZ 0d6cd9a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 39dc961
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ a6a7994
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 645648d
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ c6f138f
merge with current master
JosepMaJAZ 24ba937
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 82efeea
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 982ce5a
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 7b09641
Several fixes and retouches. I believe this fixes the rare crashes th…
JosepMaJAZ a1c8aa1
linux compilation fixes plus removed calculation of threads from the …
JosepMaJAZ 9fe94ff
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 614d40f
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ 6da72ef
changes for sourceaudioapi v2 compatibility
JosepMaJAZ d010df7
merge remote branch and fix conflict
JosepMaJAZ 2731050
Merge remote-tracking branch 'origin/master' into multithreaded-analysis
JosepMaJAZ a467944
build fix
JosepMaJAZ File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,311 @@ | ||
#include "analyzer/analyzermanager.h" | ||
|
||
#include <typeinfo> | ||
#include <QThread> | ||
|
||
#include <QtDebug> | ||
#include <QMutexLocker> | ||
#include <QListIterator> | ||
|
||
#include "library/trackcollection.h" | ||
#include "mixer/playerinfo.h" | ||
#include "track/track.h" | ||
#include "util/compatibility.h" | ||
#include "util/event.h" | ||
#include "util/timer.h" | ||
#include "util/trace.h" | ||
|
||
AnalyzerManager::AnalyzerManager(UserSettingsPointer pConfig, | ||
mixxx::DbConnectionPoolPtr pDbConnectionPool) : | ||
m_pDbConnectionPool(std::move(pDbConnectionPool)), | ||
m_pConfig(pConfig), | ||
m_nextWorkerId(0), | ||
m_defaultTrackQueue(), | ||
m_prioTrackQueue(), | ||
m_defaultWorkers(), | ||
m_priorityWorkers(), | ||
m_pausedWorkers(), | ||
m_endingWorkers() { | ||
|
||
int ideal = QThread::idealThreadCount(); | ||
int maxThreads = m_pConfig->getValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"), ideal); | ||
if (ideal < 1) { | ||
if (maxThreads > 0 && maxThreads <= 32) { | ||
qDebug() << "Cannot detect idealThreadCount. maxThreads is: " << maxThreads; | ||
} | ||
else { | ||
qWarning() << "Cannot detect idealThreadCount and maxThreads is incorrect: " << maxThreads <<". Using the sane value of 1"; | ||
maxThreads = ideal; | ||
m_pConfig->setValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"), maxThreads); | ||
} | ||
} | ||
else if (maxThreads <= 0 || maxThreads > ideal) { | ||
qWarning() << "maxThreads value is incorrect: " << maxThreads << ". Changing it to " << ideal; | ||
//Assume the value is incorrect, so fix it. | ||
maxThreads = ideal; | ||
m_pConfig->setValue<int>(ConfigKey("[Library]", "MaxAnalysisThreads"), maxThreads); | ||
} | ||
m_MaxThreads = maxThreads; | ||
} | ||
|
||
AnalyzerManager::~AnalyzerManager() { | ||
stop(true); | ||
} | ||
|
||
bool AnalyzerManager::isActive() { | ||
int total = m_priorityWorkers.size() + | ||
m_defaultWorkers.size() + m_pausedWorkers.size(); | ||
return total > 0; | ||
} | ||
bool AnalyzerManager::isDefaultQueueActive() { | ||
int total = m_defaultWorkers.size() + m_pausedWorkers.size(); | ||
return total > 0; | ||
} | ||
|
||
void AnalyzerManager::stop(bool shutdown) { | ||
m_defaultTrackQueue.clear(); | ||
foreach(AnalyzerWorker* worker, m_defaultWorkers) { | ||
worker->endProcess(); | ||
m_endingWorkers.append(worker); | ||
} | ||
foreach(AnalyzerWorker* worker, m_pausedWorkers) { | ||
worker->endProcess(); | ||
m_endingWorkers.append(worker); | ||
} | ||
if (shutdown) { | ||
m_prioTrackQueue.clear(); | ||
foreach(AnalyzerWorker* worker, m_priorityWorkers) { | ||
worker->endProcess(); | ||
m_endingWorkers.append(worker); | ||
} | ||
//TODO: ensure that they are all forcibly stopped. | ||
} | ||
} | ||
//Add a track to be analyzed with a priority worker. (Like those required by loading a track into a player). | ||
void AnalyzerManager::analyseTrackNow(TrackPointer tio) { | ||
if (m_defaultTrackQueue.contains(tio)) { | ||
m_defaultTrackQueue.removeAll(tio); | ||
} | ||
//TODO: There's one scenario that we still miss: load on a deck a track that is currently | ||
//being analyzed by the background worker. We cannot reuse the background worker, but we should discard its work. | ||
if (!m_prioTrackQueue.contains(tio)) { | ||
m_prioTrackQueue.append(tio); | ||
if (m_priorityWorkers.isEmpty() && m_defaultWorkers.size() > 0) { | ||
//In order to keep the application responsive, and ensure that a priority worker is | ||
//not slowed down by default workers (because they have the same OS thread priority), | ||
//we stop one additional default worker | ||
AnalyzerWorker * backwork = m_defaultWorkers.first(); | ||
backwork->pause(); | ||
//Ideally i would have done this on the slotPaused slot, but then i cannot | ||
//ensure i won't call pause twice for the same worker. | ||
m_pausedWorkers.append(backwork); | ||
m_defaultWorkers.removeAll(backwork); | ||
} | ||
if (m_priorityWorkers.size() < m_MaxThreads-1) { | ||
createNewWorker(WorkerType::priorityWorker); | ||
if (m_priorityWorkers.size() + m_defaultWorkers.size() > m_MaxThreads-1) { | ||
AnalyzerWorker * backwork = m_defaultWorkers.first(); | ||
backwork->pause(); | ||
//Ideally i would have done this on the slotPaused slot, but then i cannot | ||
//ensure i won't call pause twice for the same worker. | ||
m_pausedWorkers.append(backwork); | ||
m_defaultWorkers.removeAll(backwork); | ||
} | ||
} | ||
} | ||
} | ||
// This is called from the GUI for the analysis feature of the library. | ||
void AnalyzerManager::queueAnalyseTrack(TrackPointer tio) { | ||
//See notes on analyseTrackNow of why we reduce the number of threads if there are priority workers. | ||
int maxDefThreads = (m_priorityWorkers.isEmpty()) ? m_MaxThreads : m_MaxThreads-1; | ||
if (!m_defaultTrackQueue.contains(tio)) { | ||
m_defaultTrackQueue.append(tio); | ||
if (m_pausedWorkers.size() + m_defaultWorkers.size() < maxDefThreads) { | ||
createNewWorker(WorkerType::defaultWorker); | ||
} | ||
} | ||
} | ||
|
||
// This slot is called from the decks and samplers when the track is loaded. | ||
void AnalyzerManager::slotAnalyseTrack(TrackPointer tio) { | ||
analyseTrackNow(tio); | ||
} | ||
|
||
|
||
//slot | ||
void AnalyzerManager::slotUpdateProgress(int workerIdx, struct AnalyzerWorker::progress_info* progressInfo) { | ||
//Updates to wave overview and player status text comes from a signal emited from the track by calling setAnalyzerProgress. | ||
progressInfo->current_track->setAnalyzerProgress(progressInfo->track_progress); | ||
//These update the Analysis feature and analysis view. | ||
emit(trackProgress(workerIdx, progressInfo->track_progress / 10)); | ||
if (progressInfo->track_progress == 1000) { | ||
//Right now no one is listening to trackDone, but it's here just in case. | ||
emit(trackDone(progressInfo->current_track)); | ||
//Report that a track analysis has finished, and how many are still remaining. | ||
emit(trackFinished(m_defaultWorkers.size() + m_defaultTrackQueue.size() - 1)); | ||
} | ||
//TODO: Which is the consequence of not calling reset? | ||
progressInfo->current_track.reset(); | ||
progressInfo->sema.release(); | ||
} | ||
|
||
void AnalyzerManager::slotNextTrack(AnalyzerWorker* worker) { | ||
//TODO: The old scan checked in isLoadedTrackWaiting for pTrack->getAnalyzerProgress() | ||
// and either tried to load a previuos scan, or discarded the track if it had already been | ||
// analyzed. I don't fully understand the scenario and I am not doing that right now. | ||
|
||
//This is used when the maxThreads change. Extra workers are paused until active workers end. | ||
//Then, those are terminated and the paused workers are resumed. | ||
TrackPointer track; | ||
AnalyzerWorker* forepaused=nullptr; | ||
foreach(AnalyzerWorker* theworker, m_pausedWorkers) { | ||
if (theworker->isPriorized()) { forepaused=theworker; break; } | ||
} | ||
if (!forepaused) { | ||
if (worker->isPriorized()) { | ||
while (!track && !m_prioTrackQueue.isEmpty()) { | ||
track = m_prioTrackQueue.dequeue(); | ||
} | ||
} | ||
else { | ||
if (m_defaultWorkers.size() + m_pausedWorkers.size() <= m_MaxThreads) { | ||
//The while loop is done in the event that the track which was added to the queue is no | ||
//longer available. | ||
while (!track && !m_defaultTrackQueue.isEmpty()) { | ||
track = m_defaultTrackQueue.dequeue(); | ||
} | ||
} | ||
} | ||
} | ||
if (track) { | ||
worker->nextTrack(track); | ||
} | ||
else { | ||
worker->endProcess(); | ||
//Removing from active lists, so that "isActive" can return the correct value. | ||
m_defaultWorkers.removeAll(worker); | ||
m_priorityWorkers.removeAll(worker); | ||
m_endingWorkers.append(worker); | ||
|
||
if (forepaused) { | ||
forepaused->resume(); | ||
m_pausedWorkers.removeOne(forepaused); | ||
m_priorityWorkers.append(forepaused); | ||
} | ||
else if (!m_pausedWorkers.isEmpty()) { | ||
AnalyzerWorker* otherworker = m_pausedWorkers.first(); | ||
otherworker->resume(); | ||
m_pausedWorkers.removeOne(otherworker); | ||
m_defaultWorkers.append(otherworker); | ||
if (m_priorityWorkers.isEmpty() && !m_pausedWorkers.isEmpty()) { | ||
//Once the priority workers have ended, restart the extra paused default worker. | ||
otherworker = m_pausedWorkers.first(); | ||
otherworker->resume(); | ||
m_pausedWorkers.removeOne(otherworker); | ||
m_defaultWorkers.append(otherworker); | ||
} | ||
} | ||
} | ||
} | ||
void AnalyzerManager::slotWorkerFinished(AnalyzerWorker* worker) { | ||
m_endingWorkers.removeAll(worker); | ||
m_defaultWorkers.removeAll(worker); | ||
m_priorityWorkers.removeAll(worker); | ||
m_pausedWorkers.removeAll(worker); | ||
if (!isDefaultQueueActive()) { | ||
emit(queueEmpty()); | ||
} | ||
} | ||
void AnalyzerManager::slotPaused(AnalyzerWorker* worker) { | ||
//No useful code to execute right now. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add Q_UNUSED(worker); to eliminate a warning |
||
Q_UNUSED(worker); | ||
} | ||
void AnalyzerManager::slotErrorString(QString errMsg) { | ||
//TODO: This is currently unused. | ||
qWarning() << "Testing with :" << errMsg; | ||
} | ||
|
||
|
||
void AnalyzerManager::slotMaxThreadsChanged(int threads) { | ||
//See notes on analyseTrackNow of why we reduce the number of threads if there are priority workers. | ||
int maxDefThreads = (m_priorityWorkers.isEmpty()) ? threads : threads-1; | ||
// If it is Active, adapt the amount of workers. If it is not active, it will just update the variable. | ||
if (threads < m_MaxThreads) { | ||
//Pause workers | ||
while (!m_defaultWorkers.isEmpty() | ||
&& m_priorityWorkers.size() + m_defaultWorkers.size() > maxDefThreads) { | ||
AnalyzerWorker * backwork = m_defaultWorkers.first(); | ||
backwork->pause(); | ||
//Ideally i would have done this on the slotPaused slot, but then i cannot | ||
//ensure i won't call pause twice for the same worker. | ||
m_pausedWorkers.append(backwork); | ||
m_defaultWorkers.removeAll(backwork); | ||
} | ||
while (m_priorityWorkers.size() > threads) { | ||
AnalyzerWorker * backwork = m_priorityWorkers.first(); | ||
backwork->pause(); | ||
//Ideally i would have done this on the slotPaused slot, but then i cannot | ||
//ensure i won't call pause twice for the same worker. | ||
m_pausedWorkers.append(backwork); | ||
m_priorityWorkers.removeAll(backwork); | ||
} | ||
} | ||
else { | ||
//resume workers | ||
int pendingworkers=threads-m_MaxThreads; | ||
foreach(AnalyzerWorker* worker, m_pausedWorkers) { | ||
if (worker->isPriorized() && pendingworkers > 0) { | ||
worker->resume(); | ||
m_pausedWorkers.removeOne(worker); | ||
m_priorityWorkers.append(worker); | ||
--pendingworkers; | ||
} | ||
} | ||
if (!m_priorityWorkers.isEmpty() && pendingworkers > 0) { | ||
pendingworkers--; | ||
} | ||
foreach(AnalyzerWorker* worker, m_pausedWorkers) { | ||
if (!worker->isPriorized() && pendingworkers > 0) { | ||
worker->resume(); | ||
m_pausedWorkers.removeOne(worker); | ||
m_defaultWorkers.append(worker); | ||
--pendingworkers; | ||
} | ||
} | ||
//Create new workers, if tracks in queue. | ||
pendingworkers = math_min(pendingworkers,m_defaultTrackQueue.size()); | ||
for ( ;pendingworkers > 0; --pendingworkers) { | ||
createNewWorker(WorkerType::defaultWorker); | ||
} | ||
} | ||
m_MaxThreads=threads; | ||
} | ||
|
||
AnalyzerWorker* AnalyzerManager::createNewWorker(WorkerType wtype) { | ||
bool priorized = (wtype == WorkerType::priorityWorker); | ||
QThread* thread = new QThread(); | ||
AnalyzerWorker* worker = new AnalyzerWorker(m_pConfig, m_pDbConnectionPool, ++m_nextWorkerId, priorized); | ||
worker->moveToThread(thread); | ||
//Auto startup and auto cleanup of worker and thread. | ||
connect(thread, SIGNAL(started()), worker, SLOT(slotProcess())); | ||
connect(worker, SIGNAL(finished()), thread, SLOT(quit())); | ||
connect(worker, SIGNAL(finished()), worker, SLOT(deleteLater())); | ||
connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater())); | ||
//Connect with manager. | ||
connect(worker, SIGNAL(updateProgress(int, struct AnalyzerWorker::progress_info*)), this, SLOT(slotUpdateProgress(int, struct AnalyzerWorker::progress_info*))); | ||
connect(worker, SIGNAL(waitingForNextTrack(AnalyzerWorker*)), this, SLOT(slotNextTrack(AnalyzerWorker*))); | ||
connect(worker, SIGNAL(paused(AnalyzerWorker*)), this, SLOT(slotPaused(AnalyzerWorker*))); | ||
connect(worker, SIGNAL(workerFinished(AnalyzerWorker*)), this, SLOT(slotWorkerFinished(AnalyzerWorker*))); | ||
connect(worker, SIGNAL(error(QString)), this, SLOT(slotErrorString(QString))); | ||
thread->start(QThread::LowPriority); | ||
if (priorized) { | ||
m_priorityWorkers.append(worker); | ||
} | ||
else { | ||
m_defaultWorkers.append(worker); | ||
} | ||
return worker; | ||
} | ||
|
||
|
||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is + here correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yes, sorry for the noise.