Skip to content

Commit

Permalink
Meaningful thread names for threads of pool (#1699)
Browse files Browse the repository at this point in the history
* feature: meaningful thread names for threads of pool

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>
  • Loading branch information
xDimon authored Jul 18, 2023
1 parent 6fb4b3e commit fbb9df2
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 11 deletions.
5 changes: 4 additions & 1 deletion core/application/impl/kagome_application_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ namespace kagome::application {
}

app_state_manager->atLaunch([ctx{io_context}, log{logger_}] {
std::thread asio_runner([ctx{ctx}, log{log}] { ctx->run(); });
std::thread asio_runner([ctx{ctx}, log{log}] {
soralog::util::setThreadName("kagome"); // explicitly for macos
ctx->run();
});
asio_runner.detach();
return true;
});
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace kagome::consensus::grandpa {
block_tree_(std::move(block_tree)),
reputation_repository_(std::move(reputation_repository)),
babe_status_observable_(std::move(babe_status_observable)),
execution_thread_pool_{std::make_shared<ThreadPool>(1ull)},
execution_thread_pool_{std::make_shared<ThreadPool>("grandpa", 1ull)},
internal_thread_context_{execution_thread_pool_->handler()},
main_thread_context_{std::move(main_thread_context)},
scheduler_{std::make_shared<libp2p::basic::SchedulerImpl>(
Expand Down
4 changes: 2 additions & 2 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,10 @@ namespace {
sptr<ThreadPool> get_thread_pool(const Injector &injector) {
const auto cores = std::thread::hardware_concurrency();
if (cores == 0ul) {
return std::make_shared<ThreadPool>(5ull);
return std::make_shared<ThreadPool>("worker", 5ull);
}

return std::make_shared<ThreadPool>(cores);
return std::make_shared<ThreadPool>("worker", cores);
}

template <typename... Ts>
Expand Down
6 changes: 4 additions & 2 deletions core/metrics/impl/exposer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ namespace kagome::metrics {
acceptor_->local_endpoint().port());
acceptOnce();

thread_ = std::make_shared<std::thread>(
[context = context_]() { context->run(); });
thread_ = std::make_shared<std::thread>([context = context_] {
soralog::util::setThreadName("metric-exposer");
context->run();
});
thread_->detach();

return true;
Expand Down
1 change: 1 addition & 0 deletions core/metrics/impl/metrics_watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace kagome::metrics {

bool MetricsWatcher::start() {
thread_ = std::thread([this] {
soralog::util::setThreadName("metric-watcher");
while (not shutdown_requested_) {
auto storage_size_res = measure_storage_size();
if (storage_size_res.has_value()) {
Expand Down
2 changes: 2 additions & 0 deletions core/network/impl/stream_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#include <random>
#include <unordered_map>

#if defined(BACKWARD_HAS_BACKTRACE)
#include <backward.hpp>
#endif

#include "libp2p/connection/stream.hpp"
#include "libp2p/host/host.hpp"
Expand Down
7 changes: 6 additions & 1 deletion core/offchain/impl/offchain_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ namespace kagome::offchain {
outcome::result<void> OffchainWorkerImpl::run() {
BOOST_ASSERT(not ocw_pool_->getWorker());

soralog::util::setThreadName("ocw." + std::to_string(block_.number));
auto at_end =
gsl::finally([prev_thread_name = soralog::util::getThreadName()] {
soralog::util::setThreadName(prev_thread_name);
});

soralog::util::setThreadName("ocw.#" + std::to_string(block_.number));

ocw_pool_->addWorker(shared_from_this());
auto remove = gsl::finally([&] { ocw_pool_->removeWorker(); });
Expand Down
2 changes: 1 addition & 1 deletion core/offchain/impl/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace kagome::offchain {
: threads_{threads},
free_threads_{threads},
max_tasks_{max_tasks},
thread_pool_{threads_} {}
thread_pool_{"ocw", threads_} {}

void run(Task &&task) {
std::unique_lock lock{mutex_};
Expand Down
2 changes: 1 addition & 1 deletion core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ namespace kagome::parachain {
std::shared_ptr<parachain::Pvf> pvf,
std::shared_ptr<parachain::Recovery> recovery,
std::shared_ptr<boost::asio::io_context> this_context)
: int_pool_{std::make_shared<ThreadPool>(1ull)},
: int_pool_{std::make_shared<ThreadPool>("approval", 1ull)},
internal_context_{int_pool_->handler()},
thread_pool_{std::move(thread_pool)},
thread_pool_context_{thread_pool_->handler()},
Expand Down
5 changes: 4 additions & 1 deletion core/telemetry/impl/service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ namespace kagome::telemetry {
connections_.emplace_back(std::move(connection));
}
worker_thread_ = std::make_shared<std::thread>(
[io_context{io_context_}] { io_context->run(); });
[io_context{io_context_}] {
soralog::util::setThreadName("telemetry");
io_context->run();
});
worker_thread_->detach();
return true;
}
Expand Down
37 changes: 36 additions & 1 deletion core/utils/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <optional>
#include <thread>

#include "soralog/util.hpp"
#include "utils/non_copyable.hpp"

namespace kagome {
Expand Down Expand Up @@ -89,15 +90,49 @@ namespace kagome {
ThreadPool &operator=(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;

[[deprecated("Please, use ctor for pool with named threads")]] //
explicit ThreadPool(size_t thread_count)
: ioc_{std::make_shared<boost::asio::io_context>()},
work_guard_{ioc_->get_executor()} {
BOOST_ASSERT(ioc_);
BOOST_ASSERT(thread_count > 0);

static size_t pool_count = 0;

threads_.reserve(thread_count);
for (size_t i = 0; i < thread_count; ++i) {
threads_.emplace_back(
[io{ioc_}, thread_count, pool_n = ++pool_count, thread_n = i + 1] {
if (thread_count > 1) {
soralog::util::setThreadName(
fmt::format("worker.{}.{}", pool_n, thread_n));
} else {
soralog::util::setThreadName(fmt::format("worker.{}", pool_n));
}
io->run();
});
}
}

ThreadPool(std::string_view pool_tag, size_t thread_count)
: ioc_{std::make_shared<boost::asio::io_context>()},
work_guard_{ioc_->get_executor()} {
BOOST_ASSERT(ioc_);
BOOST_ASSERT(thread_count > 0);

threads_.reserve(thread_count);
for (size_t i = 0; i < thread_count; ++i) {
threads_.emplace_back([io{ioc_}] { io->run(); });
threads_.emplace_back([io{ioc_},
pool_tag = std::string(pool_tag),
thread_count,
n = i + 1] {
if (thread_count > 1) {
soralog::util::setThreadName(fmt::format("{}.{}", pool_tag, n));
} else {
soralog::util::setThreadName(pool_tag);
}
io->run();
});
}
}

Expand Down

0 comments on commit fbb9df2

Please sign in to comment.