Skip to content

Commit

Permalink
Clean up schedulers module
Browse files Browse the repository at this point in the history
  • Loading branch information
aurianer committed Mar 24, 2023
1 parent 8d39475 commit dfbad41
Show file tree
Hide file tree
Showing 21 changed files with 251 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ int main()
{
{
// Choose a scheduler.
using sched_type = pika::threads::local_priority_queue_scheduler<>;
using sched_type = pika::threads::detail::local_priority_queue_scheduler<>;

// Choose all the parameters for the thread pool and scheduler.
std::size_t const num_threads =
Expand Down
2 changes: 1 addition & 1 deletion libs/pika/init_runtime/src/init_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ namespace pika {
}
#endif
#ifdef PIKA_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
threads::set_minimal_deadlock_detection_enabled(
threads::detail::set_minimal_deadlock_detection_enabled(
cmdline.rtcfg_.enable_minimal_deadlock_detection());
#endif
#ifdef PIKA_HAVE_SPINLOCK_DEADLOCK_DETECTION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static int pool_threads = 0;
#define CUSTOM_POOL_NAME "Custom"

// this is our custom scheduler type
using high_priority_sched = pika::threads::shared_priority_queue_scheduler<>;
using high_priority_sched = pika::threads::detail::shared_priority_queue_scheduler<>;
using pika::threads::scheduler_mode;

// Force an instantiation of the pool type templated on our custom scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static int pool_threads = 1;
static std::string const pool_name = "mpi";

// this is our custom scheduler type
using high_priority_sched = pika::threads::shared_priority_queue_scheduler<>;
using high_priority_sched = pika::threads::detail::shared_priority_queue_scheduler<>;
using namespace pika::threads;
using pika::threads::scheduler_mode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static std::string const pool_name = "mpi";

// ------------------------------------------------------------------------
// this is our custom scheduler type
using numa_scheduler = pika::threads::shared_priority_queue_scheduler<>;
using numa_scheduler = pika::threads::detail::shared_priority_queue_scheduler<>;
using namespace pika::threads;

// ------------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions libs/pika/schedulers/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ schedulers
This module provides schedulers used by thread pools in the
:ref:`modules_thread_pools` module. There are currently three main schedulers:

* :cpp:class:`pika::threads::local_priority_queue_scheduler`
* :cpp:class:`pika::threads::static_priority_queue_scheduler`
* :cpp:class:`pika::threads::shared_priority_queue_scheduler`
* :cpp:class:`pika::threads::detail::local_priority_queue_scheduler`
* :cpp:class:`pika::threads::detail::static_priority_queue_scheduler`
* :cpp:class:`pika::threads::detail::shared_priority_queue_scheduler`

Other schedulers are specializations or variations of the above schedulers. See
the examples of the :ref:`modules_resource_partitioner` module for examples of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

#include <pika/config.hpp>

namespace pika::threads {
namespace pika::threads::detail {
#ifdef PIKA_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
PIKA_EXPORT void set_minimal_deadlock_detection_enabled(bool enabled);
PIKA_EXPORT bool get_minimal_deadlock_detection_enabled();
#endif
} // namespace pika::threads
} // namespace pika::threads::detail
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
// TODO: add branch prediction and function heat

///////////////////////////////////////////////////////////////////////////////
namespace pika::threads {
namespace pika::threads::detail {
///////////////////////////////////////////////////////////////////////////
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
using default_local_priority_queue_scheduler_terminated_queue = lockfree_lifo;
Expand All @@ -60,7 +60,7 @@ namespace pika::threads {
template <typename Mutex = std::mutex, typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename TerminatedQueuing = default_local_priority_queue_scheduler_terminated_queue>
class PIKA_EXPORT local_priority_queue_scheduler : public detail::scheduler_base
class PIKA_EXPORT local_priority_queue_scheduler : public scheduler_base
{
public:
using has_periodic_maintenance = std::false_type;
Expand All @@ -76,7 +76,7 @@ namespace pika::threads {
{
init_parameter(std::size_t num_queues, pika::detail::affinity_data const& affinity_data,
std::size_t num_high_priority_queues = std::size_t(-1),
detail::thread_queue_init_parameters thread_queue_init = {},
thread_queue_init_parameters thread_queue_init = {},
char const* description = "local_priority_queue_scheduler")
: num_queues_(num_queues)
, num_high_priority_queues_(num_high_priority_queues == std::size_t(-1) ?
Expand All @@ -100,15 +100,15 @@ namespace pika::threads {

std::size_t num_queues_;
std::size_t num_high_priority_queues_;
detail::thread_queue_init_parameters thread_queue_init_;
thread_queue_init_parameters thread_queue_init_;
pika::detail::affinity_data const& affinity_data_;
char const* description_;
};
using init_parameter_type = init_parameter;

local_priority_queue_scheduler(
init_parameter_type const& init, bool deferred_initialization = true)
: detail::scheduler_base(init.num_queues_, init.description_, init.thread_queue_init_)
: scheduler_base(init.num_queues_, init.description_, init.thread_queue_init_)
, curr_queue_(0)
, affinity_data_(init.affinity_data_)
, num_queues_(init.num_queues_)
Expand Down Expand Up @@ -1262,11 +1262,11 @@ namespace pika::threads {
std::vector<pika::concurrency::detail::cache_line_data<std::vector<std::size_t>>>
victim_threads_;
};
} // namespace pika::threads
} // namespace pika::threads::detail

template <typename Mutex, typename PendingQueuing, typename StagedQueuing,
typename TerminatedQueuing>
struct fmt::formatter<pika::threads::local_priority_queue_scheduler<Mutex, PendingQueuing,
struct fmt::formatter<pika::threads::detail::local_priority_queue_scheduler<Mutex, PendingQueuing,
StagedQueuing, TerminatedQueuing>> : fmt::formatter<pika::threads::detail::scheduler_base>
{
template <typename FormatContext>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
// TODO: add branch prediction and function heat

///////////////////////////////////////////////////////////////////////////////
namespace pika::threads {
namespace pika::threads::detail {
///////////////////////////////////////////////////////////////////////////
#if defined(PIKA_HAVE_CXX11_STD_ATOMIC_128BIT)
using default_local_queue_scheduler_terminated_queue = lockfree_lifo;
Expand All @@ -54,7 +54,7 @@ namespace pika::threads {
template <typename Mutex = std::mutex, typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename TerminatedQueuing = default_local_queue_scheduler_terminated_queue>
class PIKA_EXPORT local_queue_scheduler : public detail::scheduler_base
class PIKA_EXPORT local_queue_scheduler : public scheduler_base
{
public:
using has_periodic_maintenance = std::false_type;
Expand All @@ -65,7 +65,7 @@ namespace pika::threads {
struct init_parameter
{
init_parameter(std::size_t num_queues, pika::detail::affinity_data const& affinity_data,
detail::thread_queue_init_parameters thread_queue_init = {},
thread_queue_init_parameters thread_queue_init = {},
char const* description = "local_queue_scheduler")
: num_queues_(num_queues)
, thread_queue_init_(thread_queue_init)
Expand All @@ -84,14 +84,14 @@ namespace pika::threads {
}

std::size_t num_queues_;
detail::thread_queue_init_parameters thread_queue_init_;
thread_queue_init_parameters thread_queue_init_;
pika::detail::affinity_data const& affinity_data_;
char const* description_;
};
using init_parameter_type = init_parameter;

local_queue_scheduler(init_parameter_type const& init, bool deferred_initialization = true)
: detail::scheduler_base(init.num_queues_, init.description_, init.thread_queue_init_)
: scheduler_base(init.num_queues_, init.description_, init.thread_queue_init_)
, queues_(init.num_queues_)
, curr_queue_(0)
, affinity_data_(init.affinity_data_)
Expand Down Expand Up @@ -876,13 +876,12 @@ namespace pika::threads {
std::vector<::pika::threads::detail::mask_type> numa_domain_masks_;
std::vector<::pika::threads::detail::mask_type> outside_numa_domain_masks_;
};
} // namespace pika::threads
} // namespace pika::threads::detail

template <typename Mutex, typename PendingQueuing, typename StagedQueuing,
typename TerminatedQueuing>
struct fmt::formatter<
pika::threads::local_queue_scheduler<Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing>>
: fmt::formatter<pika::threads::detail::scheduler_base>
struct fmt::formatter<pika::threads::detail::local_queue_scheduler<Mutex, PendingQueuing,
StagedQueuing, TerminatedQueuing>> : fmt::formatter<pika::threads::detail::scheduler_base>
{
template <typename FormatContext>
auto format(pika::threads::detail::scheduler_base const& scheduler, FormatContext& ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <cstdint>
#include <utility>

namespace pika::threads {
namespace pika::threads::detail {

struct lockfree_fifo;

Expand Down Expand Up @@ -331,4 +331,4 @@ namespace pika::threads {

#endif // PIKA_HAVE_CXX11_STD_ATOMIC_128BIT

} // namespace pika::threads
} // namespace pika::threads::detail
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

#include <pika/config.hpp>

namespace pika::threads {
namespace pika::threads::detail {
#ifdef PIKA_HAVE_THREAD_QUEUE_WAITTIME
PIKA_EXPORT void set_maintain_queue_wait_times_enabled(bool enabled);
PIKA_EXPORT bool get_maintain_queue_wait_times_enabled();
#endif
} // namespace pika::threads
} // namespace pika::threads::detail
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@
# endif
#endif

namespace pika {
namespace pika::detail {
static pika::debug::detail::enable_print<QUEUE_HOLDER_NUMA_DEBUG> nq_deb("QH_NUMA");
}

// ------------------------------------------------------------////////
namespace pika::threads {
namespace pika::threads::detail {
// ----------------------------------------------------------------
// Helper class to hold a set of thread queue holders.
// ----------------------------------------------------------------
Expand Down Expand Up @@ -105,7 +105,7 @@ namespace pika::threads {
if (queues_[q]->get_next_thread_HP(thrd, (stealing || (i > 0)), i == 0))
{
// clang-format off
nq_deb.debug(debug::detail::str<>("HP/BP get_next")
pika::detail::nq_deb.debug(debug::detail::str<>("HP/BP get_next")
, "D", debug::detail::dec<2>(domain_)
, "Q", debug::detail::dec<3>(q)
, "Qidx", debug::detail::dec<3>(qidx)
Expand Down Expand Up @@ -135,7 +135,7 @@ namespace pika::threads {
// if we got a thread, return it, only allow stealing if i>0
if (queues_[q]->get_next_thread(thrd, (stealing || (i > 0))))
{
nq_deb.debug(debug::detail::str<>("get_next"), "D",
pika::detail::nq_deb.debug(debug::detail::str<>("get_next"), "D",
debug::detail::dec<2>(domain_), "Q", debug::detail::dec<3>(q), "Qidx",
debug::detail::dec<3>(qidx),
((i == 0 && !stealing) ? "taken" : "stolen from"),
Expand All @@ -162,7 +162,7 @@ namespace pika::threads {
if (added > 0)
{
// clang-format off
nq_deb.debug(debug::detail::str<>("HP/BP add_new")
pika::detail::nq_deb.debug(debug::detail::str<>("HP/BP add_new")
, "added", debug::detail::dec<>(added)
, "D", debug::detail::dec<2>(domain_)
, "Q", debug::detail::dec<3>(q)
Expand Down Expand Up @@ -191,7 +191,7 @@ namespace pika::threads {
if (added > 0)
{
// clang-format off
nq_deb.debug(debug::detail::str<>("add_new")
pika::detail::nq_deb.debug(debug::detail::str<>("add_new")
, "added", debug::detail::dec<>(added)
, "D", debug::detail::dec<2>(domain_)
, "Q", debug::detail::dec<3>(q)
Expand Down Expand Up @@ -294,4 +294,4 @@ namespace pika::threads {
void on_stop_thread(std::size_t /* num_thread */) {}
void on_error(std::size_t /* num_thread */, std::exception_ptr const& /* e */) {}
};
} // namespace pika::threads
} // namespace pika::threads::detail
Loading

0 comments on commit dfbad41

Please sign in to comment.