diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 33d116dae33..75f6f16bb25 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -78,7 +78,6 @@ /// too short a period can cause errors to disappear immediately after creation. #define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC) #define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue. -#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16 #define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032 #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 3beedbd3601..a0345daaa75 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -68,10 +68,8 @@ #include #include -#include #include -#include - +#include namespace ProfileEvents { @@ -1443,19 +1441,33 @@ void Context::dropCaches() const } BackgroundProcessingPool & Context::getBackgroundPool() +{ + // Note: shared->background_pool should be initialized first. + auto lock = getLock(); + return *shared->background_pool; +} + +BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size) { auto lock = getLock(); if (!shared->background_pool) - shared->background_pool = std::make_shared(settings.background_pool_size); + shared->background_pool = std::make_shared(pool_size); return *shared->background_pool; } BackgroundProcessingPool & Context::getBlockableBackgroundPool() { - // TODO: choose a better thread pool size and maybe a better name for the pool + // TODO: maybe a better name for the pool + // Note: shared->blockable_background_pool should be initialized first. + auto lock = getLock(); + return *shared->blockable_background_pool; +} + +BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size) +{ auto lock = getLock(); if (!shared->blockable_background_pool) - shared->blockable_background_pool = std::make_shared(settings.background_pool_size); + shared->blockable_background_pool = std::make_shared(pool_size); return *shared->blockable_background_pool; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index b6e759e364b..434179e1ab8 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -380,7 +380,9 @@ class Context bool useL0Opt() const; BackgroundProcessingPool & getBackgroundPool(); + BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size); BackgroundProcessingPool & getBlockableBackgroundPool(); + BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size); void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config); @@ -505,7 +507,7 @@ class DDLGuard class SessionCleaner { public: - SessionCleaner(Context & context_) + explicit SessionCleaner(Context & context_) : context{context_} {} ~SessionCleaner(); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 9361e0525d2..f2b3bbbd7fe 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -80,8 +80,8 @@ struct Settings M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \ M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \ M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \ - M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server " \ - "startup.") \ + M(SettingUInt64, background_pool_size, 0, "Number of threads performing background work for tables (for example, merging in merge tree). Only effective at server startup. " \ + "0 means a quarter of the number of logical CPU cores of the machine.") \ \ M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \ \ @@ -356,7 +356,7 @@ struct Settings M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \ M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \ M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ - M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\ + M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means the number of logical CPU cores. Only effective at server startup")\ M(SettingBool, enable_async_server, true, "Enable async rpc server.") \ M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3e2c29de76c..95c1d5d3f2a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -53,10 +53,15 @@ #include #include #include +#include +#include +#include #include #include #include +#include #include +#include #include #include #include @@ -81,12 +86,6 @@ #include #include -#include "HTTPHandlerFactory.h" -#include "MetricsPrometheus.h" -#include "MetricsTransmitter.h" -#include "StatusFile.h" -#include "TCPHandlerFactory.h" - #if Poco_NetSSL_FOUND #include #include @@ -1135,6 +1134,19 @@ int Server::main(const std::vector & /*args*/) global_context->getPathCapacity(), global_context->getFileProvider()); + /// if default value of background_pool_size is 0 + /// set it to the a quarter of the number of logical CPU cores of machine. + Settings & settings = global_context->getSettingsRef(); + if (settings.background_pool_size == 0) + { + global_context->setSetting("background_pool_size", std::to_string(server_info.cpu_info.logical_cores / 4)); + } + LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size); + + /// Initialize the background & blockable background thread pool. + auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size); + auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size); + global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast(global_context->getPageStorageRunMode())); @@ -1251,13 +1263,6 @@ int Server::main(const std::vector & /*args*/) /// Load global settings from default_profile and system_profile. /// It internally depends on UserConfig::parseSettings. global_context->setDefaultProfiles(config()); - Settings & settings = global_context->getSettingsRef(); - - /// Initialize the background thread pool. - /// It internally depends on settings.background_pool_size, - /// so must be called after settings has been load. - auto & bg_pool = global_context->getBackgroundPool(); - auto & blockable_bg_pool = global_context->getBlockableBackgroundPool(); /// Initialize RateLimiter. global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool); @@ -1409,7 +1414,7 @@ int Server::main(const std::vector & /*args*/) { auto size = settings.grpc_completion_queue_pool_size; if (size == 0) - size = std::thread::hardware_concurrency(); + size = server_info.cpu_info.logical_cores; GRPCCompletionQueuePool::global_instance = std::make_unique(size); } diff --git a/dbms/src/Storages/BackgroundProcessingPool.cpp b/dbms/src/Storages/BackgroundProcessingPool.cpp index 96c2c6cc622..9fb4271ea38 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/BackgroundProcessingPool.cpp @@ -87,6 +87,9 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_) , thread_ids_counter(size_) { + if (size <= 0) + throw Exception("BackgroundProcessingPool size must be greater than 0", ErrorCodes::LOGICAL_ERROR); + LOG_FMT_INFO(&Poco::Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with {} threads", size); threads.resize(size); diff --git a/dbms/src/Storages/BackgroundProcessingPool.h b/dbms/src/Storages/BackgroundProcessingPool.h index 1ba6c4efcf8..49a01b3a397 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.h +++ b/dbms/src/Storages/BackgroundProcessingPool.h @@ -81,7 +81,7 @@ class BackgroundProcessingPool using TaskHandle = std::shared_ptr; - BackgroundProcessingPool(int size_); + explicit BackgroundProcessingPool(int size_); size_t getNumberOfThreads() const { return size; } @@ -96,7 +96,7 @@ class BackgroundProcessingPool /// 2. thread B also get the same task /// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task /// 4. thread B find the task is not occupied and execute the task again almost immediately - TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0); + TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0); void removeTask(const TaskHandle & task); ~BackgroundProcessingPool(); diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index cbd42b57550..a7bcfe43d7a 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -24,6 +24,8 @@ #include #include +#include + namespace DB::tests { std::unique_ptr TiFlashTestEnv::global_context = nullptr; @@ -39,6 +41,10 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR KeyManagerPtr key_manager = std::make_shared(false); global_context->initializeFileProvider(key_manager, false); + // initialize background & blockable background thread pool + global_context->initializeBackgroundPool(std::thread::hardware_concurrency() / 4); + global_context->initializeBlockableBackgroundPool(std::thread::hardware_concurrency() / 4); + // Theses global variables should be initialized by the following order // 1. capacity // 2. path pool