Merge #609
609: Improve CUDA polling r=aurianer a=biddisco

Redesign the way polling for CUDA events is handled

CUDA events are polled (by any thread on the pool on which polling is enabled) and, once ready, are pushed onto a lock-free queue. The polling loop first checks the ready queue and invokes the callbacks found there, and only then takes the lock and checks the outstanding events, which are placed on the ready queue as they complete.
This means that as soon as events are ready, any thread can invoke the callback: a single polling thread can find that N events are ready and place them in the ready queue, and N other threads can then start handling the completions, instead of only the polling thread being allowed to handle them.

The locking and completion handling has been reworked significantly and gives much better results.
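
As a rough illustration of the scheme described above (not the actual pika implementation: `event_poller`, `pending_event` and `callback_t` are made-up stand-ins, and a mutex-guarded deque stands in for the lock-free `ConcurrentQueue` and the CUDA event checks used in the real code), the poll loop looks roughly like this:

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-ins: the real code stores CUDA events, checks them with
// whip::event_ready(event), and uses a lock-free MPMC queue for ready work.
using callback_t = std::function<void(int /* status */)>;

struct pending_event
{
    std::function<bool()> is_ready;    // stand-in for whip::event_ready(event)
    callback_t f;
};

class event_poller
{
public:
    // Any worker thread on the polling pool may call poll().
    void poll()
    {
        // 1. Drain callbacks that are already known to be ready, without
        //    holding the lock, so N threads can run N completions in parallel.
        while (auto cb = pop_ready()) (*cb)(0);

        // 2. Only one thread at a time scans the vector of outstanding events;
        //    everyone else returns immediately instead of waiting.
        //    (The real poll also drains a queue of newly submitted events into
        //    this vector while holding the lock.)
        std::unique_lock<std::mutex> lk(pending_mtx, std::try_to_lock);
        if (!lk.owns_lock()) return;

        for (auto it = pending.begin(); it != pending.end();)
        {
            if (it->is_ready())
            {
                push_ready(std::move(it->f));    // hand completion to any thread
                it = pending.erase(it);
            }
            else
            {
                ++it;
            }
        }
        lk.unlock();

        // 3. Run anything that became ready during the scan.
        while (auto cb = pop_ready()) (*cb)(0);
    }

private:
    void push_ready(callback_t f)
    {
        std::lock_guard<std::mutex> l(ready_mtx);
        ready.push_back(std::move(f));
    }

    std::optional<callback_t> pop_ready()
    {
        std::lock_guard<std::mutex> l(ready_mtx);
        if (ready.empty()) return std::nullopt;
        callback_t f = std::move(ready.front());
        ready.pop_front();
        return f;
    }

    std::mutex pending_mtx;
    std::vector<pending_event> pending;    // outstanding (not yet ready) events

    std::mutex ready_mtx;                  // the real code uses a lock-free queue
    std::deque<callback_t> ready;          // completions ready to be invoked
};
```

The key design point this mirrors: `try_to_lock` means threads never block on the polling lock, and completions are always invoked outside it.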

Co-authored-by: John Biddiscombe <biddisco@cscs.ch>
bors[bot] and biddisco authored Mar 8, 2023
2 parents 8fa25b7 + ede5ed4 commit af39de8
Showing 3 changed files with 145 additions and 85 deletions.
@@ -17,6 +17,10 @@
#include <string>

namespace pika::cuda::experimental {

PIKA_EXPORT const std::string& get_pool_name();
PIKA_EXPORT void set_pool_name(const std::string&);

// -----------------------------------------------------------------
// This RAII helper class enables polling for a scoped block
struct [[nodiscard]] enable_user_polling
@@ -28,10 +32,12 @@ namespace pika::cuda::experimental {
if (pool_name_.empty())
{
detail::register_polling(pika::resource::get_thread_pool(0));
set_pool_name(pika::resource::get_pool_name(0));
}
else
{
detail::register_polling(pika::resource::get_thread_pool(pool_name_));
set_pool_name(pool_name_);
}
}

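For context, a hedged usage sketch of the `enable_user_polling` RAII helper touched by the hunk above; the pool name "cuda-pool", the header names and the init/finalize boilerplate follow pika's usual pattern but are assumptions for illustration and are not part of this diff:

```cpp
#include <pika/cuda.hpp>
#include <pika/init.hpp>

int pika_main(int, char*[])
{
    {
        // Enable CUDA event polling for this scope. An empty/default name
        // registers polling on thread pool 0; a named pool (assumed here to
        // have been created via the resource partitioner) is looked up with
        // pika::resource::get_thread_pool(). With this change the chosen
        // pool name is also recorded via set_pool_name().
        pika::cuda::experimental::enable_user_polling poll("cuda-pool");

        // ... launch CUDA work whose completion is reported through events ...
    }    // polling is unregistered when `poll` goes out of scope

    pika::finalize();
    return 0;
}

int main(int argc, char* argv[]) { return pika::init(pika_main, argc, argv); }
```
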
@@ -12,6 +12,7 @@
#include <pika/debugging/print.hpp>

namespace pika::cuda::experimental::detail {
using print_on = debug::detail::enable_print<false>;
static constexpr print_on cud_debug("CUDA");
using namespace pika::debug::detail;
template <int Level>
static print_threshold<Level, 1> cud_debug("CUDA-EX");
} // namespace pika::cuda::experimental::detail
219 changes: 136 additions & 83 deletions libs/pika/async_cuda/src/cuda_event_callback.cpp
@@ -13,6 +13,9 @@
#include <pika/async_cuda/detail/cuda_debug.hpp>
#include <pika/async_cuda/detail/cuda_event_callback.hpp>
#include <pika/concurrency/concurrentqueue.hpp>
#include <pika/datastructures/detail/small_vector.hpp>
#include <pika/resource_partitioner/detail/partitioner.hpp>
#include <pika/runtime/thread_pool_helpers.hpp>
#include <pika/synchronization/spinlock.hpp>
#include <pika/threading_base/scheduler_base.hpp>
#include <pika/threading_base/thread_pool_base.hpp>
@@ -44,10 +47,18 @@ namespace pika::cuda::experimental::detail {
event_callback_function_type f;
};

// a struct we use temporarily to hold callbacks we can invoke
struct ready_callback
{
whip::error_t status;
event_callback_function_type f;
};

class cuda_event_queue
{
public:
using event_callback_queue_type = concurrency::detail::ConcurrentQueue<event_callback>;
using event_ready_queue_type = concurrency::detail::ConcurrentQueue<ready_callback>;
using mutex_type = pika::spinlock;
using event_callback_vector_type = std::vector<event_callback>;

@@ -61,93 +72,113 @@
pika::threads::detail::polling_status poll()
{
using pika::threads::detail::polling_status;
using namespace pika::debug::detail;

// Don't poll if another thread is already polling
std::unique_lock<mutex_type> lk(vector_mtx, std::try_to_lock);
if (!lk.owns_lock())
// invoke ready callbacks without holding the lock
detail::ready_callback ready_callback_;
while (ready_events.try_dequeue(ready_callback_))
{
if (cud_debug.is_enabled())
{
static auto poll_deb =
cud_debug.make_timer(1, debug::detail::str<>("Poll - lock failed"));
cud_debug.timed(poll_deb, "enqueued events",
debug::detail::dec<3>(get_number_of_enqueued_events()), "active events",
debug::detail::dec<3>(get_number_of_active_events()));
}
return polling_status::idle;
ready_callback_.f(ready_callback_.status);
}

if (cud_debug.is_enabled())
// locked section to access std::vector etc
{
static auto poll_deb =
cud_debug.make_timer(1, debug::detail::str<>("Poll - lock success"));
cud_debug.timed(poll_deb, "enqueued events",
debug::detail::dec<3>(get_number_of_enqueued_events()), "active events",
debug::detail::dec<3>(get_number_of_active_events()));
}

// Grab the handle to the event pool so we can return completed events
cuda_event_pool& pool = cuda_event_pool::get_event_pool();
// Don't poll if another thread is already polling
std::unique_lock<mutex_type> lk(vector_mtx, std::try_to_lock);
if (!lk.owns_lock())
{
if (cud_debug<5>.is_enabled())
{
static auto poll_deb =
cud_debug<5>.make_timer(1, str<>("Poll - lock failed"));
cud_debug<5>.timed(poll_deb, "enqueued events",
dec<3>(get_number_of_enqueued_events()), "active events",
dec<3>(get_number_of_active_events()));
}
return polling_status::idle;
}

// Iterate over our list of events and see if any have completed
event_callback_vector.erase(
std::remove_if(event_callback_vector.begin(), event_callback_vector.end(),
[&](event_callback& continuation) {
whip::error_t status = whip::success;
if (cud_debug<5>.is_enabled())
{
static auto poll_deb = cud_debug<5>.make_timer(1, str<>("Poll - lock success"));
cud_debug<5>.timed(poll_deb, "enqueued events",
dec<3>(get_number_of_enqueued_events()), "active events",
dec<3>(get_number_of_active_events()));
}

try
{
bool ready = whip::event_ready(continuation.event);
// Grab the handle to the event pool so we can return completed events
cuda_event_pool& pool = cuda_event_pool::get_event_pool();

// If the event is not yet ready, do nothing
if (!ready)
// Iterate over our list of events and see if any have completed
event_callback_vector.erase(
std::remove_if(event_callback_vector.begin(), event_callback_vector.end(),
[&](event_callback& continuation) {
whip::error_t status = whip::success;
try
{
return false;
bool ready = whip::event_ready(continuation.event);
// If the event is not yet ready, do nothing
if (!ready)
{
// do not remove this item from the vector
return false;
}
}
}
catch (whip::exception const& e)
catch (whip::exception const& e)
{
status = e.get_error();
}

// Forward successes and other errors to the callback
PIKA_DP(cud_debug<5>,
debug(str<>("set ready vector"), "event",
hex<8>(continuation.event), "enqueued events",
dec<3>(get_number_of_enqueued_events()), "active events",
dec<3>(get_number_of_active_events())));
// save callback to ready queue
ready_events.enqueue({status, PIKA_MOVE(continuation.f)});
// release the event handle
pool.push(PIKA_MOVE(continuation.event));
// this item can be removed from the vector
return true;
}),
event_callback_vector.end());
active_events_counter = event_callback_vector.size();

// now move unprocessed events to the vector if not ready
detail::event_callback continuation;
while (event_callback_queue.try_dequeue(continuation))
{
whip::error_t status = whip::success;
try
{
if (!whip::event_ready(continuation.event))
{
status = e.get_error();
add_to_event_callback_vector(PIKA_MOVE(continuation));
continue;
}
}
catch (whip::exception const& e)
{
status = e.get_error();
}

// Forward successes and other errors to the callback
cud_debug.debug(debug::detail::str<>("set ready vector"), "event",
PIKA_DP(cud_debug<5>,
debug(debug::detail::str<>("set ready queue"), "event",
debug::detail::hex<8>(continuation.event), "enqueued events",
debug::detail::dec<3>(get_number_of_enqueued_events()), "active events",
debug::detail::dec<3>(get_number_of_active_events()));
continuation.f(status);
pool.push(PIKA_MOVE(continuation.event));
return true;
}),
event_callback_vector.end());
active_events_counter = event_callback_vector.size();

detail::event_callback continuation;
while (event_callback_queue.try_dequeue(continuation))
{
whip::error_t status = whip::success;

try
{
bool ready = whip::event_ready(continuation.event);

if (!ready)
{
add_to_event_callback_vector(PIKA_MOVE(continuation));
continue;
}
}
catch (whip::exception const& e)
{
status = e.get_error();
debug::detail::dec<3>(get_number_of_active_events())));
// save callback to ready queue
ready_events.enqueue({status, PIKA_MOVE(continuation.f)});
// release the event handle
pool.push(PIKA_MOVE(continuation.event));
}
} // end locked region

cud_debug.debug(debug::detail::str<>("set ready queue"), "event",
debug::detail::hex<8>(continuation.event), "enqueued events",
debug::detail::dec<3>(get_number_of_enqueued_events()), "active events",
debug::detail::dec<3>(get_number_of_active_events()));
continuation.f(status);
pool.push(PIKA_MOVE(continuation.event));
// invoke any new ready callbacks without holding the lock
while (ready_events.try_dequeue(ready_callback_))
{
ready_callback_.f(ready_callback_.status);
}

using pika::threads::detail::polling_status;
@@ -162,20 +193,19 @@ namespace pika::cuda::experimental::detail {
PIKA_THROW_EXCEPTION(pika::error::invalid_status, "add_to_event_callback_queue",
"could not get an event");
}
PIKA_ASSERT(event != 0);
whip::event_record(event, stream);

event_callback continuation{event, PIKA_MOVE(f)};

PIKA_ASSERT_MSG(get_register_polling_count() != 0,
"CUDA event polling has not been enabled on any pool. Make sure that CUDA event "
"polling is enabled on at least one thread pool.");

event_callback_queue.enqueue(PIKA_MOVE(continuation));
PIKA_DP(cud_debug<5>,
debug(str<>("event queued"), "event", hex<8>(event), "enqueued events",
dec<3>(get_number_of_enqueued_events()), "active events",
dec<3>(get_number_of_active_events())));

cud_debug.debug(debug::detail::str<>("event queued"), "event",
debug::detail::hex<8>(continuation.event), "enqueued events",
debug::detail::dec<3>(get_number_of_enqueued_events()), "active events",
debug::detail::dec<3>(get_number_of_active_events()));
event_callback_queue.enqueue({event, PIKA_MOVE(f)});
}

std::size_t get_work_count() const noexcept
@@ -202,16 +232,18 @@
event_callback_vector.push_back(PIKA_MOVE(continuation));
++active_events_counter;

cud_debug.debug(debug::detail::str<>("event callback moved from queue to vector"),
"event", debug::detail::hex<8>(continuation.event), "enqueued events",
debug::detail::dec<3>(get_number_of_enqueued_events()), "active events",
debug::detail::dec<3>(get_number_of_active_events()));
PIKA_DP(cud_debug<5>,
debug(str<>("event callback moved from queue to vector"), "event",
hex<8>(continuation.event), "enqueued events",
dec<3>(get_number_of_enqueued_events()), "active events",
dec<3>(get_number_of_active_events())));
}

event_callback_queue_type event_callback_queue;
mutex_type vector_mtx;
std::atomic<std::size_t> active_events_counter{0};
event_callback_vector_type event_callback_vector;
event_ready_queue_type ready_events;
};

class cuda_event_queue_holder
@@ -284,7 +316,7 @@ namespace pika::cuda::experimental::detail {
#if defined(PIKA_DEBUG)
++get_register_polling_count();
#endif
cud_debug.debug(debug::detail::str<>("enable polling"));
PIKA_DP(cud_debug<2>, debug(str<>("enable polling"), pool.get_pool_name()));
auto* sched = pool.get_scheduler();
sched->set_cuda_polling_functions(&pika::cuda::experimental::detail::poll, &get_work_count);
}
@@ -299,8 +331,29 @@
"sure CUDA event polling is not disabled too early.");
}
#endif
cud_debug.debug(debug::detail::str<>("disable polling"));
PIKA_DP(cud_debug<2>, debug(str<>("disable polling"), pool.get_pool_name()));
auto* sched = pool.get_scheduler();
sched->clear_cuda_polling_function();
}

static std::string polling_pool_name = "default";

} // namespace pika::cuda::experimental::detail

namespace pika::cuda::experimental {
PIKA_EXPORT const std::string& get_pool_name()
{
if (pika::resource::pool_exists(detail::polling_pool_name))
{
return detail::polling_pool_name;
}
//
return resource::get_partitioner().get_default_pool_name();
}

PIKA_EXPORT void set_pool_name(const std::string& name)
{
PIKA_DP(detail::cud_debug<2>, debug(debug::detail::str<>("set pool name"), name));
detail::polling_pool_name = name;
}
} // namespace pika::cuda::experimental
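
A small usage note for the accessors added above; the call site is hypothetical, but it only uses functions that already appear in this diff:

```cpp
// After polling has been enabled (e.g. via enable_user_polling), the pool
// used for CUDA event polling can be recovered by name:
std::string const& name = pika::cuda::experimental::get_pool_name();
auto& pool = pika::resource::get_thread_pool(name);
```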
