Skip to content

Commit

Permalink
Fix thread leaks (#1972)
Browse files Browse the repository at this point in the history
* refactor: join all started threads

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: logger preparation for tests

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: tsan alert suppression list

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: logging thread pool lifetime

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: strict type of thread pool for rpc

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: separate thread pool and thread handler

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: rpc thread pool injection

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: extract approval distribution error

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: strict type of thread pool for pvf

* refactor: strict type of thread pool for disputes

* refactor: strict type of thread pool for grandpa

* refactor: strict type of thread pool for off-chain workers

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: strict type of thread pool for approval distribution

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: strict type of thread pool of workers

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: remove ability to inject basic thread pool

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: tests

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: confusing naming

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: shutdown over watchdog stop

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: missed include

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactor: replace thread handler by context

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: review issue

Co-authored-by: Ruslan Tushov <turuslan@users.noreply.github.com>

* fix: review issue

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: tests

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: review issue

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: test

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: tests

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: test

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

---------

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>
Co-authored-by: Ruslan Tushov <turuslan@users.noreply.github.com>
  • Loading branch information
xDimon and turuslan authored Feb 2, 2024
1 parent e3db8bf commit 6468930
Show file tree
Hide file tree
Showing 64 changed files with 949 additions and 632 deletions.
1 change: 1 addition & 0 deletions .thread-sanitizer-ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
race:soralog::Logger::push
15 changes: 11 additions & 4 deletions cmake/toolchain/flags/sanitize_thread.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ endif ()

include(${CMAKE_CURRENT_LIST_DIR}/../../add_cache_flag.cmake)

add_cache_flag(CMAKE_CXX_FLAGS "-fsanitize=thread")
add_cache_flag(CMAKE_CXX_FLAGS "-g")
# Compiler flags for ThreadSanitizer builds. The list is consumed by the
# foreach() below, which registers every entry for both C and C++.
#
# Both -fsanitize-blacklist and -fsanitize-ignorelist are given the same
# suppression file: .thread-sanitizer-ignore, three directories above this one.
# NOTE(review): these look like the old and new spelling of one option -
# confirm that every supported compiler accepts both, otherwise keep only one.
set(FLAGS
-fsanitize=thread
-fsanitize-blacklist="${CMAKE_CURRENT_LIST_DIR}/../../../.thread-sanitizer-ignore"
-fsanitize-ignorelist="${CMAKE_CURRENT_LIST_DIR}/../../../.thread-sanitizer-ignore"
-g
-O1
)

add_cache_flag(CMAKE_C_FLAGS "-fsanitize=thread")
add_cache_flag(CMAKE_C_FLAGS "-g")
# Register every entry of FLAGS for both the C++ and the C compiler through
# add_cache_flag() (provided by the add_cache_flag.cmake include above).
foreach(FLAG IN LISTS FLAGS)
add_cache_flag(CMAKE_CXX_FLAGS ${FLAG})
add_cache_flag(CMAKE_C_FLAGS ${FLAG})
endforeach()

add_cache_flag(CMAKE_EXE_LINKER_FLAGS "-fsanitize=thread")
add_cache_flag(CMAKE_SHARED_LINKER_FLAGS "-fsanitize=thread")
Expand Down
13 changes: 6 additions & 7 deletions core/api/service/impl/api_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "api/jrpc/jrpc_processor.hpp"
#include "api/jrpc/jrpc_server.hpp"
#include "api/jrpc/value_converter.hpp"
#include "api/service/impl/rpc_thread_pool.hpp"
#include "api/transport/listener.hpp"
#include "application/app_state_manager.hpp"
#include "blockchain/block_tree.hpp"
Expand Down Expand Up @@ -135,8 +136,7 @@ namespace kagome::api {
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<storage::trie::TrieStorage> trie_storage,
std::shared_ptr<runtime::Core> core,
std::shared_ptr<Watchdog> watchdog,
std::shared_ptr<RpcContext> rpc_context)
std::shared_ptr<RpcThreadPool> rpc_thread_pool)
: listeners_(std::move(listeners)),
server_(std::move(server)),
logger_{log::createLogger("ApiService", "api")},
Expand All @@ -147,8 +147,7 @@ namespace kagome::api {
.chain = std::move(chain_sub_engine),
.ext = std::move(ext_sub_engine)},
extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)},
execution_thread_pool_{std::make_shared<ThreadPool>(
std::move(watchdog), "rpc", 1ull, std::move(rpc_context))} {
rpc_thread_pool_{std::move(rpc_thread_pool)} {
BOOST_ASSERT(block_tree_);
BOOST_ASSERT(trie_storage_);
BOOST_ASSERT(core_);
Expand All @@ -161,13 +160,13 @@ namespace kagome::api {
BOOST_ASSERT(processor != nullptr);
processor->registerHandlers();
}

app_state_manager.takeControl(*this);

BOOST_ASSERT(subscription_engines_.chain);
BOOST_ASSERT(subscription_engines_.storage);
BOOST_ASSERT(subscription_engines_.ext);
BOOST_ASSERT(extrinsic_event_key_repo_);
BOOST_ASSERT(rpc_thread_pool_);

app_state_manager.takeControl(*this);
}

jsonrpc::Value ApiServiceImpl::createStateStorageEvent(
Expand Down
7 changes: 3 additions & 4 deletions core/api/service/impl/api_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
#include "primitives/block_id.hpp"
#include "primitives/event_types.hpp"
#include "subscription/subscription_engine.hpp"
#include "utils/thread_pool.hpp"

namespace kagome::api {
class JRpcProcessor;
class JRpcServer;
class Listener;
class RpcThreadPool;
} // namespace kagome::api
namespace kagome::application {
class AppStateManager;
Expand Down Expand Up @@ -123,8 +123,7 @@ namespace kagome::api {
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<storage::trie::TrieStorage> trie_storage,
std::shared_ptr<runtime::Core> core,
std::shared_ptr<Watchdog> watchdog,
std::shared_ptr<RpcContext> rpc_context);
std::shared_ptr<RpcThreadPool> rpc_thread_pool);

~ApiServiceImpl() override = default;

Expand Down Expand Up @@ -245,6 +244,6 @@ namespace kagome::api {
std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
extrinsic_event_key_repo_;

std::shared_ptr<ThreadPool> execution_thread_pool_;
std::shared_ptr<RpcThreadPool> rpc_thread_pool_;
};
} // namespace kagome::api
20 changes: 20 additions & 0 deletions core/api/service/impl/rpc_thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "api/transport/rpc_io_context.hpp"
#include "utils/thread_pool.hpp"
#include "utils/watchdog.hpp"

namespace kagome::api {
/**
 * Thread pool dedicated to RPC handling.
 *
 * It is a distinct type (rather than a plain ThreadPool) so that it can be
 * requested by type, e.g. as a constructor dependency of ApiServiceImpl.
 */
class RpcThreadPool final : public ThreadPool {
  /// Tag passed to the ThreadPool base as its second constructor argument.
  static constexpr const char *kPoolName = "rpc";

  /// Third ThreadPool constructor argument - presumably the number of
  /// threads to start (confirm against ThreadPool).
  static constexpr size_t kThreadCount = 1;

 public:
  /**
   * @param watchdog forwarded unchanged to the ThreadPool base
   * @param rpc_context forwarded to the ThreadPool base as its last argument
   */
  RpcThreadPool(std::shared_ptr<Watchdog> watchdog,
                std::shared_ptr<RpcContext> rpc_context)
      : ThreadPool(std::move(watchdog),
                   kPoolName,
                   kThreadCount,
                   std::move(rpc_context)) {}
};
} // namespace kagome::api
29 changes: 18 additions & 11 deletions core/application/impl/kagome_application_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,23 @@ namespace kagome::application {
exit(EXIT_FAILURE);
}

app_state_manager->atLaunch([ctx{io_context}, watchdog] {
std::thread asio_runner([ctx{ctx}, watchdog] {
soralog::util::setThreadName("kagome"); // explicitly for macos
std::unique_ptr<std::thread> asio_runner;

app_state_manager->atLaunch([ctx{io_context}, watchdog, &asio_runner] {
asio_runner = std::make_unique<std::thread>([ctx{ctx}, watchdog] {
soralog::util::setThreadName("main_runner"); // explicitly for macos
watchdog->run(ctx);
});
asio_runner.detach();
return true;
});

std::thread watchdog_thread(
[watchdog] { watchdog->checkLoop(kWatchdogDefaultTimeout); });
watchdog_thread.detach();

app_state_manager->atShutdown([ctx{io_context}, watchdog] {
ctx->stop();
watchdog->stop();
std::thread watchdog_thread([watchdog] {
soralog::util::setThreadName("watchdog");
watchdog->checkLoop(kWatchdogDefaultTimeout);
});

app_state_manager->atShutdown([watchdog] { watchdog->stop(); });

{ // Metrics
auto metrics_registry = metrics::createRegistry();

Expand Down Expand Up @@ -127,6 +126,14 @@ namespace kagome::application {
}

app_state_manager->run();

watchdog->stop();

if (asio_runner) {
asio_runner->join();
}

watchdog_thread.join();
}

} // namespace kagome::application
68 changes: 39 additions & 29 deletions core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "log/profiling_logger.hpp"
#include "storage/database_error.hpp"
#include "storage/trie_pruner/trie_pruner.hpp"
#include "utils/thread_handler.hpp"
#include "utils/weak_io_context.hpp"

namespace {
constexpr auto blockHeightMetricName = "kagome_block_height";
Expand Down Expand Up @@ -124,7 +126,7 @@ namespace kagome::blockchain {
std::shared_ptr<const class JustificationStoragePolicy>
justification_storage_policy,
std::shared_ptr<storage::trie_pruner::TriePruner> state_pruner,
WeakIoContext main_thread) {
WeakIoContext main_thread_context) {
BOOST_ASSERT(storage != nullptr);
BOOST_ASSERT(header_repo != nullptr);

Expand Down Expand Up @@ -283,7 +285,7 @@ namespace kagome::blockchain {
std::move(extrinsic_event_key_repo),
std::move(justification_storage_policy),
state_pruner,
std::move(main_thread)));
std::move(main_thread_context)));

// Add non-finalized block to the block tree
for (auto &e : collected) {
Expand Down Expand Up @@ -419,7 +421,7 @@ namespace kagome::blockchain {
std::shared_ptr<const JustificationStoragePolicy>
justification_storage_policy,
std::shared_ptr<storage::trie_pruner::TriePruner> state_pruner,
WeakIoContext main_thread)
WeakIoContext main_thread_context)
: block_tree_data_{BlockTreeData{
.header_repo_ = std::move(header_repo),
.storage_ = std::move(storage),
Expand All @@ -433,7 +435,11 @@ namespace kagome::blockchain {
.genesis_block_hash_ = {},
.blocks_pruning_ = {app_config.blocksPruning(), finalized.number},
}},
main_thread_{std::move(main_thread)} {
main_thread_handler_{[&] {
BOOST_ASSERT(not main_thread_context.expired());
return std::make_shared<ThreadHandler>(
std::move(main_thread_context));
}()} {
block_tree_data_.sharedAccess([&](const BlockTreeData &p) {
BOOST_ASSERT(p.header_repo_ != nullptr);
BOOST_ASSERT(p.storage_ != nullptr);
Expand Down Expand Up @@ -480,7 +486,7 @@ namespace kagome::blockchain {
extrinsic_events_engine_ = std::move(extrinsic_events_engine);
BOOST_ASSERT(extrinsic_events_engine_ != nullptr);

main_thread_.start();
main_thread_handler_->start();
}

const primitives::BlockHash &BlockTreeImpl::getGenesisBlockHash() const {
Expand Down Expand Up @@ -559,7 +565,7 @@ namespace kagome::blockchain {
auto extrinsic_hash = p.hasher_->blake2b_256(ext.data);
SL_DEBUG(log_, "Adding extrinsic with hash {}", extrinsic_hash);
if (auto key = p.extrinsic_event_key_repo_->get(extrinsic_hash)) {
main_thread_.execute(
main_thread_handler_->execute(
[wself{weak_from_this()}, key{key.value()}, block_hash]() {
if (auto self = wself.lock()) {
self->extrinsic_events_engine_->notify(
Expand All @@ -582,11 +588,13 @@ namespace kagome::blockchain {
primitives::events::ChainEventType event,
const primitives::BlockHeader &header) {
BOOST_ASSERT(header.hash_opt.has_value());
main_thread_.execute([wself{weak_from_this()}, event, header]() mutable {
if (auto self = wself.lock()) {
self->chain_events_engine_->notify(std::move(event), std::move(header));
}
});
main_thread_handler_->execute(
[wself{weak_from_this()}, event, header]() mutable {
if (auto self = wself.lock()) {
self->chain_events_engine_->notify(std::move(event),
std::move(header));
}
});
}

outcome::result<void> BlockTreeImpl::removeLeaf(
Expand Down Expand Up @@ -819,9 +827,9 @@ namespace kagome::blockchain {
for (auto &ext : body.value()) {
auto extrinsic_hash = p.hasher_->blake2b_256(ext.data);
if (auto key = p.extrinsic_event_key_repo_->get(extrinsic_hash)) {
main_thread_.execute([wself{weak_from_this()},
key{key.value()},
block_hash]() {
main_thread_handler_->execute([wself{weak_from_this()},
key{key.value()},
block_hash]() {
if (auto self = wself.lock()) {
self->extrinsic_events_engine_->notify(
key,
Expand All @@ -833,15 +841,16 @@ namespace kagome::blockchain {
}
}

main_thread_.execute([weak{weak_from_this()},
retired_hashes{std::move(retired_hashes)}] {
if (auto self = weak.lock()) {
self->chain_events_engine_->notify(
primitives::events::ChainEventType::
kDeactivateAfterFinalization,
retired_hashes);
}
});
main_thread_handler_->execute(
[weak{weak_from_this()},
retired_hashes{std::move(retired_hashes)}] {
if (auto self = weak.lock()) {
self->chain_events_engine_->notify(
primitives::events::ChainEventType::
kDeactivateAfterFinalization,
retired_hashes);
}
});

log_->info("Finalized block {}", node->info);
telemetry_->notifyBlockFinalized(node->info);
Expand Down Expand Up @@ -1301,9 +1310,9 @@ namespace kagome::blockchain {
for (auto &ext : block_body_opt.value()) {
auto extrinsic_hash = p.hasher_->blake2b_256(ext.data);
if (auto key = p.extrinsic_event_key_repo_->get(extrinsic_hash)) {
main_thread_.execute([wself{weak_from_this()},
key{key.value()},
block_hash{block.hash}]() {
main_thread_handler_->execute([wself{weak_from_this()},
key{key.value()},
block_hash{block.hash}]() {
if (auto self = wself.lock()) {
self->extrinsic_events_engine_->notify(
key,
Expand All @@ -1322,9 +1331,10 @@ namespace kagome::blockchain {
}

// trying to return extrinsics back to transaction pool
main_thread_.execute([extrinsics{std::move(extrinsics)},
wself{weak_from_this()},
retired_hashes{std::move(retired_hashes)}]() mutable {
main_thread_handler_->execute([extrinsics{std::move(extrinsics)},
wself{weak_from_this()},
retired_hashes{
std::move(retired_hashes)}]() mutable {
if (auto self = wself.lock()) {
auto eo = self->block_tree_data_.sharedAccess(
[&](const BlockTreeData &p) { return p.extrinsic_observer_; });
Expand Down
13 changes: 9 additions & 4 deletions core/blockchain/impl/block_tree_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <memory>
#include <optional>
#include <queue>
#include <thread>
#include <unordered_set>

#include <libp2p/common/final_action.hpp>
Expand All @@ -31,7 +32,11 @@
#include "subscription/extrinsic_event_key_repository.hpp"
#include "telemetry/service.hpp"
#include "utils/safe_object.hpp"
#include "utils/thread_pool.hpp"
#include "utils/weak_io_context.hpp"

namespace kagome {
class ThreadHandler;
}

namespace kagome::storage::trie_pruner {
class TriePruner;
Expand Down Expand Up @@ -60,7 +65,7 @@ namespace kagome::blockchain {
std::shared_ptr<const class JustificationStoragePolicy>
justification_storage_policy,
std::shared_ptr<storage::trie_pruner::TriePruner> state_pruner,
WeakIoContext main_thread);
WeakIoContext main_thread_context);

/// Recover block tree state at provided block
static outcome::result<void> recover(
Expand Down Expand Up @@ -192,7 +197,7 @@ namespace kagome::blockchain {
std::shared_ptr<const class JustificationStoragePolicy>
justification_storage_policy,
std::shared_ptr<storage::trie_pruner::TriePruner> state_pruner,
WeakIoContext main_thread);
WeakIoContext main_thread_context);

outcome::result<void> reorgAndPrune(BlockTreeData &p,
const ReorgAndPrune &changes);
Expand Down Expand Up @@ -273,7 +278,7 @@ namespace kagome::blockchain {
metrics::Gauge *metric_best_block_height_;
metrics::Gauge *metric_finalized_block_height_;
metrics::Gauge *metric_known_chain_leaves_;
ThreadHandler main_thread_;
std::shared_ptr<ThreadHandler> main_thread_handler_;
telemetry::Telemetry telemetry_ = telemetry::createTelemetryService();
};
} // namespace kagome::blockchain
22 changes: 22 additions & 0 deletions core/common/worker_thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <algorithm>
#include <cstddef>
#include <memory>
#include <optional>
#include <thread>
#include <utility>

#include "utils/thread_pool.hpp"
#include "utils/watchdog.hpp"

namespace kagome::common {
/**
 * General-purpose pool tagged "worker".
 *
 * It is a distinct type (rather than a plain ThreadPool) so that it can be
 * requested by type instead of passing an untyped ThreadPool around.
 */
class WorkerThreadPool final : public ThreadPool {
 public:
  /**
   * @param watchdog forwarded unchanged to the ThreadPool base
   *
   * The constructor is explicit so that a std::shared_ptr<Watchdog> can never
   * be converted into a whole thread pool by accident.
   *
   * The size argument is max(3, hardware_concurrency()) - 1: one less than
   * the reported number of hardware threads, but never below two.
   * std::thread::hardware_concurrency() may return 0 when the value is
   * unknown; the max() clamp covers that case as well.
   *
   * std::nullopt is passed where RpcThreadPool passes an io context -
   * presumably the base then creates its own (confirm against ThreadPool).
   */
  explicit WorkerThreadPool(std::shared_ptr<Watchdog> watchdog)
      : ThreadPool(
          std::move(watchdog),
          "worker",
          std::max<size_t>(3, std::thread::hardware_concurrency()) - 1,
          std::nullopt) {}
};
} // namespace kagome::common
Loading

0 comments on commit 6468930

Please sign in to comment.