Skip to content

Commit

Permalink
Changing to explicit callback registrations for all events
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed Nov 11, 2019
1 parent 6ef80db commit 19237bd
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 121 deletions.
191 changes: 117 additions & 74 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* twice. So, we have a macro to make sure the macro is defined. */
#ifdef APEX_HAVE_HPX_CONFIG
#include <hpx/config.hpp>
#include <hpx/util/external_timer.hpp>
#endif

#include "apex.hpp"
Expand Down Expand Up @@ -63,6 +64,7 @@
#include <hpx/include/actions.hpp>
#include <hpx/include/util.hpp>
#include <hpx/lcos/local/composable_guard.hpp>
#include "global_constructor_destructor.h"
#ifdef APEX_HAVE_HPX_disabled
static void apex_schedule_shutdown(void);
#endif // APEX_HAVE_HPX_disabled
Expand Down Expand Up @@ -457,7 +459,7 @@ string& version() {
inline std::shared_ptr<task_wrapper> _new_task(
task_identifier * id,
const uint64_t task_id,
const std::shared_ptr<task_wrapper> &parent_task, apex* instance) {
const std::shared_ptr<task_wrapper> parent_task, apex* instance) {
std::shared_ptr<task_wrapper> tt_ptr = make_shared<task_wrapper>();
tt_ptr->task_id = id;
// if not tracking dependencies, don't save the parent
Expand Down Expand Up @@ -584,7 +586,7 @@ profiler* start(const apex_function_address function_address) {
return thread_instance::instance().restore_children_profilers(tt_ptr);
}

void debug_print(const char * event, std::shared_ptr<task_wrapper> &tt_ptr) {
void debug_print(const char * event, std::shared_ptr<task_wrapper> tt_ptr) {
if (_program_over) return;
static std::mutex this_mutex;
std::unique_lock<std::mutex> l(this_mutex);
Expand All @@ -600,29 +602,33 @@ void debug_print(const char * event, std::shared_ptr<task_wrapper> &tt_ptr) {
}
}

profiler* start(std::shared_ptr<task_wrapper> &tt_ptr) {
void start(std::shared_ptr<task_wrapper> tt_ptr) {
#if defined(APEX_DEBUG)//_disabled)
debug_print("Start", tt_ptr);
#endif
if (tt_ptr == nullptr) {
APEX_UTIL_REF_COUNT_APEX_INTERNAL_START
return nullptr;
tt_ptr->prof = nullptr;
return;
}
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_DISABLED_START
return nullptr;
tt_ptr->prof = nullptr;
return;
}
apex* instance = apex::instance(); // get the Apex static instance
// protect against calls after finalization
if (!instance || _exited) {
APEX_UTIL_REF_COUNT_START_AFTER_FINALIZE
return nullptr;
tt_ptr->prof = nullptr;
return;
}
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_SUSPENDED_START
return profiler::get_disabled_profiler();
tt_ptr->prof = profiler::get_disabled_profiler();
return;
}
if (_notify_listeners) {
bool success = true;
Expand All @@ -636,12 +642,13 @@ profiler* start(std::shared_ptr<task_wrapper> &tt_ptr) {
//cout << thread_instance::get_id() << " *** Not success! " <<
//id->get_name() << endl; fflush(stdout);
APEX_UTIL_REF_COUNT_FAILED_START
return profiler::get_disabled_profiler();
tt_ptr->prof = profiler::get_disabled_profiler();
return;
}
}
}
APEX_UTIL_REF_COUNT_START
return thread_instance::instance().restore_children_profilers(tt_ptr);
return;
}

profiler* resume(const std::string &timer_name) {
Expand Down Expand Up @@ -863,7 +870,7 @@ void stop(profiler* the_profiler, bool cleanup) {
}
}

void stop(std::shared_ptr<task_wrapper> &tt_ptr) {
void stop(std::shared_ptr<task_wrapper> tt_ptr) {
#if defined(APEX_DEBUG)//_disabled)
debug_print("Stop", tt_ptr);
#endif
Expand Down Expand Up @@ -962,7 +969,7 @@ void yield(profiler* the_profiler)
}
}

void yield(std::shared_ptr<task_wrapper> &tt_ptr)
void yield(std::shared_ptr<task_wrapper> tt_ptr)
{
#if defined(APEX_DEBUG)//_disabled)
debug_print("Yield", tt_ptr);
Expand Down Expand Up @@ -1054,16 +1061,16 @@ void sample_value(const std::string &name, double value)
}

std::shared_ptr<task_wrapper> new_task(
const std::string &timer_name,
const std::string &name,
const uint64_t task_id,
const std::shared_ptr<task_wrapper> &parent_task)
const std::shared_ptr<task_wrapper> parent_task)
{
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) { return nullptr; }
static const std::string apex_internal("apex_internal");
if (starts_with(timer_name, apex_internal)) {
if (starts_with(name, apex_internal)) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
// don't process our own events - queue scrubbing tasks.
return nullptr;
Expand All @@ -1073,7 +1080,7 @@ std::shared_ptr<task_wrapper> new_task(
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr;
} // protect against calls after finalization
task_identifier * id = task_identifier::get_task_id(timer_name);
task_identifier * id = task_identifier::get_task_id(name);
std::shared_ptr<task_wrapper>
tt_ptr(_new_task(id, task_id, parent_task, instance));
APEX_UTIL_REF_COUNT_TASK_WRAPPER
Expand All @@ -1083,7 +1090,7 @@ std::shared_ptr<task_wrapper> new_task(
std::shared_ptr<task_wrapper> new_task(
const apex_function_address function_address,
const uint64_t task_id,
const std::shared_ptr<task_wrapper> &parent_task) {
const std::shared_ptr<task_wrapper> parent_task) {
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
// if APEX is suspended, do nothing.
Expand All @@ -1099,7 +1106,7 @@ std::shared_ptr<task_wrapper> new_task(
}

std::shared_ptr<task_wrapper> update_task(
std::shared_ptr<task_wrapper> &wrapper,
std::shared_ptr<task_wrapper> wrapper,
const std::string &timer_name) {
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
Expand All @@ -1119,7 +1126,7 @@ std::shared_ptr<task_wrapper> update_task(
}

std::shared_ptr<task_wrapper> update_task(
std::shared_ptr<task_wrapper> &wrapper,
std::shared_ptr<task_wrapper> wrapper,
const apex_function_address function_address) {
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { return nullptr; }
Expand Down Expand Up @@ -1967,63 +1974,99 @@ extern "C" {
} // extern "C"

#ifdef APEX_HAVE_HPX
namespace hpx { namespace util { namespace external_timer {
uint64_t apex_impl::init(const char * thread_name,
const uint64_t comm_rank, const uint64_t comm_size) {
return APEX_TOP_LEVEL_PACKAGE::init(thread_name, comm_rank, comm_size);
};
void apex_impl::finalize(void) {
APEX_TOP_LEVEL_PACKAGE::finalize();
};
void apex_impl::register_thread(const std::string &name) {
APEX_TOP_LEVEL_PACKAGE::register_thread(name);
};
std::shared_ptr<task_wrapper> apex_impl::new_task(
const std::string &name, const uint64_t task_id,
const std::shared_ptr<task_wrapper> &parent_task) {
return APEX_TOP_LEVEL_PACKAGE::new_task(name, task_id,
dynamic_cast<const std::shared_ptr<APEX_TOP_LEVEL_PACKAGE::task_wrapper>& >(parent_task));
};
std::shared_ptr<task_wrapper> apex_impl::new_task(
uintptr_t address, const uint64_t task_id,
const std::shared_ptr<task_wrapper> &parent_task) {
return APEX_TOP_LEVEL_PACKAGE::new_task(address, task_id, parent_task);
};
void apex_impl::sample_value(const std::string &name, double value) {
APEX_TOP_LEVEL_PACKAGE::sample_value(name, value);
};
void apex_impl::send (uint64_t tag, uint64_t size, uint64_t target) {
APEX_TOP_LEVEL_PACKAGE::send(tag, size, target);
};
void apex_impl::recv (uint64_t tag, uint64_t size,
uint64_t source_rank, uint64_t source_thread) {
APEX_TOP_LEVEL_PACKAGE::recv(tag, size, source_rank, source_thread);
};
std::shared_ptr<task_wrapper> apex_impl::update_task(
std::shared_ptr<task_wrapper> &wrapper, const std::string &name) {
return APEX_TOP_LEVEL_PACKAGE::update_task(wrapper, name);
};
std::shared_ptr<task_wrapper> apex_impl::update_task(
std::shared_ptr<task_wrapper> &wrapper, uintptr_t address) {
return APEX_TOP_LEVEL_PACKAGE::update_task(wrapper, address);
};
profiler * apex_impl::start(std::shared_ptr<task_wrapper> &task_wrapper_ptr) {
return APEX_TOP_LEVEL_PACKAGE::start(task_wrapper_ptr);
};
void apex_impl::stop(std::shared_ptr<task_wrapper> &task_wrapper_ptr) {
APEX_TOP_LEVEL_PACKAGE::stop(task_wrapper_ptr);
};
void apex_impl::yield(std::shared_ptr<task_wrapper> &task_wrapper_ptr) {
APEX_TOP_LEVEL_PACKAGE::yield(task_wrapper_ptr);
};
}}} // namespace hpx::util::external_timer

void apex_register_with_hpx(void) {
hpx::util::external_timer::apex_impl impl;
hpx_register_external_timer(std::move(impl));
}
DEFINE_CONSTRUCTOR(apex_register_with_hpx)
DEFINE_CONSTRUCTOR(apex_register_with_hpx);

std::shared_ptr<hpx::util::external_timer::task_wrapper> new_task_adapter(
const std::string &name,
const uint64_t task_id,
const std::shared_ptr<hpx::util::external_timer::task_wrapper> parent_task)
{
return APEX_TOP_LEVEL_PACKAGE::new_task(name, task_id,
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(parent_task));
}

std::shared_ptr<hpx::util::external_timer::task_wrapper> new_task_adapter(
const uintptr_t address,
const uint64_t task_id,
const std::shared_ptr<hpx::util::external_timer::task_wrapper> parent_task)
{
return APEX_TOP_LEVEL_PACKAGE::new_task(address, task_id,
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(parent_task));
}

std::shared_ptr<hpx::util::external_timer::task_wrapper> update_task_adapter(
std::shared_ptr<hpx::util::external_timer::task_wrapper> wrapper,
const std::string &timer_name) {
return APEX_TOP_LEVEL_PACKAGE::update_task(
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(wrapper),
timer_name);
}

std::shared_ptr<hpx::util::external_timer::task_wrapper> update_task_adapter(
std::shared_ptr<hpx::util::external_timer::task_wrapper> wrapper,
const uintptr_t address) {
return APEX_TOP_LEVEL_PACKAGE::update_task(
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(wrapper),
address);
}

void start_adapter(std::shared_ptr<hpx::util::external_timer::task_wrapper> tt_ptr) {
APEX_TOP_LEVEL_PACKAGE::start(
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(tt_ptr));
}

void stop_adapter(std::shared_ptr<hpx::util::external_timer::task_wrapper> tt_ptr) {
APEX_TOP_LEVEL_PACKAGE::stop(
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(tt_ptr));
}

void yield_adapter(std::shared_ptr<hpx::util::external_timer::task_wrapper> tt_ptr) {
APEX_TOP_LEVEL_PACKAGE::yield(
static_pointer_cast<APEX_TOP_LEVEL_PACKAGE::task_wrapper>(tt_ptr));
}

static void apex_register_with_hpx(void) {
hpx::util::external_timer::registration reg;
reg.type = hpx::util::external_timer::init_flag;
reg.record.init = &APEX_TOP_LEVEL_PACKAGE::init;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::finalize_flag;
reg.record.finalize = &APEX_TOP_LEVEL_PACKAGE::finalize;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::register_thread_flag;
reg.record.register_thread = &APEX_TOP_LEVEL_PACKAGE::register_thread;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::new_task_string_flag;
reg.record.new_task_string = &new_task_adapter;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::new_task_address_flag;
reg.record.new_task_address = &new_task_adapter;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::sample_value_flag;
reg.record.sample_value = &APEX_TOP_LEVEL_PACKAGE::sample_value;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::send_flag;
reg.record.send = &APEX_TOP_LEVEL_PACKAGE::send;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::recv_flag;
reg.record.recv = &APEX_TOP_LEVEL_PACKAGE::recv;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::update_task_string_flag;
reg.record.update_task_string = &update_task_adapter;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::update_task_address_flag;
reg.record.update_task_address = &update_task_adapter;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::start_flag;
reg.record.start = &start_adapter;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::stop_flag;
reg.record.stop = &stop_adapter;
hpx::util::external_timer::register_external_timer(reg);
reg.type = hpx::util::external_timer::yield_flag;
reg.record.yield = &yield_adapter;
hpx::util::external_timer::register_external_timer(reg);
}
#endif

#ifdef APEX_HAVE_HPX_disabled
Expand Down
Loading

0 comments on commit 19237bd

Please sign in to comment.