Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise default background threads size #4723

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
/// too short a period can cause errors to disappear immediately after creation.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC)
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16

#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
Expand Down
24 changes: 18 additions & 6 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@
#include <fmt/core.h>

#include <boost/functional/hash/hash.hpp>
#include <map>
#include <pcg_random.hpp>
#include <set>

#include <unordered_map>

namespace ProfileEvents
{
Expand Down Expand Up @@ -1443,19 +1441,33 @@ void Context::dropCaches() const
}

BackgroundProcessingPool & Context::getBackgroundPool()
{
// Note: shared->background_pool should be initialized first.
auto lock = getLock();
return *shared->background_pool;
}

BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size)
{
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
return *shared->background_pool;
}

BackgroundProcessingPool & Context::getBlockableBackgroundPool()
{
// TODO: choose a better thread pool size and maybe a better name for the pool
// TODO: maybe a better name for the pool
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
// Note: shared->blockable_background_pool should be initialized first.
auto lock = getLock();
return *shared->blockable_background_pool;
}

BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size)
{
auto lock = getLock();
if (!shared->blockable_background_pool)
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
return *shared->blockable_background_pool;
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ class Context
bool useL0Opt() const;

BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBlockableBackgroundPool();
BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size);

void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config);

Expand Down Expand Up @@ -505,7 +507,7 @@ class DDLGuard
class SessionCleaner
{
public:
SessionCleaner(Context & context_)
explicit SessionCleaner(Context & context_)
: context{context_}
{}
~SessionCleaner();
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ struct Settings
M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server " \
"startup.") \
M(SettingUInt64, background_pool_size, 0, "Number of threads performing background work for tables (for example, merging in merge tree). Only effective at server startup. " \
"0 means a quarter of the number of logical CPU cores of the machine.") \
\
M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \
\
Expand Down Expand Up @@ -356,7 +356,7 @@ struct Settings
M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \
M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \
M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \
M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\
M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means the number of logical CPU cores. Only effective at server startup")\
M(SettingBool, enable_async_server, true, "Enable async rpc server.") \
M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \
M(SettingUInt64, async_cqs, 1, "grpc async cqs") \
Expand Down
33 changes: 19 additions & 14 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,15 @@
#include <Poco/Net/NetException.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/MetricsPrometheus.h>
#include <Server/MetricsTransmitter.h>
#include <Server/RaftConfigParser.h>
#include <Server/Server.h>
#include <Server/ServerInfo.h>
#include <Server/StatusFile.h>
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
Expand All @@ -81,12 +86,6 @@
#include <limits>
#include <memory>

#include "HTTPHandlerFactory.h"
#include "MetricsPrometheus.h"
#include "MetricsTransmitter.h"
#include "StatusFile.h"
#include "TCPHandlerFactory.h"

#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h>
Expand Down Expand Up @@ -1135,6 +1134,19 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getPathCapacity(),
global_context->getFileProvider());

/// if default value of background_pool_size is 0
/// set it to the a quarter of the number of logical CPU cores of machine.
Settings & settings = global_context->getSettingsRef();
if (settings.background_pool_size == 0)
{
global_context->setSetting("background_pool_size", std::to_string(server_info.cpu_info.logical_cores / 4));
}
LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);

/// Initialize the background & blockable background thread pool.
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);

global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page);
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));
Expand Down Expand Up @@ -1251,13 +1263,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
global_context->setDefaultProfiles(config());
Settings & settings = global_context->getSettingsRef();

/// Initialize the background thread pool.
/// It internally depends on settings.background_pool_size,
/// so must be called after settings has been load.
auto & bg_pool = global_context->getBackgroundPool();
auto & blockable_bg_pool = global_context->getBlockableBackgroundPool();

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);
Expand Down Expand Up @@ -1409,7 +1414,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
size = server_info.cpu_info.logical_cores;
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_)
: size(size_)
, thread_ids_counter(size_)
{
if (size <= 0)
throw Exception("BackgroundProcessingPool size must be greater than 0", ErrorCodes::LOGICAL_ERROR);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved

LOG_FMT_INFO(&Poco::Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with {} threads", size);

threads.resize(size);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class BackgroundProcessingPool
using TaskHandle = std::shared_ptr<TaskInfo>;


BackgroundProcessingPool(int size_);
explicit BackgroundProcessingPool(int size_);

size_t getNumberOfThreads() const { return size; }

Expand All @@ -96,7 +96,7 @@ class BackgroundProcessingPool
/// 2. thread B also get the same task
/// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task
/// 4. thread B find the task is not occupied and execute the task again almost immediately
TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0);
TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

~BackgroundProcessingPool();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/TestUtils/TiFlashTestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <Storages/Transaction/TMTContext.h>
#include <TestUtils/TiFlashTestEnv.h>

#include <thread>

namespace DB::tests
{
std::unique_ptr<Context> TiFlashTestEnv::global_context = nullptr;
Expand All @@ -39,6 +41,10 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR
KeyManagerPtr key_manager = std::make_shared<MockKeyManager>(false);
global_context->initializeFileProvider(key_manager, false);

// initialize background & blockable background thread pool
global_context->initializeBackgroundPool(std::thread::hardware_concurrency() / 4);
global_context->initializeBlockableBackgroundPool(std::thread::hardware_concurrency() / 4);

// Theses global variables should be initialized by the following order
// 1. capacity
// 2. path pool
Expand Down