From 6b9ffdc3f7d6710b4c63689aa0c3be4d99b9a077 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Mon, 23 May 2022 11:14:45 +0800 Subject: [PATCH 1/3] deleted the scheduled task with exceeded state from the waiting tasks queue Signed-off-by: fzhedu --- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 9fbe4b7b7cb..67ad34da11d 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -155,7 +155,10 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) { auto task = query_task_set->waiting_tasks.front(); if (!scheduleImp(current_query_id, query_task_set, task, true)) + { + query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, as the task is scheduled as the exceeded state. return; + } query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } From a4c816da6fc747b24e9e0c7aae61bee3a52558cf Mon Sep 17 00:00:00 2001 From: fzhedu Date: Mon, 23 May 2022 12:43:48 +0800 Subject: [PATCH 2/3] use task id instead of task ptr in the waiting task queue Signed-off-by: fzhedu --- dbms/src/Flash/Mpp/MPPTaskManager.h | 2 +- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 024dd4f3a59..d7047804aca 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -35,7 +35,7 @@ struct MPPQueryTaskSet bool to_be_cancelled = false; MPPTaskMap task_map; /// only used in scheduler - std::queue waiting_tasks; + std::queue waiting_tasks; }; using MPPQueryTaskSetPtr = std::shared_ptr; diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 67ad34da11d..2afc8293c78 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -97,7 +97,9 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage { while (!query_task_set->waiting_tasks.empty()) { - query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED); + auto task = query_task_set->task_map.find(query_task_set->waiting_tasks.front())->second; + if (task != nullptr) + task->scheduleThisTask(MPPTask::ScheduleState::FAILED); query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } @@ -153,8 +155,8 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { - auto task = query_task_set->waiting_tasks.front(); - if (!scheduleImp(current_query_id, query_task_set, task, true)) + auto task = query_task_set->task_map.find(query_task_set->waiting_tasks.front())->second; + if (task != nullptr && !scheduleImp(current_query_id, query_task_set, task, true)) { query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, as the task is scheduled as the exceeded state. return; @@ -207,7 +209,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q if (!isWaiting) { waiting_set.insert(tso); - query_task_set->waiting_tasks.push(task); + query_task_set->waiting_tasks.push(task->getId()); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); } From 9d4f6b8457da1bf47369aa11fbf6b47471d6238f Mon Sep 17 00:00:00 2001 From: fzhedu Date: Mon, 23 May 2022 16:19:31 +0800 Subject: [PATCH 3/3] fix bugs Signed-off-by: fzhedu --- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 21 +++++++++++++-------- dbms/src/Flash/Mpp/MinTSOScheduler.h | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 2afc8293c78..af525bd1a55 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -72,7 +72,8 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta LOG_FMT_WARNING(log, "{} is scheduled with miss or cancellation.", id.toString()); return true; } - return scheduleImp(id.start_ts, query_task_set, task, false); + bool has_error = false; + return scheduleImp(id.start_ts, query_task_set, task, false, has_error); } /// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled. @@ -97,9 +98,9 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage { while (!query_task_set->waiting_tasks.empty()) { - auto task = query_task_set->task_map.find(query_task_set->waiting_tasks.front())->second; - if (task != nullptr) - task->scheduleThisTask(MPPTask::ScheduleState::FAILED); + auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front()); + if (task_it != query_task_set->task_map.end() && task_it->second != nullptr) + task_it->second->scheduleThisTask(MPPTask::ScheduleState::FAILED); query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } @@ -155,10 +156,12 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { - auto task = query_task_set->task_map.find(query_task_set->waiting_tasks.front())->second; - if (task != nullptr && !scheduleImp(current_query_id, query_task_set, task, true)) + auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front()); + bool has_error = false; + if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error)) { - query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, as the task is scheduled as the exceeded state. + if (has_error) + query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors. return; } query_task_set->waiting_tasks.pop(); @@ -171,7 +174,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) } /// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases. -bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting) +bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error) { auto needed_threads = task->getNeededThreads(); auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit; @@ -192,6 +195,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q { if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here. { + has_error = true; auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); LOG_FMT_ERROR(log, "{}", msg); GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); @@ -205,6 +209,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q { throw Exception(msg); } + return false; } if (!isWaiting) { diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.h b/dbms/src/Flash/Mpp/MinTSOScheduler.h index 501aa772a33..17ab1f4dfa3 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.h +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.h @@ -42,7 +42,7 @@ class MinTSOScheduler : private boost::noncopyable void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager); private: - bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting); + bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error); bool updateMinTSO(const UInt64 tso, const bool retired, const String msg); void scheduleWaitingQueries(MPPTaskManager & task_manager); bool isDisabled()