Skip to content

Commit

Permalink
Merge pull request #1016 from msimberg/fix-global-thread-num-tss
Browse files Browse the repository at this point in the history
Fix setting of global thread number
  • Loading branch information
msimberg committed Feb 7, 2024
1 parent 8c72e52 commit e7f0839
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/linux_valgrind.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ jobs:
tests.unit.modules.string_util \
tests.{performance,regressions,unit}.modules.synchronization \
tests.unit.modules.tag_invoke \
tests.unit.modules.thread_manager \
tests.unit.modules.threading \
tests.{regressions,unit}.modules.threading_base \
tests.unit.modules.topology \
Expand Down
26 changes: 16 additions & 10 deletions libs/pika/thread_manager/src/thread_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <pika/config.hpp>
#include <pika/assert.hpp>
#include <pika/execution_base/this_thread.hpp>
#include <pika/functional/bind.hpp>
#include <pika/modules/errors.hpp>
#include <pika/modules/logging.hpp>
#include <pika/modules/schedulers.hpp>
Expand Down Expand Up @@ -60,19 +59,26 @@ namespace pika::threads::detail {
: rtcfg_(rtcfg)
, notifier_(notifier)
{
using std::placeholders::_1;
using std::placeholders::_3;

// Add callbacks local to thread_manager.
notifier.add_on_start_thread_callback(
util::detail::bind(&thread_manager::init_tss, this, _1));
notifier.add_on_stop_thread_callback(util::detail::bind(&thread_manager::deinit_tss, this));

auto& rp = pika::resource::get_partitioner();
[this]([[maybe_unused]] std::size_t local_thread_num, std::size_t global_thread_num,
[[maybe_unused]] char const* pool_name,
[[maybe_unused]] char const* postfix) { this->init_tss(global_thread_num); });
notifier.add_on_stop_thread_callback(
[this]([[maybe_unused]] std::size_t local_thread_num,
[[maybe_unused]] std::size_t global_thread_num,
[[maybe_unused]] char const* pool_name,
[[maybe_unused]] char const* postfix) { this->deinit_tss(); });
notifier.add_on_start_thread_callback(
util::detail::bind(&resource::detail::partitioner::assign_pu, std::ref(rp), _3, _1));
[](std::size_t local_thread_num, [[maybe_unused]] std::size_t global_thread_num,
char const* pool_name, [[maybe_unused]] char const* postfix) {
pika::resource::get_partitioner().assign_pu(pool_name, local_thread_num);
});
notifier.add_on_stop_thread_callback(
util::detail::bind(&resource::detail::partitioner::unassign_pu, std::ref(rp), _3, _1));
[](std::size_t local_thread_num, [[maybe_unused]] std::size_t global_thread_num,
char const* pool_name, [[maybe_unused]] char const* postfix) {
pika::resource::get_partitioner().unassign_pu(pool_name, local_thread_num);
});
}

void thread_manager::create_pools()
Expand Down
24 changes: 23 additions & 1 deletion libs/pika/thread_manager/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Copyright (c) 2019 The STE||AR-Group
# Copyright (c) 2024 ETH Zurich
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests thread_num)

set(thread_num_PARAMETERS THREADS 4)

foreach(test ${tests})
set(sources ${test}.cpp)

source_group("Source Files" FILES ${sources})

pika_add_executable(
${test}_test INTERNAL_FLAGS
SOURCES ${sources} ${${test}_FLAGS}
EXCLUDE_FROM_ALL
FOLDER "Tests/Unit/Modules/ThreadManager/"
)

pika_add_unit_test(
"modules.thread_manager" ${test} ${${test}_PARAMETERS} VALGRIND
)

endforeach()
92 changes: 92 additions & 0 deletions libs/pika/thread_manager/tests/unit/thread_num.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2017 Mikael Simberg
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// Simple test verifying basic resource_partitioner functionality.

#include <pika/assert.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>
#include <pika/testing.hpp>

#include <cstddef>
#include <string>
#include <vector>

namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;

std::size_t const max_threads =
(std::min)(std::size_t(4), std::size_t(pika::threads::detail::hardware_concurrency()));

void test_scheduler(int argc, char* argv[], pika::resource::scheduling_policy scheduler)
{
using ::pika::threads::scheduler_mode;

pika::init_params init_args;

init_args.cfg = {"pika.os_threads=" + std::to_string(max_threads)};
init_args.rp_callback = [scheduler](auto& rp, pika::program_options::variables_map const&) {
std::size_t pools_added = 0;

rp.set_default_pool_name("0");
for (pika::resource::numa_domain const& d : rp.numa_domains())
{
for (pika::resource::core const& c : d.cores())
{
for (pika::resource::pu const& p : c.pus())
{
if (pools_added < max_threads)
{
std::string name = std::to_string(pools_added);
rp.create_thread_pool(name, scheduler,
scheduler_mode::default_mode | scheduler_mode::enable_elasticity);
rp.add_resource(p, name);
++pools_added;
}
}
}
}
};

pika::start(argc, argv, init_args);

for (std::size_t pool_num = 0; pool_num < max_threads; ++pool_num)
{
auto sched = ex::thread_pool_scheduler{&pika::resource::get_thread_pool(pool_num)};
tt::sync_wait(ex::schedule(sched) | ex::then([pool_num]() {
PIKA_TEST_EQ(pika::get_thread_pool_num(), pool_num);
PIKA_TEST_EQ(pika::get_worker_thread_num(), pool_num);
PIKA_TEST_EQ(pika::get_local_worker_thread_num(), static_cast<std::size_t>(0));
}));
}

pika::finalize();
PIKA_TEST_EQ(pika::stop(), 0);
}

int main(int argc, char* argv[])
{
PIKA_ASSERT(max_threads >= 2);

std::vector<pika::resource::scheduling_policy> schedulers = {
pika::resource::scheduling_policy::local,
pika::resource::scheduling_policy::local_priority_fifo,
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
pika::resource::scheduling_policy::local_priority_lifo,
#endif
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
pika::resource::scheduling_policy::abp_priority_fifo,
pika::resource::scheduling_policy::abp_priority_lifo,
#endif
pika::resource::scheduling_policy::static_,
pika::resource::scheduling_policy::static_priority,
pika::resource::scheduling_policy::shared_priority,
};

for (auto const scheduler : schedulers) { test_scheduler(argc, argv, scheduler); }

return 0;
}

0 comments on commit e7f0839

Please sign in to comment.