diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 75f6f16bb25..33d116dae33 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -78,6 +78,7 @@ /// too short a period can cause errors to disappear immediately after creation. #define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC) #define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue. +#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16 #define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032 #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 2dbd495d2c4..44699a324f4 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -68,8 +68,10 @@ #include #include +#include #include -#include +#include + namespace ProfileEvents { @@ -1439,33 +1441,19 @@ void Context::dropCaches() const } BackgroundProcessingPool & Context::getBackgroundPool() -{ - // Note: shared->background_pool should be initialized first. - auto lock = getLock(); - return *shared->background_pool; -} - -BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size) { auto lock = getLock(); if (!shared->background_pool) - shared->background_pool = std::make_shared(pool_size); + shared->background_pool = std::make_shared(settings.background_pool_size); return *shared->background_pool; } BackgroundProcessingPool & Context::getBlockableBackgroundPool() { - // TODO: maybe a better name for the pool - // Note: shared->blockable_background_pool should be initialized first. - auto lock = getLock(); - return *shared->blockable_background_pool; -} - -BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size) -{ + // TODO: choose a better thread pool size and maybe a better name for the pool auto lock = getLock(); if (!shared->blockable_background_pool) - shared->blockable_background_pool = std::make_shared(pool_size); + shared->blockable_background_pool = std::make_shared(settings.background_pool_size); return *shared->blockable_background_pool; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 434179e1ab8..b6e759e364b 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -380,9 +380,7 @@ class Context bool useL0Opt() const; BackgroundProcessingPool & getBackgroundPool(); - BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size); BackgroundProcessingPool & getBlockableBackgroundPool(); - BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size); void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config); @@ -507,7 +505,7 @@ class DDLGuard class SessionCleaner { public: - explicit SessionCleaner(Context & context_) + SessionCleaner(Context & context_) : context{context_} {} ~SessionCleaner(); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index f2b3bbbd7fe..9361e0525d2 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -80,8 +80,8 @@ struct Settings M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \ M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \ M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \ - M(SettingUInt64, background_pool_size, 0, "Number of threads performing background work for tables (for example, merging in merge tree). Only effective at server startup. " \ - "0 means a quarter of the number of logical CPU cores of the machine.") \ + M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server " \ + "startup.") \ \ M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \ \ @@ -356,7 +356,7 @@ struct Settings M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \ M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \ M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ - M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means the number of logical CPU cores. Only effective at server startup")\ + M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\ M(SettingBool, enable_async_server, true, "Enable async rpc server.") \ M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 186ab0889d8..1bb35e51866 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -53,15 +53,10 @@ #include #include #include -#include -#include -#include #include #include #include -#include #include -#include #include #include #include @@ -86,6 +81,12 @@ #include #include +#include "HTTPHandlerFactory.h" +#include "MetricsPrometheus.h" +#include "MetricsTransmitter.h" +#include "StatusFile.h" +#include "TCPHandlerFactory.h" + #if Poco_NetSSL_FOUND #include #include @@ -1127,19 +1128,6 @@ int Server::main(const std::vector & /*args*/) global_context->getPathCapacity(), global_context->getFileProvider()); - /// if default value of background_pool_size is 0 - /// set it to the a quarter of the number of logical CPU cores of machine. - Settings & settings = global_context->getSettingsRef(); - if (settings.background_pool_size == 0) - { - global_context->setSetting("background_pool_size", std::to_string(server_info.cpu_info.logical_cores / 4)); - } - LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size); - - /// Initialize the background & blockable background thread pool. - auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size); - auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size); - global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast(global_context->getPageStorageRunMode())); @@ -1256,6 +1244,13 @@ int Server::main(const std::vector & /*args*/) /// Load global settings from default_profile and system_profile. /// It internally depends on UserConfig::parseSettings. global_context->setDefaultProfiles(config()); + Settings & settings = global_context->getSettingsRef(); + + /// Initialize the background thread pool. + /// It internally depends on settings.background_pool_size, + /// so must be called after settings has been load. + auto & bg_pool = global_context->getBackgroundPool(); + auto & blockable_bg_pool = global_context->getBlockableBackgroundPool(); /// Initialize RateLimiter. global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool); @@ -1407,7 +1402,7 @@ int Server::main(const std::vector & /*args*/) { auto size = settings.grpc_completion_queue_pool_size; if (size == 0) - size = server_info.cpu_info.logical_cores; + size = std::thread::hardware_concurrency(); GRPCCompletionQueuePool::global_instance = std::make_unique(size); } diff --git a/dbms/src/Storages/BackgroundProcessingPool.cpp b/dbms/src/Storages/BackgroundProcessingPool.cpp index 15740fa2875..45ba032bf53 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/BackgroundProcessingPool.cpp @@ -86,9 +86,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_) , thread_ids_counter(size_) { - if (size <= 0) - throw Exception("BackgroundProcessingPool size must be greater than 0", ErrorCodes::LOGICAL_ERROR); - LOG_FMT_INFO(&Poco::Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with {} threads", size); threads.resize(size); diff --git a/dbms/src/Storages/BackgroundProcessingPool.h b/dbms/src/Storages/BackgroundProcessingPool.h index 49a01b3a397..1ba6c4efcf8 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.h +++ b/dbms/src/Storages/BackgroundProcessingPool.h @@ -81,7 +81,7 @@ class BackgroundProcessingPool using TaskHandle = std::shared_ptr; - explicit BackgroundProcessingPool(int size_); + BackgroundProcessingPool(int size_); size_t getNumberOfThreads() const { return size; } @@ -96,7 +96,7 @@ class BackgroundProcessingPool /// 2. thread B also get the same task /// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task /// 4. thread B find the task is not occupied and execute the task again almost immediately - TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0); + TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0); void removeTask(const TaskHandle & task); ~BackgroundProcessingPool(); diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index a7bcfe43d7a..cbd42b57550 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -24,8 +24,6 @@ #include #include -#include - namespace DB::tests { std::unique_ptr TiFlashTestEnv::global_context = nullptr; @@ -41,10 +39,6 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR KeyManagerPtr key_manager = std::make_shared(false); global_context->initializeFileProvider(key_manager, false); - // initialize background & blockable background thread pool - global_context->initializeBackgroundPool(std::thread::hardware_concurrency() / 4); - global_context->initializeBlockableBackgroundPool(std::thread::hardware_concurrency() / 4); - // Theses global variables should be initialized by the following order // 1. capacity // 2. path pool