Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix setting of global thread number #1016

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/linux_valgrind.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ jobs:
tests.unit.modules.string_util \
tests.{performance,regressions,unit}.modules.synchronization \
tests.unit.modules.tag_invoke \
tests.unit.modules.thread_manager \
tests.unit.modules.threading \
tests.{regressions,unit}.modules.threading_base \
tests.unit.modules.topology \
Expand Down
26 changes: 16 additions & 10 deletions libs/pika/thread_manager/src/thread_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <pika/config.hpp>
#include <pika/assert.hpp>
#include <pika/execution_base/this_thread.hpp>
#include <pika/functional/bind.hpp>
#include <pika/modules/errors.hpp>
#include <pika/modules/logging.hpp>
#include <pika/modules/schedulers.hpp>
Expand Down Expand Up @@ -60,19 +59,26 @@ namespace pika::threads::detail {
: rtcfg_(rtcfg)
, notifier_(notifier)
{
using std::placeholders::_1;
using std::placeholders::_3;

// Add callbacks local to thread_manager.
notifier.add_on_start_thread_callback(
util::detail::bind(&thread_manager::init_tss, this, _1));
notifier.add_on_stop_thread_callback(util::detail::bind(&thread_manager::deinit_tss, this));

auto& rp = pika::resource::get_partitioner();
[this]([[maybe_unused]] std::size_t local_thread_num, std::size_t global_thread_num,
[[maybe_unused]] char const* pool_name,
[[maybe_unused]] char const* postfix) { this->init_tss(global_thread_num); });
notifier.add_on_stop_thread_callback(
[this]([[maybe_unused]] std::size_t local_thread_num,
[[maybe_unused]] std::size_t global_thread_num,
[[maybe_unused]] char const* pool_name,
[[maybe_unused]] char const* postfix) { this->deinit_tss(); });
notifier.add_on_start_thread_callback(
util::detail::bind(&resource::detail::partitioner::assign_pu, std::ref(rp), _3, _1));
[](std::size_t local_thread_num, [[maybe_unused]] std::size_t global_thread_num,
char const* pool_name, [[maybe_unused]] char const* postfix) {
pika::resource::get_partitioner().assign_pu(pool_name, local_thread_num);
});
notifier.add_on_stop_thread_callback(
util::detail::bind(&resource::detail::partitioner::unassign_pu, std::ref(rp), _3, _1));
[](std::size_t local_thread_num, [[maybe_unused]] std::size_t global_thread_num,
char const* pool_name, [[maybe_unused]] char const* postfix) {
pika::resource::get_partitioner().unassign_pu(pool_name, local_thread_num);
});
}

void thread_manager::create_pools()
Expand Down
24 changes: 23 additions & 1 deletion libs/pika/thread_manager/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Copyright (c) 2019 The STE||AR-Group
# Copyright (c) 2024 ETH Zurich
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests thread_num)

set(thread_num_PARAMETERS THREADS 4)

foreach(test ${tests})
set(sources ${test}.cpp)

source_group("Source Files" FILES ${sources})

pika_add_executable(
${test}_test INTERNAL_FLAGS
SOURCES ${sources} ${${test}_FLAGS}
EXCLUDE_FROM_ALL
FOLDER "Tests/Unit/Modules/ThreadManager/"
)

pika_add_unit_test(
"modules.thread_manager" ${test} ${${test}_PARAMETERS} VALGRIND
)

endforeach()
92 changes: 92 additions & 0 deletions libs/pika/thread_manager/tests/unit/thread_num.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2017 Mikael Simberg
msimberg marked this conversation as resolved.
Show resolved Hide resolved
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// Simple test verifying basic resource_partitioner functionality.

#include <pika/assert.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <pika/testing.hpp>

#include <cstddef>
#include <string>
#include <vector>

namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;

std::size_t const max_threads =
(std::min)(std::size_t(4), std::size_t(pika::threads::detail::hardware_concurrency()));

void test_scheduler(int argc, char* argv[], pika::resource::scheduling_policy scheduler)
{
using ::pika::threads::scheduler_mode;

pika::init_params init_args;

init_args.cfg = {"pika.os_threads=" + std::to_string(max_threads)};
init_args.rp_callback = [scheduler](auto& rp, pika::program_options::variables_map const&) {
std::size_t pools_added = 0;

rp.set_default_pool_name("0");
for (pika::resource::numa_domain const& d : rp.numa_domains())
{
for (pika::resource::core const& c : d.cores())
{
for (pika::resource::pu const& p : c.pus())
{
if (pools_added < max_threads)
{
std::string name = std::to_string(pools_added);
rp.create_thread_pool(name, scheduler,
scheduler_mode::default_mode | scheduler_mode::enable_elasticity);
rp.add_resource(p, name);
++pools_added;
}
}
}
}
};

pika::start(argc, argv, init_args);

for (std::size_t pool_num = 0; pool_num < max_threads; ++pool_num)
{
auto sched = ex::thread_pool_scheduler{&pika::resource::get_thread_pool(pool_num)};
tt::sync_wait(ex::schedule(sched) | ex::then([pool_num]() {
PIKA_TEST_EQ(pika::get_thread_pool_num(), pool_num);
PIKA_TEST_EQ(pika::get_worker_thread_num(), pool_num);
PIKA_TEST_EQ(pika::get_local_worker_thread_num(), static_cast<std::size_t>(0));
}));
}

pika::finalize();
PIKA_TEST_EQ(pika::stop(), 0);
}

int main(int argc, char* argv[])
{
PIKA_ASSERT(max_threads >= 2);

std::vector<pika::resource::scheduling_policy> schedulers = {
pika::resource::scheduling_policy::local,
pika::resource::scheduling_policy::local_priority_fifo,
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
pika::resource::scheduling_policy::local_priority_lifo,
#endif
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
pika::resource::scheduling_policy::abp_priority_fifo,
pika::resource::scheduling_policy::abp_priority_lifo,
#endif
pika::resource::scheduling_policy::static_,
pika::resource::scheduling_policy::static_priority,
pika::resource::scheduling_policy::shared_priority,
};

for (auto const scheduler : schedulers) { test_scheduler(argc, argv, scheduler); }

return 0;
}
Loading