Skip to content

Commit

Permalink
New metrics for ThreadPool
Browse files Browse the repository at this point in the history
- Implement a new set of metrics for ThreadPool to provide deeper insights into the performance and operational aspects, aiding in diagnosing potential slowdowns.
- Introduce safety checks in ThreadPoolImpl constructor to ensure thread count parameters are within logical bounds and system limitations.
- Establish MAX_THREADS_LIMIT as 32768, in line with standard PID limits on Linux systems, to ensure thread pool sizes adhere to realistic system constraints.
  • Loading branch information
filimonov committed Jan 26, 2024
1 parent 930ce11 commit fafec69
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \
M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \
M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \
M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \
M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \
M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \
M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \
M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
Expand Down
67 changes: 66 additions & 1 deletion src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
Expand Down Expand Up @@ -28,6 +29,21 @@ namespace CurrentMetrics
extern const Metric GlobalThreadScheduled;
}

namespace ProfileEvents
{
extern const Event GlobalThreadPoolExpansions;
extern const Event GlobalThreadPoolShrinks;
extern const Event GlobalThreadPoolThreadCreationMicroseconds;
extern const Event GlobalThreadPoolLockWaitMicroseconds;
extern const Event GlobalThreadPoolJobs;

extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolThreadCreationMicroseconds;
extern const Event LocalThreadPoolLockWaitMicroseconds;
extern const Event LocalThreadPoolJobs;
}

class JobWithPriority
{
public:
Expand Down Expand Up @@ -63,6 +79,7 @@ class JobWithPriority
};

static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
static constexpr auto MAX_THREADS_LIMIT = 32768; // This is the default PID limit on most Linux systems.

template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_, Metric metric_scheduled_jobs_)
Expand Down Expand Up @@ -98,11 +115,22 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
if (max_threads == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The 'max_threads' parameter for the thread pool cannot be zero.");

if (max_threads > MAX_THREADS_LIMIT)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The 'max_threads' parameter ({}) for the thread pool should not exceed the limit of {}.", max_threads, MAX_THREADS_LIMIT);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
if (value == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The 'max_threads' value cannot be set to zero.");

if (value > MAX_THREADS_LIMIT)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The 'max_threads' value ({}) for the thread pool should not exceed the limit of {}.", max_threads, MAX_THREADS_LIMIT);

std::lock_guard lock(mutex);
bool need_start_threads = (value > max_threads);
bool need_finish_free_threads = (value < max_free_threads);
Expand Down Expand Up @@ -162,6 +190,7 @@ template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
{
Stopwatch watch;
auto on_error = [&](const std::string & reason)
{
if constexpr (std::is_same_v<ReturnType, void>)
Expand All @@ -182,10 +211,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

{
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
if (wait_microseconds) /// Check for optional. Condition is also true if the optional is set and the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds));
Expand Down Expand Up @@ -214,7 +246,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

try
{
Stopwatch watch;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
catch (...)
{
Expand All @@ -237,6 +275,12 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobScheduleMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleMicroseconds,
watch.elapsedMicroseconds());

ProfileEvents::increment( std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs);

return static_cast<ReturnType>(true);
}

Expand All @@ -260,7 +304,14 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()

try
{
Stopwatch watch;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);

}
catch (...)
{
Expand Down Expand Up @@ -291,7 +342,11 @@ void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, Priority priority, uint64_
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
Expand Down Expand Up @@ -332,7 +387,11 @@ void ThreadPoolImpl<Thread>::finalize()

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}

threads.clear();
}
Expand Down Expand Up @@ -389,7 +448,11 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::optional<JobWithPriority> job_data;

{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

// Finish with previous job if any
if (job_is_done)
Expand Down Expand Up @@ -422,6 +485,8 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
return;
}
Expand Down

0 comments on commit fafec69

Please sign in to comment.