Skip to content

Commit

Permalink
Fix setting of global thread number
Browse files Browse the repository at this point in the history
Refactor thread manager callbacks to use more verbose, but more explicit lambdas for the callbacks.
The on_start_thread callback was passing the local instead of the global thread number to init_tss.
  • Loading branch information
msimberg committed Feb 5, 2024
1 parent d23541a commit f94b8e1
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 11 deletions.
26 changes: 16 additions & 10 deletions libs/pika/thread_manager/src/thread_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <pika/config.hpp>
#include <pika/assert.hpp>
#include <pika/execution_base/this_thread.hpp>
#include <pika/functional/bind.hpp>
#include <pika/modules/errors.hpp>
#include <pika/modules/logging.hpp>
#include <pika/modules/schedulers.hpp>
Expand Down Expand Up @@ -60,19 +59,26 @@ namespace pika::threads::detail {
: rtcfg_(rtcfg)
, notifier_(notifier)
{
using std::placeholders::_1;
using std::placeholders::_3;

// Add callbacks local to thread_manager.
notifier.add_on_start_thread_callback(
util::detail::bind(&thread_manager::init_tss, this, _1));
notifier.add_on_stop_thread_callback(util::detail::bind(&thread_manager::deinit_tss, this));

auto& rp = pika::resource::get_partitioner();
[this]([[maybe_unused]] std::size_t local_thread_num, std::size_t global_thread_num,
[[maybe_unused]] char const* pool_name,
[[maybe_unused]] char const* postfix) { this->init_tss(global_thread_num); });
notifier.add_on_stop_thread_callback(
[this]([[maybe_unused]] std::size_t local_thread_num,
[[maybe_unused]] std::size_t global_thread_num,
[[maybe_unused]] char const* pool_name,
[[maybe_unused]] char const* postfix) { this->deinit_tss(); });
notifier.add_on_start_thread_callback(
util::detail::bind(&resource::detail::partitioner::assign_pu, std::ref(rp), _3, _1));
[](std::size_t local_thread_num, [[maybe_unused]] std::size_t global_thread_num,
char const* pool_name, [[maybe_unused]] char const* postfix) {
pika::resource::get_partitioner().assign_pu(pool_name, local_thread_num);
});
notifier.add_on_stop_thread_callback(
util::detail::bind(&resource::detail::partitioner::unassign_pu, std::ref(rp), _3, _1));
[](std::size_t local_thread_num, [[maybe_unused]] std::size_t global_thread_num,
char const* pool_name, [[maybe_unused]] char const* postfix) {
pika::resource::get_partitioner().unassign_pu(pool_name, local_thread_num);
});
}

void thread_manager::create_pools()
Expand Down
24 changes: 23 additions & 1 deletion libs/pika/thread_manager/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Copyright (c) 2019 The STE||AR-Group
# Copyright (c) 2024 ETH Zurich
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests thread_num)

set(thread_num_PARAMETERS THREADS 4)

foreach(test ${tests})
set(sources ${test}.cpp)

source_group("Source Files" FILES ${sources})

pika_add_executable(
${test}_test INTERNAL_FLAGS
SOURCES ${sources} ${${test}_FLAGS}
EXCLUDE_FROM_ALL
FOLDER "Tests/Unit/Modules/ThreadManager/"
)

pika_add_unit_test(
"modules.thread_manager" ${test} ${${test}_PARAMETERS} VALGRIND
)

endforeach()
91 changes: 91 additions & 0 deletions libs/pika/thread_manager/tests/unit/thread_num.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2017 Mikael Simberg
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// Simple test verifying basic resource_partitioner functionality.

#include <pika/assert.hpp>
#include <pika/chrono.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <pika/testing.hpp>

#include <vector>

namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;

std::size_t const max_threads =
(std::min)(std::size_t(4), std::size_t(pika::threads::detail::hardware_concurrency()));

void test_scheduler(int argc, char* argv[], pika::resource::scheduling_policy scheduler)
{
using ::pika::threads::scheduler_mode;

pika::init_params init_args;

init_args.cfg = {"pika.os_threads=" + std::to_string(max_threads)};
init_args.rp_callback = [scheduler](auto& rp, pika::program_options::variables_map const&) {
std::size_t pools_added = 0;

rp.set_default_pool_name("0");
for (pika::resource::numa_domain const& d : rp.numa_domains())
{
for (pika::resource::core const& c : d.cores())
{
for (pika::resource::pu const& p : c.pus())
{
if (pools_added < max_threads)
{
std::string name = std::to_string(pools_added);
rp.create_thread_pool(name, scheduler,
scheduler_mode::default_mode | scheduler_mode::enable_elasticity);
rp.add_resource(p, name);
++pools_added;
}
}
}
}
};

pika::start(argc, argv, init_args);

for (std::size_t pool_num = 0; pool_num < max_threads; ++pool_num)
{
auto sched = ex::thread_pool_scheduler{&pika::resource::get_thread_pool(pool_num)};
tt::sync_wait(ex::schedule(sched) | ex::then([pool_num]() {
PIKA_TEST_EQ(pika::get_thread_pool_num(), pool_num);
PIKA_TEST_EQ(pika::get_worker_thread_num(), pool_num);
PIKA_TEST_EQ(pika::get_local_worker_thread_num(), static_cast<std::size_t>(0));
}));
}

pika::finalize();
PIKA_TEST_EQ(pika::stop(), 0);
}

int main(int argc, char* argv[])
{
PIKA_ASSERT(max_threads >= 2);

std::vector<pika::resource::scheduling_policy> schedulers = {
pika::resource::scheduling_policy::local,
pika::resource::scheduling_policy::local_priority_fifo,
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
pika::resource::scheduling_policy::local_priority_lifo,
#endif
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
pika::resource::scheduling_policy::abp_priority_fifo,
pika::resource::scheduling_policy::abp_priority_lifo,
#endif
pika::resource::scheduling_policy::static_,
pika::resource::scheduling_policy::static_priority,
pika::resource::scheduling_policy::shared_priority,
};

for (auto const scheduler : schedulers) { test_scheduler(argc, argv, scheduler); }

return 0;
}

0 comments on commit f94b8e1

Please sign in to comment.