Skip to content

Commit

Permalink
Try #738:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored Aug 14, 2023
2 parents 62e8f38 + 4633bb2 commit adf39d7
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 52 deletions.
2 changes: 1 addition & 1 deletion libs/pika/affinity/src/affinity_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ namespace pika::detail {

init_cached_pu_nums(num_system_pus);

auto const& topo = threads::detail::create_topology();
auto const& topo = threads::detail::get_topology();

if (affinity_description == "none")
{
Expand Down
8 changes: 3 additions & 5 deletions libs/pika/affinity/src/parse_affinity_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ namespace pika::detail {
{
if (!use_process_mask) { return true; }

threads::detail::mask_type proc_mask = t.get_cpubind_mask();
threads::detail::mask_type proc_mask = t.get_cpubind_mask_main_thread();
threads::detail::mask_type pu_mask = t.init_thread_affinity_mask(num_core, num_pu);

return threads::detail::bit_and(proc_mask, pu_mask);
Expand All @@ -97,7 +97,7 @@ namespace pika::detail {
{
if (use_process_mask)
{
threads::detail::mask_type proc_mask = t.get_cpubind_mask();
threads::detail::mask_type proc_mask = t.get_cpubind_mask_main_thread();
std::size_t num_pus_proc_mask = threads::detail::count(proc_mask);

if (num_threads > num_pus_proc_mask)
Expand Down Expand Up @@ -481,9 +481,7 @@ namespace pika::detail {
parse_mappings(spec, mappings, ec);
if (ec) return;

// We need to instantiate a new topology object as the runtime has not
// been initialized yet
threads::detail::topology& t = threads::detail::create_topology();
threads::detail::topology& t = threads::detail::get_topology();

decode_distribution(mappings, t, affinities, used_cores, max_cores, num_threads, num_pus,
use_process_mask, ec);
Expand Down
8 changes: 4 additions & 4 deletions libs/pika/command_line_handling/src/command_line_handling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,21 +195,21 @@ namespace pika::detail {
{
if (use_process_mask)
{
threads::detail::topology& top = threads::detail::create_topology();
return threads::detail::count(top.get_cpubind_mask());
threads::detail::topology& top = threads::detail::get_topology();
return threads::detail::count(top.get_cpubind_mask_main_thread());
}
else { return threads::detail::hardware_concurrency(); }
}

std::size_t get_number_of_default_cores(bool use_process_mask)
{
threads::detail::topology& top = threads::detail::create_topology();
threads::detail::topology& top = threads::detail::get_topology();

std::size_t num_cores = top.get_number_of_cores();

if (use_process_mask)
{
threads::detail::mask_type proc_mask = top.get_cpubind_mask();
threads::detail::mask_type proc_mask = top.get_cpubind_mask_main_thread();
std::size_t num_cores_proc_mask = 0;

for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
Expand Down
5 changes: 3 additions & 2 deletions libs/pika/resource_partitioner/src/detail_partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ namespace pika::resource::detail {
: rtcfg_()
, first_core_(std::size_t(-1))
, mode_(mode_default)
, topo_(threads::detail::create_topology())
, topo_(threads::detail::get_topology())
, default_scheduler_mode_(threads::scheduler_mode::default_mode)
{
// allow only one partitioner instance
Expand Down Expand Up @@ -358,7 +358,8 @@ namespace pika::resource::detail {

std::string process_mask_message = affinity_data_.using_process_mask() ?
fmt::format("pika is using a process mask: {}.",
pika::threads::detail::to_string(get_topology().get_cpubind_mask())) :
pika::threads::detail::to_string(
get_topology().get_cpubind_mask_main_thread())) :
"pika is not using a process mask.";
auto omp_proc_bind = std::getenv("OMP_PROC_BIND");
std::string omp_proc_bind_message = omp_proc_bind ?
Expand Down
2 changes: 1 addition & 1 deletion libs/pika/runtime/src/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ namespace pika {
namespace detail {
void handle_print_bind(std::size_t num_threads)
{
threads::detail::topology& top = threads::detail::create_topology();
threads::detail::topology& top = threads::detail::get_topology();
auto const& rp = pika::resource::get_partitioner();
auto const& tm = get_runtime().get_thread_manager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ namespace pika::threads::detail {
queues_[num_thread].data_->on_start_thread(num_thread);

std::size_t num_threads = num_queues_;
auto const& topo = ::pika::threads::detail::create_topology();
auto const& topo = ::pika::threads::detail::get_topology();

// get NUMA domain masks of all queues...
std::vector<::pika::threads::detail::mask_type> numa_masks(num_threads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ namespace pika::threads::detail {
, steals_in_numa_domain_()
, steals_outside_numa_domain_()
, numa_domain_masks_(init.num_queues_,
::pika::threads::detail::create_topology().get_machine_affinity_mask())
::pika::threads::detail::get_topology().get_machine_affinity_mask())
, outside_numa_domain_masks_(init.num_queues_,
::pika::threads::detail::create_topology().get_machine_affinity_mask())
::pika::threads::detail::get_topology().get_machine_affinity_mask())
{
::pika::threads::detail::resize(
steals_in_numa_domain_, threads::detail::hardware_concurrency());
Expand Down Expand Up @@ -781,7 +781,7 @@ namespace pika::threads::detail {

queues_[num_thread]->on_start_thread(num_thread);

auto const& topo = ::pika::threads::detail::create_topology();
auto const& topo = ::pika::threads::detail::get_topology();

// pre-calculate certain constants for the given thread number
std::size_t num_pu = affinity_data_.get_pu_num(num_thread);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ namespace pika::threads::detail {
using namespace pika::debug::detail;
PIKA_DETAIL_DP(spq_deb<5>, debug(str<>("start_thread"), "local_thread", local_thread));

auto const& topo = ::pika::threads::detail::create_topology();
auto const& topo = ::pika::threads::detail::get_topology();
// the main initialization can be done by any one thread
std::unique_lock<Mutex> lock(init_mutex);
if (!initialized_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ namespace pika::threads::detail {
std::make_shared<pika::concurrency::detail::barrier>(pool_threads + 1);
try
{
topology const& topo = create_topology();
topology const& topo = get_topology();

for (/**/; thread_num != pool_threads; ++thread_num)
{
Expand Down Expand Up @@ -392,7 +392,7 @@ namespace pika::threads::detail {
pika::threads::detail::scheduled_thread_pool<Scheduler>::thread_func(std::size_t thread_num,
std::size_t global_thread_num, std::shared_ptr<pika::concurrency::detail::barrier> startup)
{
topology const& topo = create_topology();
topology const& topo = get_topology();

// Set the affinity for the current thread.
threads::detail::mask_cref_type mask = affinity_data_.get_pu_mask(topo, global_thread_num);
Expand Down
4 changes: 2 additions & 2 deletions libs/pika/threading_base/src/thread_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace pika::threads::detail {
///////////////////////////////////////////////////////////////////////////
mask_type thread_pool_base::get_used_processing_units() const
{
auto const& topo = create_topology();
auto const& topo = get_topology();
auto const sched = get_scheduler();

mask_type used_processing_units = mask_type();
Expand All @@ -58,7 +58,7 @@ namespace pika::threads::detail {

hwloc_bitmap_ptr thread_pool_base::get_numa_domain_bitmap() const
{
auto const& topo = create_topology();
auto const& topo = get_topology();
mask_type used_processing_units = get_used_processing_units();
return topo.cpuset_to_nodeset(used_processing_units);
}
Expand Down
4 changes: 3 additions & 1 deletion libs/pika/topology/include/pika/topology/topology.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ namespace pika::threads::detail {
std::size_t get_pu_number(
std::size_t num_core, std::size_t num_pu, error_code& ec = throws) const;

mask_type get_cpubind_mask_main_thread(error_code& ec = throws) const;
mask_type get_cpubind_mask(error_code& ec = throws) const;
mask_type get_cpubind_mask(std::thread& handle, error_code& ec = throws) const;

Expand Down Expand Up @@ -361,12 +362,13 @@ namespace pika::threads::detail {
std::vector<mask_type> numa_node_affinity_masks_;
std::vector<mask_type> core_affinity_masks_;
std::vector<mask_type> thread_affinity_masks_;
mask_type main_thread_affinity_mask_;
};

#include <pika/config/warnings_suffix.hpp>

///////////////////////////////////////////////////////////////////////////
PIKA_EXPORT topology& create_topology();
PIKA_EXPORT topology& get_topology();

[[nodiscard]] PIKA_EXPORT unsigned int hardware_concurrency() noexcept;

Expand Down
60 changes: 31 additions & 29 deletions libs/pika/topology/src/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@
#endif

namespace pika::threads::detail {
// Returns the number of processing units reported by the topology object obtained from
// create_topology().
std::size_t hwloc_hardware_concurrency()
{
    return threads::detail::create_topology().get_number_of_pus();
}

void write_to_log(char const* valuename, std::size_t value)
{
LTM_(debug).format("topology: {}: {}", valuename, value); //-V128
Expand Down Expand Up @@ -183,6 +177,23 @@ namespace pika::threads::detail {
}

///////////////////////////////////////////////////////////////////////////
// We use a function-local static for the topology object so that we don't depend on the
// order of static initialization across TUs, and so that the object is guaranteed to have been
// created before it is first accessed. However, we also want to initialize the topology object
// early so that we can read the CPU mask of the main thread in case OpenMP wants to reset it.
// We therefore also define a global object whose constructor calls get_topology, so that we
// don't depend on others calling get_topology early for us.
topology& get_topology()
{
    // Function-local static: constructed on the first call, and the same object is returned
    // by reference on every call.
    static topology instance;
    return instance;
}

// Helper type whose only job is to call get_topology() from its constructor.
struct init_topology_t
{
    init_topology_t() { (void) get_topology(); }
};

// File-local instance: its constructor runs as part of this translation unit's static
// initialization and thereby triggers the call to get_topology().
static init_topology_t init_topology{};

#if !defined(PIKA_HAVE_MAX_CPU_COUNT)
mask_type topology::empty_mask = mask_type(hardware_concurrency());
#else
Expand All @@ -193,6 +204,7 @@ namespace pika::threads::detail {
: topo(nullptr)
, use_pus_as_cores_(false)
, machine_affinity_mask_(0)
, main_thread_affinity_mask_(0)
{ // {{{
int err = hwloc_topology_init(&topo);
if (err != 0)
Expand Down Expand Up @@ -287,6 +299,10 @@ namespace pika::threads::detail {
{
thread_affinity_masks_.push_back(init_thread_affinity_mask(i));
}

// We assume here that the topology object is created in a global constructor on the main
// thread (get_cpubind_mask returns the mask of the current thread).
main_thread_affinity_mask_ = get_cpubind_mask();
} // }}}

void topology::write_to_log() const
Expand Down Expand Up @@ -1119,6 +1135,11 @@ namespace pika::threads::detail {
std::size_t topology::get_number_of_pus() const { return num_of_pus_; }

///////////////////////////////////////////////////////////////////////////
// Returns the CPU binding mask cached in main_thread_affinity_mask_ (a copy of the member).
// No query is performed here, and the error_code argument is not used.
mask_type topology::get_cpubind_mask_main_thread(error_code& /*ec*/) const
{
    return this->main_thread_affinity_mask_;
}

mask_type topology::get_cpubind_mask(error_code& ec) const
{
hwloc_cpuset_t cpuset = hwloc_bitmap_alloc();
Expand Down Expand Up @@ -1421,32 +1442,13 @@ namespace pika::threads::detail {
//print_vector(os, pu_numbers_);
}

///////////////////////////////////////////////////////////////////////////
topology& create_topology()
{
    // Function-local static: constructed on the first call, and the same object is returned
    // by reference on every call.
    static topology instance;
    return instance;
}

///////////////////////////////////////////////////////////////////////////
struct hw_concurrency
unsigned int hardware_concurrency() noexcept
{
hw_concurrency() noexcept
#if defined(__ANDROID__) && defined(ANDROID)
: num_of_cores_(::android_getCpuCount())
static auto concurrency = ::android_getCpuCount();
#else
: num_of_cores_(hwloc_hardware_concurrency())
static auto concurrency = get_topology().get_number_of_pus();
#endif
{
if (num_of_cores_ == 0) num_of_cores_ = 1;
}

std::size_t num_of_cores_;
};

unsigned int hardware_concurrency() noexcept
{
static detail::hw_concurrency hwc;
return static_cast<unsigned int>(hwc.num_of_cores_);
return static_cast<unsigned int>(concurrency);
}
} // namespace pika::threads::detail

0 comments on commit adf39d7

Please sign in to comment.