diff --git a/src/apex/apex.cpp b/src/apex/apex.cpp index 87704ba3..f6db380f 100644 --- a/src/apex/apex.cpp +++ b/src/apex/apex.cpp @@ -145,9 +145,9 @@ static void finalize_hpx_runtime(void) { } } // Tell other localities to shutdown APEX - apex_schedule_shutdown(); + //apex_schedule_shutdown(); // Shutdown APEX - finalize(); + //finalize(); hpx_finalized = true; FUNCTION_EXIT } @@ -417,18 +417,25 @@ string& version() { } /* Populate the new task_wrapper object, and notify listeners. */ -inline task_wrapper * _new_task(task_identifier * id, uint64_t task_id, - task_wrapper * parent_task, apex* instance) { - task_wrapper * tt_ptr = new task_wrapper(); +inline std::shared_ptr _new_task( + task_identifier * id, + const uint64_t task_id, + const std::shared_ptr &parent_task, apex* instance) { + std::shared_ptr tt_ptr = make_shared(); tt_ptr->task_id = id; // was a parent passed in? if (parent_task != nullptr) { tt_ptr->parent_guid = parent_task->guid; + tt_ptr->parent = parent_task; } else { // if not, is there a current timer? profiler * p = thread_instance::instance().get_current_profiler(); if (p != nullptr) { tt_ptr->parent_guid = p->guid; + tt_ptr->parent = p->tt_ptr; + } else { + tt_ptr->parent = task_wrapper::get_apex_main_wrapper(); + // tt_ptr->parent_guid is 0 by default } } if (task_id == UINTMAX_MAX) { @@ -438,12 +445,7 @@ inline task_wrapper * _new_task(task_identifier * id, uint64_t task_id, // use the runtime provided GUID tt_ptr->guid = task_id; } - if (_notify_listeners) { - //read_lock_type l(instance->listener_mutex); - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_new_task(tt_ptr, parent_task); - } - } + //instance->active_task_wrappers.insert(tt_ptr); return tt_ptr; } @@ -460,7 +462,7 @@ profiler* start(const std::string &timer_name) APEX_UTIL_REF_COUNT_APEX_INTERNAL_START return profiler::get_disabled_profiler(); // don't process our own events - queue scrubbing tasks. } -#ifdef APEX_HAVE_HPX +#ifdef APEX_HAVE_HPX_disabled // Finalize at the _start_ of HPX shutdown so that we can stop any // outstanding hpx::util::interval_timer instances. If any are left // running, HPX shutdown will never complete. @@ -488,11 +490,11 @@ profiler* start(const std::string &timer_name) APEX_UTIL_REF_COUNT_SUSPENDED_START return profiler::get_disabled_profiler(); } - task_wrapper * tt_ptr = nullptr; + std::shared_ptr tt_ptr(nullptr); if (_notify_listeners) { bool success = true; task_identifier * id = task_identifier::get_task_id(timer_name); - tt_ptr = _new_task(id, UINTMAX_MAX, nullptr, instance); + tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); APEX_UTIL_REF_COUNT_TASK_WRAPPER //read_lock_type l(instance->listener_mutex); //cout << thread_instance::get_id() << " Start : " << id->get_name() << endl; fflush(stdout); @@ -514,7 +516,7 @@ profiler* start(const std::string &timer_name) return thread_instance::instance().restore_children_profilers(tt_ptr); } -profiler* start(apex_function_address function_address) { +profiler* start(const apex_function_address function_address) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { APEX_UTIL_REF_COUNT_DISABLED_START @@ -531,11 +533,11 @@ profiler* start(apex_function_address function_address) { APEX_UTIL_REF_COUNT_SUSPENDED_START return profiler::get_disabled_profiler(); } - task_wrapper * tt_ptr = nullptr; + std::shared_ptr tt_ptr(nullptr); if (_notify_listeners) { bool success = true; task_identifier * id = task_identifier::get_task_id(function_address); - tt_ptr = _new_task(id, UINTMAX_MAX, nullptr, instance); + tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); APEX_UTIL_REF_COUNT_TASK_WRAPPER //cout << thread_instance::get_id() << " Start : " << id->get_name() << endl; fflush(stdout); //read_lock_type l(instance->listener_mutex); @@ -552,7 +554,7 @@ profiler* start(apex_function_address function_address) { return thread_instance::instance().restore_children_profilers(tt_ptr); } -void debug_print(const char * event, task_wrapper * tt_ptr) { +void debug_print(const char * event, std::shared_ptr &tt_ptr) { static std::mutex this_mutex; std::unique_lock l(this_mutex); if (tt_ptr == nullptr) { @@ -560,11 +562,11 @@ void debug_print(const char * event, task_wrapper * tt_ptr) { << endl; fflush(stdout); } else { cout << thread_instance::get_id() << " " << event << " : " << tt_ptr->guid << " : " << - tt_ptr->task_id->get_name() << endl; fflush(stdout); + tt_ptr->get_task_id()->get_name() << endl; fflush(stdout); } } -profiler* start(task_wrapper * tt_ptr) { +profiler* start(std::shared_ptr &tt_ptr) { #if defined(APEX_DEBUG_disabled) debug_print("Start", tt_ptr); #endif @@ -628,10 +630,10 @@ profiler* resume(const std::string &timer_name) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return nullptr; } - task_wrapper * tt_ptr = nullptr; + std::shared_ptr tt_ptr(nullptr); if (_notify_listeners) { task_identifier * id = task_identifier::get_task_id(timer_name); - tt_ptr = _new_task(id, UINTMAX_MAX, nullptr, instance); + tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); APEX_UTIL_REF_COUNT_TASK_WRAPPER try { //read_lock_type l(instance->listener_mutex); @@ -652,7 +654,7 @@ profiler* resume(const std::string &timer_name) { return thread_instance::instance().restore_children_profilers(tt_ptr); } -profiler* resume(apex_function_address function_address) { +profiler* resume(const apex_function_address function_address) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { APEX_UTIL_REF_COUNT_DISABLED_RESUME @@ -669,10 +671,10 @@ profiler* resume(apex_function_address function_address) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return nullptr; } - task_wrapper * tt_ptr = nullptr; + std::shared_ptr tt_ptr(nullptr); if (_notify_listeners) { task_identifier * id = task_identifier::get_task_id(function_address); - tt_ptr = _new_task(id, UINTMAX_MAX, nullptr, instance); + tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); APEX_UTIL_REF_COUNT_TASK_WRAPPER try { //read_lock_type l(instance->listener_mutex); @@ -712,14 +714,10 @@ profiler* resume(profiler * p) { p->restart(); if (_notify_listeners) { try { - task_wrapper * tt_ptr = new task_wrapper(); - tt_ptr->task_id = p->task_id; - tt_ptr->prof = p; - tt_ptr->guid = p->guid; // skip the profiler_listener - we are restoring a child timer // for a parent that was yielded. for (unsigned int i = 1 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_resume(tt_ptr); + instance->listeners[i]->on_resume(p->tt_ptr); } } catch (disabled_profiler_exception e) { APEX_UTIL_REF_COUNT_FAILED_RESUME @@ -727,7 +725,7 @@ profiler* resume(profiler * p) { } } static std::string apex_process_profile_str("apex::process_profiles"); - if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) { + if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) { APEX_UTIL_REF_COUNT_APEX_INTERNAL_RESUME } else { APEX_UTIL_REF_COUNT_RESUME @@ -774,7 +772,7 @@ void set_state(apex_thread_state state) { instance->set_state(thread_instance::get_id(), state); } -void stop(profiler* the_profiler) { +void stop(profiler* the_profiler, bool cleanup) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { APEX_UTIL_REF_COUNT_DISABLED_STOP @@ -792,7 +790,7 @@ void stop(profiler* the_profiler) { APEX_UTIL_REF_COUNT_DOUBLE_STOP return; } - thread_instance::instance().clear_current_profiler(the_profiler, false, nullptr); + thread_instance::instance().clear_current_profiler(the_profiler, false, null_task_wrapper); apex* instance = apex::instance(); // get the Apex static instance // protect against calls after finalization if (!instance || _exited || _measurement_stopped) { @@ -806,16 +804,26 @@ void stop(profiler* the_profiler) { instance->listeners[i]->on_stop(p); } } - //cout << thread_instance::get_id() << " Stop : " << the_profiler->task_id->get_name() << endl; fflush(stdout); + //cout << thread_instance::get_id() << " Stop : " << the_profiler->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout); static std::string apex_process_profile_str("apex::process_profiles"); - if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) { + if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) { APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP } else { APEX_UTIL_REF_COUNT_STOP } + if (cleanup) { + if (_notify_listeners) { + //read_lock_type l(instance->listener_mutex); + for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { + instance->listeners[i]->on_task_complete(p->tt_ptr); + } + } + //instance->active_task_wrappers.erase(p->tt_ptr); + p->tt_ptr = nullptr; + } } -void stop(task_wrapper * tt_ptr) { +void stop(std::shared_ptr &tt_ptr) { #if defined(APEX_DEBUG_disabled) debug_print("Stop", tt_ptr); #endif @@ -829,22 +837,19 @@ void stop(task_wrapper * tt_ptr) { APEX_UTIL_REF_COUNT_NULL_STOP return; } + apex* instance = apex::instance(); // get the Apex static instance if (tt_ptr->prof == profiler::get_disabled_profiler()) { APEX_UTIL_REF_COUNT_DISABLED_STOP - delete(tt_ptr); return; // profiler was throttled. } if (tt_ptr->prof->stopped) { APEX_UTIL_REF_COUNT_DOUBLE_STOP - delete(tt_ptr); return; } - thread_instance::instance().clear_current_profiler(tt_ptr->prof, false, nullptr); - apex* instance = apex::instance(); // get the Apex static instance + thread_instance::instance().clear_current_profiler(tt_ptr->prof, false, null_task_wrapper); // protect against calls after finalization if (!instance || _exited || _measurement_stopped) { APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE - delete(tt_ptr); return; } std::shared_ptr p{tt_ptr->prof}; @@ -854,14 +859,19 @@ void stop(task_wrapper * tt_ptr) { instance->listeners[i]->on_stop(p); } } - //cout << thread_instance::get_id() << " Stop : " << tt_ptr->task_id->get_name() << endl; fflush(stdout); + //cout << thread_instance::get_id() << " Stop : " << tt_ptr->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout); static std::string apex_process_profile_str("apex::process_profiles"); - if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) { + if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) { APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP } else { APEX_UTIL_REF_COUNT_STOP } - delete(tt_ptr); + if (_notify_listeners) { + //read_lock_type l(instance->listener_mutex); + for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { + instance->listeners[i]->on_task_complete(tt_ptr); + } + } } void yield(profiler* the_profiler) @@ -889,7 +899,7 @@ void yield(profiler* the_profiler) APEX_UTIL_REF_COUNT_DOUBLE_YIELD return; } - thread_instance::instance().clear_current_profiler(the_profiler, false, nullptr); + thread_instance::instance().clear_current_profiler(the_profiler, false, null_task_wrapper); std::shared_ptr p{the_profiler}; if (_notify_listeners) { //read_lock_type l(instance->listener_mutex); @@ -897,16 +907,16 @@ void yield(profiler* the_profiler) instance->listeners[i]->on_yield(p); } } - //cout << thread_instance::get_id() << " Yield : " << the_profiler->task_id->get_name() << endl; fflush(stdout); + //cout << thread_instance::get_id() << " Yield : " << the_profiler->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout); static std::string apex_process_profile_str("apex::process_profiles"); - if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) { + if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) { APEX_UTIL_REF_COUNT_APEX_INTERNAL_YIELD } else { APEX_UTIL_REF_COUNT_YIELD } } -void yield(task_wrapper * tt_ptr) +void yield(std::shared_ptr &tt_ptr) { #if defined(APEX_DEBUG_disabled) debug_print("Yield", tt_ptr); @@ -942,9 +952,9 @@ void yield(task_wrapper * tt_ptr) instance->listeners[i]->on_yield(p); } } - //cout << thread_instance::get_id() << " Yield : " << tt_ptr->prof->task_id->get_name() << endl; fflush(stdout); + //cout << thread_instance::get_id() << " Yield : " << tt_ptr->prof->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout); static std::string apex_process_profile_str("apex::process_profiles"); - if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) { + if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) { APEX_UTIL_REF_COUNT_APEX_INTERNAL_YIELD } else { APEX_UTIL_REF_COUNT_YIELD @@ -994,7 +1004,10 @@ void sample_value(const std::string &name, double value) } } -task_wrapper * new_task(const std::string &timer_name, uint64_t task_id, task_wrapper * parent_task) +std::shared_ptr new_task( + const std::string &timer_name, + const uint64_t task_id, + const std::shared_ptr &parent_task) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { return nullptr; } @@ -1011,12 +1024,15 @@ task_wrapper * new_task(const std::string &timer_name, uint64_t task_id, task_wr return nullptr; } // protect against calls after finalization task_identifier * id = task_identifier::get_task_id(timer_name); - task_wrapper * tt_ptr = _new_task(id, task_id, parent_task, instance); + std::shared_ptr tt_ptr(_new_task(id, task_id, parent_task, instance)); APEX_UTIL_REF_COUNT_TASK_WRAPPER return tt_ptr; } -task_wrapper * new_task(apex_function_address function_address, uint64_t task_id, task_wrapper * parent_task) { +std::shared_ptr new_task( + const apex_function_address function_address, + const uint64_t task_id, + const std::shared_ptr &parent_task) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { return nullptr; } // if APEX is suspended, do nothing. @@ -1024,32 +1040,28 @@ task_wrapper * new_task(apex_function_address function_address, uint64_t task_id apex* instance = apex::instance(); // get the Apex static instance if (!instance || _exited) { return nullptr; } // protect against calls after finalization task_identifier * id = task_identifier::get_task_id(function_address); - task_wrapper * tt_ptr = _new_task(id, task_id, parent_task, instance); + std::shared_ptr tt_ptr(_new_task(id, task_id, parent_task, instance)); return tt_ptr; } -task_wrapper * update_task(task_wrapper * wrapper, const std::string &timer_name) { +std::shared_ptr update_task( + std::shared_ptr &wrapper, + const std::string &timer_name) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { return nullptr; } // if APEX is suspended, do nothing. if (apex_options::suspend() == true) { return nullptr; } - if (wrapper == nullptr) { - apex* instance = apex::instance(); // get the Apex static instance - if (!instance || _exited) { return nullptr; } // protect against calls after finalization - task_identifier * id = task_identifier::get_task_id(timer_name); - wrapper = _new_task(id, UINTMAX_MAX, nullptr, instance); - } else { - wrapper->task_id = task_identifier::get_task_id(timer_name); - /* Oh, help us if a profile has started already. - * This could be a horrible idea. */ - if (wrapper->prof != nullptr) { - wrapper->prof->task_id = wrapper->task_id; - } + assert(wrapper); + task_identifier * id = task_identifier::get_task_id(timer_name); + if (id != wrapper->get_task_id()) { + wrapper->aliases.insert(id); } return wrapper; } -task_wrapper * update_task(task_wrapper * wrapper, apex_function_address function_address) { +std::shared_ptr update_task( + std::shared_ptr &wrapper, + const apex_function_address function_address) { // if APEX is disabled, do nothing. if (apex_options::disable() == true) { return nullptr; } // if APEX is suspended, do nothing. @@ -1058,7 +1070,7 @@ task_wrapper * update_task(task_wrapper * wrapper, apex_function_address functio apex* instance = apex::instance(); // get the Apex static instance if (!instance || _exited) { return nullptr; } // protect against calls after finalization task_identifier * id = task_identifier::get_task_id(function_address); - wrapper = _new_task(id, UINTMAX_MAX, nullptr, instance); + wrapper = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); } else { wrapper->task_id = task_identifier::get_task_id(function_address); } @@ -1286,6 +1298,11 @@ void cleanup(void) { /* this is one of the last things we should do - because the apex_options * sometimes control behavior at shutdown. */ apex_options::delete_instance(); + /* + for (auto t : instance->active_task_wrappers) { + instance->active_task_wrappers.erase(t); + } + */ delete(instance); FUNCTION_EXIT } diff --git a/src/apex/apex.hpp b/src/apex/apex.hpp index ec279715..4110d904 100644 --- a/src/apex/apex.hpp +++ b/src/apex/apex.hpp @@ -32,6 +32,7 @@ #include "apex_options.hpp" #include "apex_export.h" #include +#include #include "apex_cxx_shared_lock.hpp" #ifdef APEX_HAVE_RCR diff --git a/src/apex/apex_api.hpp b/src/apex/apex_api.hpp index 08af1566..527333f9 100644 --- a/src/apex/apex_api.hpp +++ b/src/apex/apex_api.hpp @@ -14,10 +14,6 @@ #include #endif -#include -#include -#include -#include #include "apex_types.h" #include "apex_options.hpp" #include "apex_export.h" @@ -26,6 +22,10 @@ #include "task_wrapper.hpp" #include #include +#include +#include +#include +#include #endif /* DOXYGEN_SHOULD_SKIP_THIS */ @@ -41,6 +41,9 @@ class apex_tuning_request; namespace apex { +// declare a default "null" pointer for std::shared_ptr& references +static std::shared_ptr null_task_wrapper(nullptr); + // These are all static functions for the class. There should be only // one APEX object in the process space. @@ -134,7 +137,7 @@ APEX_EXPORT profiler * start(const std::string &timer_name); call when the timer should be stopped. \sa @ref apex::stop, @ref apex::yield, @ref apex::resume */ -APEX_EXPORT profiler * start(apex_function_address function_address); +APEX_EXPORT profiler * start(const apex_function_address function_address); /** \brief Start a timer. @@ -153,7 +156,7 @@ APEX_EXPORT profiler * start(apex_function_address function_address); call when the timer should be stopped. \sa @ref apex::stop, @ref apex::yield, @ref apex::resume @ref apex::new_task */ -APEX_EXPORT profiler * start(task_wrapper * task_wrapper_ptr); +APEX_EXPORT profiler * start(std::shared_ptr &task_wrapper_ptr); /** \brief Stop a timer. @@ -166,7 +169,7 @@ APEX_EXPORT profiler * start(task_wrapper * task_wrapper_ptr); \return No return value. \sa @ref apex::start, @ref apex::yield, @ref apex::resume */ -APEX_EXPORT void stop(profiler * the_profiler); +APEX_EXPORT void stop(profiler * the_profiler, bool cleanup=true); /** \brief Stop a timer. @@ -179,7 +182,7 @@ APEX_EXPORT void stop(profiler * the_profiler); \return No return value. \sa @ref apex::start, @ref apex::yield, @ref apex::resume, @ref apex::new_task */ -APEX_EXPORT void stop(task_wrapper * task_wrapper_ptr); +APEX_EXPORT void stop(std::shared_ptr &task_wrapper_ptr); /** \brief Stop a timer, but don't increment the number of calls. @@ -209,7 +212,7 @@ APEX_EXPORT void yield(profiler * the_profiler); \return No return value. \sa @ref apex::start, @ref apex::stop, @ref apex::resume */ -APEX_EXPORT void yield(task_wrapper * task_wrapper_ptr); +APEX_EXPORT void yield(std::shared_ptr &task_wrapper_ptr); /** \brief Resume a timer. @@ -247,7 +250,7 @@ APEX_EXPORT profiler * resume(const std::string &timer_name); call when the timer should be stopped. \sa apex::stop, apex::yield, apex::start */ -APEX_EXPORT profiler * resume(apex_function_address function_address); +APEX_EXPORT profiler * resume(const apex_function_address function_address); /** \brief Resume a timer. @@ -269,7 +272,7 @@ APEX_EXPORT profiler * resume(apex_function_address function_address); call when the timer should be stopped. \sa apex::stop, apex::yield, apex::start */ -APEX_EXPORT profiler * resume(task_wrapper * task_wrapper_ptr); +APEX_EXPORT profiler * resume(std::shared_ptr &task_wrapper_ptr); /* * Functions for resetting timer values @@ -336,7 +339,10 @@ APEX_EXPORT void sample_value(const std::string &name, double value); \return pointer to an apex::task_wrapper object */ -APEX_EXPORT task_wrapper * new_task(const std::string &name, uint64_t task_id = UINTMAX_MAX, apex::task_wrapper * parent_task = nullptr); +APEX_EXPORT std::shared_ptr new_task( + const std::string &name, + const uint64_t task_id = UINTMAX_MAX, + const std::shared_ptr &parent_task = null_task_wrapper); /** \brief Create a new task (dependency). @@ -349,7 +355,10 @@ APEX_EXPORT task_wrapper * new_task(const std::string &name, uint64_t task_id = \return pointer to an apex::task_wrapper object */ -APEX_EXPORT task_wrapper * new_task(apex_function_address function_address, uint64_t task_id = UINTMAX_MAX, apex::task_wrapper * parent_task = nullptr); +APEX_EXPORT std::shared_ptr new_task( + const apex_function_address function_address, + const uint64_t task_id = UINTMAX_MAX, + const std::shared_ptr &parent_task = null_task_wrapper); /** \brief Update a task (dependency). @@ -360,7 +369,9 @@ APEX_EXPORT task_wrapper * new_task(apex_function_address function_address, uint \param name The new name of the timer. */ -APEX_EXPORT task_wrapper * update_task(task_wrapper * wrapper, const std::string &name); +APEX_EXPORT std::shared_ptr update_task( + std::shared_ptr &wrapper, + const std::string &name); /** \brief Update a task wrapper (dependency). @@ -371,7 +382,9 @@ APEX_EXPORT task_wrapper * update_task(task_wrapper * wrapper, const std::string \param function_address The new function address of the timer. */ -APEX_EXPORT task_wrapper * update_task(task_wrapper * wrapper, apex_function_address function_address); +APEX_EXPORT std::shared_ptr update_task( + std::shared_ptr &wrapper, + const apex_function_address function_address); /** \brief Register an event type with APEX. @@ -807,7 +820,7 @@ APEX_EXPORT void recv (uint64_t tag, uint64_t size, uint64_t source_rank, uint64 */ class scoped_timer { private: - apex::task_wrapper * twp; + std::shared_ptr twp; public: /** \brief Construct and start an APEX timer. @@ -833,7 +846,7 @@ class scoped_timer { \param func The address of a function used to identify the timer type \param thread_name The name of this new worker thread in the runtime */ - scoped_timer(uint64_t func, apex::task_wrapper * parent) + scoped_timer(uint64_t func, std::shared_ptr parent) : twp(nullptr) { twp = apex::new_task((apex_function_address)func, UINTMAX_MAX, parent); apex::start(twp); @@ -844,7 +857,7 @@ class scoped_timer { \param func The name of a function used to identify the timer type \param thread_name The name of this new worker thread in the runtime */ - scoped_timer(std::string func, apex::task_wrapper * parent) + scoped_timer(std::string func, std::shared_ptr parent) : twp(nullptr) { twp = apex::new_task(func, UINTMAX_MAX, parent); apex::start(twp); @@ -870,7 +883,7 @@ class scoped_timer { /* \brief Get the internal task wrapper object */ - apex::task_wrapper * get_task_wrapper(void) { + std::shared_ptr get_task_wrapper(void) { return twp; } }; diff --git a/src/apex/concurrency_handler.cpp b/src/apex/concurrency_handler.cpp index d39206be..8676c345 100644 --- a/src/apex/concurrency_handler.cpp +++ b/src/apex/concurrency_handler.cpp @@ -141,12 +141,12 @@ bool concurrency_handler::common_start(task_identifier *id) { } } -bool concurrency_handler::on_start(task_wrapper * tt_ptr) { - return common_start(tt_ptr->task_id); +bool concurrency_handler::on_start(std::shared_ptr &tt_ptr) { + return common_start(tt_ptr->get_task_id()); } -bool concurrency_handler::on_resume(task_wrapper * tt_ptr) { - return common_start(tt_ptr->task_id); +bool concurrency_handler::on_resume(std::shared_ptr &tt_ptr) { + return common_start(tt_ptr->get_task_id()); } void concurrency_handler::common_stop(std::shared_ptr &p) { diff --git a/src/apex/concurrency_handler.hpp b/src/apex/concurrency_handler.hpp index 52299661..6a814101 100644 --- a/src/apex/concurrency_handler.hpp +++ b/src/apex/concurrency_handler.hpp @@ -60,13 +60,12 @@ class concurrency_handler : public handler, public event_listener { void on_new_node(node_event_data &data) { APEX_UNUSED(data); }; void on_new_thread(new_thread_event_data &data); void on_exit_thread(event_data &data); - bool on_start(task_wrapper * tt_ptr); + bool on_start(std::shared_ptr &tt_ptr); void on_stop(std::shared_ptr &p); void on_yield(std::shared_ptr &p); - bool on_resume(task_wrapper * tt_ptr); - void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) { + bool on_resume(std::shared_ptr &tt_ptr); + void on_task_complete(std::shared_ptr &tt_ptr) { APEX_UNUSED(tt_ptr); - APEX_UNUSED(parent_ptr); }; void on_sample_value(sample_value_event_data &data) { APEX_UNUSED(data); }; void on_periodic(periodic_event_data &data) { APEX_UNUSED(data); }; diff --git a/src/apex/event_listener.cpp b/src/apex/event_listener.cpp index 15d98141..a6fba959 100644 --- a/src/apex/event_listener.cpp +++ b/src/apex/event_listener.cpp @@ -21,7 +21,7 @@ timer_event_data::timer_event_data(task_identifier * id) : task_id(id) { /* this object never actually gets instantiated. too much overhead. */ timer_event_data::timer_event_data(std::shared_ptr &the_profiler) : my_profiler(the_profiler) { - this->task_id = the_profiler->task_id; + this->task_id = the_profiler->tt_ptr->get_task_id(); } timer_event_data::~timer_event_data() { diff --git a/src/apex/event_listener.hpp b/src/apex/event_listener.hpp index c46e345d..0d16a346 100644 --- a/src/apex/event_listener.hpp +++ b/src/apex/event_listener.hpp @@ -117,11 +117,11 @@ class event_listener virtual void on_new_node(node_event_data &data) = 0; virtual void on_new_thread(new_thread_event_data &data) = 0; virtual void on_exit_thread(event_data &data) = 0; - virtual bool on_start(task_wrapper * tt_ptr) = 0; + virtual bool on_start(std::shared_ptr &tt_ptr) = 0; virtual void on_stop(std::shared_ptr &p) = 0; virtual void on_yield(std::shared_ptr &p) = 0; - virtual bool on_resume(task_wrapper * tt_ptr) = 0; - virtual void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) = 0; + virtual bool on_resume(std::shared_ptr &tt_ptr) = 0; + virtual void on_task_complete(std::shared_ptr &tt_ptr) = 0; virtual void on_sample_value(sample_value_event_data &data) = 0; virtual void on_periodic(periodic_event_data &data) = 0; virtual void on_custom_event(custom_event_data &data) = 0; diff --git a/src/apex/otf2_listener.cpp b/src/apex/otf2_listener.cpp index e450b81d..c67b5773 100644 --- a/src/apex/otf2_listener.cpp +++ b/src/apex/otf2_listener.cpp @@ -812,8 +812,8 @@ namespace apex { return; } - bool otf2_listener::on_start(task_wrapper * tt_ptr) { - task_identifier * id = tt_ptr->task_id; + bool otf2_listener::on_start(std::shared_ptr &tt_ptr) { + task_identifier * id = tt_ptr->get_task_id(); // don't close the archive on us! read_lock_type lock(_archive_mutex); // not likely, but just in case... @@ -849,7 +849,7 @@ namespace apex { return false; } - bool otf2_listener::on_resume(task_wrapper * tt_ptr) { + bool otf2_listener::on_resume(std::shared_ptr &tt_ptr) { return on_start(tt_ptr); } @@ -865,7 +865,7 @@ namespace apex { // create an attribute OTF2_AttributeList_AddUint64( al, 0, p->guid ); if (thread_instance::get_id() == 0) { - uint64_t idx = get_region_index(p->task_id); + uint64_t idx = get_region_index(p->get_task_id()); // Because the event writer for thread 0 is also // used for communication events and sampled values, // we have to get a lock for it. @@ -879,7 +879,7 @@ namespace apex { } else { uint64_t stamp = get_time(); OTF2_EC(OTF2_EvtWriter_Leave( local_evt_writer, al, stamp, - get_region_index(p->task_id) /* region */ )); + get_region_index(p->get_task_id()) /* region */ )); } // delete the attribute list OTF2_AttributeList_Delete(al); diff --git a/src/apex/otf2_listener.hpp b/src/apex/otf2_listener.hpp index 0f12e33a..2d802732 100644 --- a/src/apex/otf2_listener.hpp +++ b/src/apex/otf2_listener.hpp @@ -176,14 +176,13 @@ namespace apex { void on_new_node(node_event_data &data); void on_new_thread(new_thread_event_data &data); void on_exit_thread(event_data &data); - bool on_start(task_wrapper * tt_ptr); + bool on_start(std::shared_ptr &tt_ptr); void on_stop(std::shared_ptr &p); void on_yield(std::shared_ptr &p); - bool on_resume(task_wrapper * tt_ptr); + bool on_resume(std::shared_ptr &tt_ptr); void on_sample_value(sample_value_event_data &data); - void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) { + void on_task_complete(std::shared_ptr &tt_ptr) { APEX_UNUSED(tt_ptr); - APEX_UNUSED(parent_ptr); }; void on_periodic(periodic_event_data &data) { APEX_UNUSED(data); }; diff --git a/src/apex/policy_handler.cpp b/src/apex/policy_handler.cpp index a10763fc..7c607357 100644 --- a/src/apex/policy_handler.cpp +++ b/src/apex/policy_handler.cpp @@ -442,22 +442,22 @@ namespace apex { call_policies(exit_thread_policies, (void *)&data, APEX_EXIT_THREAD); } - bool policy_handler::on_start(task_wrapper * tt_ptr) { - call_policies(start_event_policies, (void *)tt_ptr->task_id, APEX_START_EVENT); + bool policy_handler::on_start(std::shared_ptr &tt_ptr) { + call_policies(start_event_policies, (void *)tt_ptr->get_task_id(), APEX_START_EVENT); return true; } - bool policy_handler::on_resume(task_wrapper * tt_ptr) { - call_policies(resume_event_policies, (void *)tt_ptr->task_id, APEX_RESUME_EVENT); + bool policy_handler::on_resume(std::shared_ptr &tt_ptr) { + call_policies(resume_event_policies, (void *)tt_ptr->get_task_id(), APEX_RESUME_EVENT); return true; } void policy_handler::on_stop(std::shared_ptr &p) { - call_policies(stop_event_policies, (void *)p->task_id, APEX_STOP_EVENT); + call_policies(stop_event_policies, (void *)p->tt_ptr->get_task_id(), APEX_STOP_EVENT); } void policy_handler::on_yield(std::shared_ptr &p) { - call_policies(yield_event_policies, (void *)p->task_id, APEX_YIELD_EVENT); + call_policies(yield_event_policies, (void *)p->tt_ptr->get_task_id(), APEX_YIELD_EVENT); } void policy_handler::on_sample_value(sample_value_event_data &data) { diff --git a/src/apex/policy_handler.hpp b/src/apex/policy_handler.hpp index 8e8987be..b87e2479 100644 --- a/src/apex/policy_handler.hpp +++ b/src/apex/policy_handler.hpp @@ -96,13 +96,12 @@ class policy_handler : public handler, public event_listener void on_new_node(node_event_data &data); void on_new_thread(new_thread_event_data &data); void on_exit_thread(event_data &data); - bool on_start(task_wrapper * tt_ptr); + bool on_start(std::shared_ptr &tt_ptr); void on_stop(std::shared_ptr &p); void on_yield(std::shared_ptr &p); - bool on_resume(task_wrapper * tt_ptr); - void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) { + bool on_resume(std::shared_ptr &tt_ptr); + void on_task_complete(std::shared_ptr &tt_ptr) { APEX_UNUSED(tt_ptr); - APEX_UNUSED(parent_ptr); }; void on_sample_value(sample_value_event_data &data); void on_custom_event(custom_event_data &data); diff --git a/src/apex/profiler.hpp b/src/apex/profiler.hpp index 5d07a4c3..62a06b96 100644 --- a/src/apex/profiler.hpp +++ b/src/apex/profiler.hpp @@ -5,13 +5,18 @@ #pragma once +// forward declaration +namespace apex { +class profiler; +}; + #include #include #include #include "apex_options.hpp" #include "apex_types.h" #include -#include "task_identifier.hpp" +#include "task_wrapper.hpp" #if defined(APEX_HAVE_HPX) #include #endif @@ -64,7 +69,10 @@ typedef rdtsc_clock<1> OneHzClock; #endif class profiler { +private: + task_identifier * task_id; // for counters, timers public: + std::shared_ptr tt_ptr; // for timers MYCLOCK::time_point start; MYCLOCK::time_point end; #if APEX_HAVE_PAPI @@ -76,15 +84,37 @@ class profiler { //apex_function_address action_address; //std::string * timer_name; //bool have_name; - task_identifier * task_id; uint64_t guid; bool is_counter; bool is_resume; // for yield or resume reset_type is_reset; bool stopped; + task_identifier * get_task_id(void) { + return task_id; + } + // this constructor is for regular timers + profiler(std::shared_ptr &task, + bool resume = false, + reset_type reset = reset_type::NONE) : + task_id(task->get_task_id()), + tt_ptr(task), + start(MYCLOCK::now()), +#if APEX_HAVE_PAPI + papi_start_values{0,0,0,0,0,0,0,0}, + papi_stop_values{0,0,0,0,0,0,0,0}, +#endif + value(0.0), + children_value(0.0), + guid(0), + is_counter(false), + is_resume(resume), + is_reset(reset), stopped(false) { }; + // this constructor is for resetting profile values profiler(task_identifier * id, bool resume = false, reset_type reset = reset_type::NONE) : + task_id(id), + tt_ptr(nullptr), start(MYCLOCK::now()), #if APEX_HAVE_PAPI papi_start_values{0,0,0,0,0,0,0,0}, @@ -92,12 +122,14 @@ class profiler { #endif value(0.0), children_value(0.0), - task_id(id), guid(0), is_counter(false), is_resume(resume), is_reset(reset), stopped(false) { }; + // this constructor is for counters profiler(task_identifier * id, double value_) : + task_id(id), + tt_ptr(nullptr), start(MYCLOCK::now()), #if APEX_HAVE_PAPI papi_start_values{0,0,0,0,0,0,0,0}, @@ -105,25 +137,28 @@ class profiler { #endif value(value_), children_value(0.0), - task_id(id), is_counter(true), is_resume(false), is_reset(reset_type::NONE), stopped(true) { }; //copy constructor - profiler(const profiler& in) : start(in.start), end(in.end) { + profiler(const profiler& in) : + task_id(in.task_id), + tt_ptr(in.tt_ptr), + start(in.start), + end(in.end), + value(in.value), + children_value(in.children_value), + is_counter(in.is_counter), + is_resume(in.is_resume), // for yield or resume + is_reset(in.is_reset), + stopped(in.stopped) + { #if APEX_HAVE_PAPI for (int i = 0 ; i < 8 ; i++) { papi_start_values[i] = in.papi_start_values[i]; papi_stop_values[i] = in.papi_stop_values[i]; } #endif - value = in.value; - children_value = in.children_value; - task_id = in.task_id; - is_counter = in.is_counter; - is_resume = in.is_resume; // for yield or resume - is_reset = in.is_reset; - stopped = in.stopped; } ~profiler(void) { /* not much to do here. */ }; // for "yield" support diff --git a/src/apex/profiler_listener.cpp b/src/apex/profiler_listener.cpp index e3450cc3..e14d5d96 100644 --- a/src/apex/profiler_listener.cpp +++ b/src/apex/profiler_listener.cpp @@ -10,13 +10,13 @@ #include "profiler_listener.hpp" #include "profiler.hpp" +#include "task_wrapper.hpp" #include "thread_instance.hpp" #include #include #include #include #include "apex_options.hpp" -#include "profiler.hpp" #include "profile.hpp" #include "apex.hpp" @@ -242,7 +242,7 @@ std::unordered_set free_profiles; } #ifdef APEX_WITH_JUPYTER_SUPPORT // restart the main timer - main_timer = std::make_shared(task_identifier::get_task_id(string(APEX_MAIN))); + main_timer = std::make_shared(task_wrapper::get_apex_main_wrapper()); #endif } @@ -272,7 +272,7 @@ std::unordered_set free_profiles; } #endif std::unique_lock task_map_lock(_task_map_mutex); - unordered_map::const_iterator it = task_map.find(*(p->task_id)); + unordered_map::const_iterator it = task_map.find(*(p->get_task_id())); if (it != task_map.end()) { // A profile for this ID already exists. theprofile = (*it).second; @@ -291,22 +291,22 @@ std::unordered_set free_profiles; unordered_set::const_iterator it2; { read_lock_type l(throttled_event_set_mutex); - it2 = throttled_tasks.find(*(p->task_id)); + it2 = throttled_tasks.find(*(p->get_task_id())); } if (it2 == throttled_tasks.end()) { // lock the set for insert { write_lock_type l(throttled_event_set_mutex); // was it inserted when we were waiting? - it2 = throttled_tasks.find(*(p->task_id)); + it2 = throttled_tasks.find(*(p->get_task_id())); // no? OK - insert it. if (it2 == throttled_tasks.end()) { - throttled_tasks.insert(*(p->task_id)); + throttled_tasks.insert(*(p->get_task_id())); } } if (apex_options::use_screen_output()) { cout << "APEX: disabling lightweight timer " - << p->task_id->get_name() + << p->get_task_id()->get_name() << endl; fflush(stdout); } @@ -317,13 +317,13 @@ std::unordered_set free_profiles; } else { // Create a new profile for this name. theprofile = new profile(p->is_reset == reset_type::CURRENT ? 0.0 : p->elapsed(), tmp_num_counters, values, p->is_resume, p->is_counter ? APEX_COUNTER : APEX_TIMER); - task_map[*(p->task_id)] = theprofile; + task_map[*(p->get_task_id())] = theprofile; task_map_lock.unlock(); #ifdef APEX_HAVE_HPX #ifdef APEX_REGISTER_HPX3_COUNTERS if(!_done) { - if(get_hpx_runtime_ptr() != nullptr && p->task_id->has_name()) { - std::string timer_name(p->task_id->get_name()); + if(get_hpx_runtime_ptr() != nullptr && p->get_task_id()->has_name()) { + std::string timer_name(p->get_task_id()->get_name()); //Don't register timers containing "/" if(timer_name.find("/") == std::string::npos) { hpx::performance_counters::install_counter_type( @@ -349,13 +349,13 @@ std::unordered_set free_profiles; if (!p->is_counter) { static int thresh = RAND_MAX/100; if (std::rand() < thresh) { - /* before calling p->task_id->get_name(), make sure we create + /* before calling p->get_task_id()->get_name(), make sure we create * a thread_instance object that is NOT a worker. */ thread_instance::instance(false); std::unique_lock task_map_lock(_mtx); task_scatterplot_samples << p->normalized_timestamp() << " " << p->elapsed()*profiler::get_cpu_mhz()*1000000 << " " - << "'" << p->task_id->get_name() << "'" << endl; + << "'" << p->get_task_id()->get_name() << "'" << endl; int loc0 = task_scatterplot_samples.tellp(); if (loc0 > 32768) { // lock access to the file @@ -757,6 +757,13 @@ node_color * get_node_color(double v,double vmin,double vmax) } task_dependencies.clear(); + // our TOTAL available time is the elapsed * the number of threads, or cores + int num_worker_threads = thread_instance::get_num_threads(); +#ifdef APEX_HAVE_HPX + num_worker_threads = num_worker_threads - num_non_worker_threads_registered; +#endif + //double total_main = main_timer->elapsed() * fmin(hardware_concurrency(), num_worker_threads); + // output nodes with "main" [shape=box; style=filled; fillcolor="#ff0000" ]; unordered_map::const_iterator it; std::unique_lock task_map_lock(_task_map_mutex); @@ -915,9 +922,7 @@ node_color * get_node_color(double v,double vmin,double vmax) profiler_listener * pl = inst->the_profiler_listener; if (pl != nullptr) { #ifdef APEX_TRACE_APEX - profiler * p = start("apex::process_profiles"); - pl->process_profiles(); - stop(p); + scoped_timer p("apex::process_profiles"); #else pl->process_profiles(); #endif @@ -1142,7 +1147,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl #endif // time the whole application. - main_timer = std::make_shared(task_identifier::get_task_id(string(APEX_MAIN))); + main_timer = std::make_shared(task_wrapper::get_apex_main_wrapper()); #if APEX_HAVE_PAPI if (num_papi_counters > 0 && !apex_options::papi_suspend() && thread_papi_state == papi_running) { int rc = PAPI_read( EventSet, main_timer->papi_start_values ); @@ -1167,6 +1172,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl // output to screen? if ((apex_options::use_screen_output() || + apex_options::use_taskgraph_output() || apex_options::use_csv_output()) && node_id == 0) { size_t ignored = 0; @@ -1239,7 +1245,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl } #endif // restart the main timer - main_timer = std::make_shared(task_identifier::get_task_id(string(APEX_MAIN))); + main_timer = std::make_shared(task_wrapper::get_apex_main_wrapper()); if (data.reset) { reset_all(); } @@ -1306,7 +1312,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl /* When a start event happens, create a profiler object. Unless this * named event is throttled, in which case do nothing, as quickly as possible */ - inline bool profiler_listener::_common_start(task_wrapper * tt_ptr, bool is_resume) { + inline bool profiler_listener::_common_start(std::shared_ptr &tt_ptr, bool is_resume) { if (!_done) { #if defined(APEX_THROTTLE) if (!apex_options::use_tau()) { @@ -1314,7 +1320,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl unordered_set::const_iterator it; { read_lock_type l(throttled_event_set_mutex); - it = throttled_tasks.find(*tt_ptr->task_id); + it = throttled_tasks.find(*tt_ptr->get_task_id()); } if (it != throttled_tasks.end()) { /* @@ -1327,8 +1333,9 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl } #endif // start the profiler object, which starts our timers - //std::shared_ptr p = std::make_shared(tt_ptr->task_id, is_resume); - profiler * p = new profiler(tt_ptr->task_id, is_resume); + //std::shared_ptr p = std::make_shared(tt_ptr, is_resume); + // get the right task identifier, based on whether there are aliases + profiler * p = new profiler(tt_ptr, is_resume); p->guid = thread_instance::get_guid(); thread_instance::instance().set_current_profiler(p); #if APEX_HAVE_PAPI @@ -1361,7 +1368,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl // if we aren't processing profiler objects, just return. if (!apex_options::process_async_state()) { return; } #ifdef APEX_TRACE_APEX - if (p->task_id->name == "apex::process_profiles") { return; } + if (p->get_task_id()->name == "apex::process_profiles") { return; } #endif // we have to make a local copy, because lockfree queues DO NOT SUPPORT shared_ptrs! thequeue()->enqueue(p); @@ -1398,13 +1405,13 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl } /* Start the timer */ - bool profiler_listener::on_start(task_wrapper * tt_ptr) { + bool profiler_listener::on_start(std::shared_ptr &tt_ptr) { return _common_start(tt_ptr, false); } /* This is just like starting a timer, but don't increment the number of calls * value. That is because we are restarting an existing timer. */ - bool profiler_listener::on_resume(task_wrapper * tt_ptr) { + bool profiler_listener::on_resume(std::shared_ptr &tt_ptr) { return _common_start(tt_ptr, true); } @@ -1442,22 +1449,19 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl } } - void profiler_listener::on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) { + void profiler_listener::on_task_complete(std::shared_ptr &tt_ptr) { //printf("New task: %llu\n", task_id); fflush(stdout); if (!apex_options::use_taskgraph_output()) { return; } + // get the right task identifier, based on whether there are aliases + task_identifier * id = tt_ptr->get_task_id(); // if the parent task is not null, use it (obviously) - if (parent_ptr != nullptr) { - dependency_queue()->enqueue(new task_dependency(parent_ptr->task_id, tt_ptr->task_id)); + if (tt_ptr->parent != nullptr) { + task_identifier * pid = tt_ptr->parent->get_task_id(); + dependency_queue.enqueue(new task_dependency(pid, id)); return; } - // get the current profiler - profiler * p = thread_instance::instance().get_current_profiler(); - if (p != NULL) { - dependency_queue()->enqueue(new task_dependency(p->task_id, tt_ptr->task_id)); - } else { - task_identifier * parent = task_identifier::get_task_id(string(APEX_MAIN)); - dependency_queue()->enqueue(new task_dependency(parent, tt_ptr->task_id)); - } + task_identifier * parent = task_wrapper::get_apex_main_wrapper()->task_id; + dependency_queue.enqueue(new task_dependency(parent, id)); } /* Communication send event. Save the number of bytes. */ diff --git a/src/apex/profiler_listener.hpp b/src/apex/profiler_listener.hpp index c913918f..c19b5626 100644 --- a/src/apex/profiler_listener.hpp +++ b/src/apex/profiler_listener.hpp @@ -10,6 +10,8 @@ #endif #include "apex_api.hpp" +#include "profiler.hpp" +#include "task_wrapper.hpp" #include "event_listener.hpp" #include "apex_types.h" #include @@ -92,7 +94,7 @@ class profiler_listener : public event_listener { unsigned int process_dependency(task_dependency* td); int node_id; std::mutex _mtx; - bool _common_start(task_wrapper * tt_ptr, bool is_resume); // internal, inline function + bool _common_start(std::shared_ptr &tt_ptr, bool is_resume); // internal, inline function void _common_stop(std::shared_ptr &p, bool is_yield); // internal, inline function void push_profiler(int my_tid, std::shared_ptr &p); std::unordered_map task_map; @@ -187,11 +189,11 @@ class profiler_listener : public event_listener { void on_new_node(node_event_data &data); void on_new_thread(new_thread_event_data &data); void on_exit_thread(event_data &data); - bool on_start(task_wrapper * tt_ptr); + bool on_start(std::shared_ptr &tt_ptr); void on_stop(std::shared_ptr &p); void on_yield(std::shared_ptr &p); - bool on_resume(task_wrapper * tt_ptr); - void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr); + bool on_resume(std::shared_ptr &tt_ptr); + void on_task_complete(std::shared_ptr &tt_ptr); void on_sample_value(sample_value_event_data &data); void on_periodic(periodic_event_data &data); void on_custom_event(custom_event_data &event_data); diff --git a/src/apex/task_wrapper.hpp b/src/apex/task_wrapper.hpp index 71024bf4..7b2ae94b 100644 --- a/src/apex/task_wrapper.hpp +++ b/src/apex/task_wrapper.hpp @@ -1,8 +1,13 @@ #pragma once +namespace apex { +struct task_wrapper; +}; + #include "task_identifier.hpp" #include "profiler.hpp" #include +#include namespace apex { @@ -11,10 +16,39 @@ struct task_wrapper { profiler * prof; uint64_t guid; uint64_t parent_guid; + std::shared_ptr parent; std::vector data_ptr; + std::unordered_set aliases; task_wrapper(void) : - task_id(nullptr), prof(nullptr), guid(0ull), - parent_guid(0ull) { } + task_id(nullptr), + prof(nullptr), + guid(0ull), + parent_guid(0ull), + parent(nullptr) + { } + inline task_identifier * get_task_id(void) { + if (!aliases.empty()) { + task_identifier * id = nullptr; + // find the first alias that isn't the same as the original name + for (auto tmp : aliases) { + if (tmp != id) { + id = tmp; + return id; + } + } + } + return task_id; + } + static inline std::shared_ptr & get_apex_main_wrapper(void) { + static std::shared_ptr tt_ptr(nullptr); + if (tt_ptr.get() != nullptr) { + return tt_ptr; + } + const std::string apex_main_str("APEX MAIN"); + tt_ptr = std::make_shared(); + tt_ptr->task_id = task_identifier::get_task_id(apex_main_str); + return tt_ptr; + } }; // struct task_wrapper -}; // namespace apex \ No newline at end of file +}; // namespace apex diff --git a/src/apex/tau_listener.cpp b/src/apex/tau_listener.cpp index e1447648..656e53ec 100644 --- a/src/apex/tau_listener.cpp +++ b/src/apex/tau_listener.cpp @@ -139,30 +139,30 @@ void tau_listener::on_exit_thread(event_data &data) { return; } -inline bool tau_listener::_common_start(task_wrapper * tt_ptr) { +inline bool tau_listener::_common_start(std::shared_ptr &tt_ptr) { if (!_terminate) { - my_Tau_start(tt_ptr->task_id->get_name().c_str()); + my_Tau_start(tt_ptr->get_task_id()->get_name().c_str()); } else { return false; } return true; } -bool tau_listener::on_start(task_wrapper * tt_ptr) { +bool tau_listener::on_start(std::shared_ptr &tt_ptr) { return _common_start(tt_ptr); } -bool tau_listener::on_resume(task_wrapper * tt_ptr) { +bool tau_listener::on_resume(std::shared_ptr &tt_ptr) { return _common_start(tt_ptr); } inline void tau_listener::_common_stop(std::shared_ptr &p) { static string empty(""); if (!_terminate) { - if (p->task_id->get_name().compare(empty) == 0) { + if (p->tt_ptr->get_task_id()->get_name().compare(empty) == 0) { my_Tau_global_stop(); // stop the top level timer } else { - my_Tau_stop(p->task_id->get_name().c_str()); + my_Tau_stop(p->tt_ptr->get_task_id()->get_name().c_str()); } } return; diff --git a/src/apex/tau_listener.hpp b/src/apex/tau_listener.hpp index bf5618b7..1216b717 100644 --- a/src/apex/tau_listener.hpp +++ b/src/apex/tau_listener.hpp @@ -14,7 +14,7 @@ class tau_listener : public event_listener { private: void _init(void); bool _terminate; - bool _common_start(task_wrapper * tt_ptr); + bool _common_start(std::shared_ptr &tt_ptr); void _common_stop(std::shared_ptr &p); public: tau_listener (void); @@ -28,13 +28,12 @@ class tau_listener : public event_listener { void on_new_node(node_event_data &data); void on_new_thread(new_thread_event_data &data); void on_exit_thread(event_data &data); - bool on_start(task_wrapper * tt_ptr); + bool on_start(std::shared_ptr &tt_ptr); void on_stop(std::shared_ptr &p); void on_yield(std::shared_ptr &p); - bool on_resume(task_wrapper * tt_ptr); - void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) { + bool on_resume(std::shared_ptr &tt_ptr); + void on_task_complete(std::shared_ptr &tt_ptr) { APEX_UNUSED(tt_ptr); - APEX_UNUSED(parent_ptr); }; void on_sample_value(sample_value_event_data &data); void on_periodic(periodic_event_data &data); diff --git a/src/apex/thread_instance.cpp b/src/apex/thread_instance.cpp index 107fe226..ddbfa970 100644 --- a/src/apex/thread_instance.cpp +++ b/src/apex/thread_instance.cpp @@ -66,7 +66,6 @@ std::unordered_set thread_instance::open_profilers; // Global static unordered map of parent GUIDs to child GUIDs // to handle "overlapping timer" problem. std::unordered_map* > thread_instance::children_to_resume; -static std::mutex _profiler_stack_mutex; thread_instance& thread_instance::instance(bool is_worker) { if( _instance == nullptr ) { @@ -266,7 +265,7 @@ void thread_instance::set_current_profiler(profiler * the_profiler) { instance().current_profilers.push_back(the_profiler); } -profiler * thread_instance::restore_children_profilers(task_wrapper * tt_ptr) { +profiler * thread_instance::restore_children_profilers(std::shared_ptr &tt_ptr) { profiler * parent = instance().get_current_profiler(); // if there are no children to restore, return. if (tt_ptr == nullptr || tt_ptr->data_ptr.size() == 0) {return parent;} @@ -287,7 +286,7 @@ profiler * thread_instance::restore_children_profilers(task_wrapper * tt_ptr) { return parent; } -void thread_instance::clear_current_profiler(profiler * the_profiler, bool save_children, task_wrapper * tt_ptr) { +void thread_instance::clear_current_profiler(profiler * the_profiler, bool save_children, std::shared_ptr &tt_ptr) { // this is a stack variable that provides safety when using recursion. static APEX_NATIVE_TLS bool fixing_stack = false; // this is a serious problem... @@ -324,7 +323,7 @@ void thread_instance::clear_current_profiler(profiler * the_profiler, bool save_ tt_ptr->data_ptr.push_back(tmp); /* Stop the copy. The original will get reset when the parent resumes. */ - stop(profiler_copy); // we better be re-entrant safe! + stop(profiler_copy, false); // we better be re-entrant safe! } else { // since we aren't yielding, just stop the children. stop(tmp); // we better be re-entrant safe! diff --git a/src/apex/thread_instance.hpp b/src/apex/thread_instance.hpp index 8f896db7..7fb718b4 100644 --- a/src/apex/thread_instance.hpp +++ b/src/apex/thread_instance.hpp @@ -84,10 +84,10 @@ class thread_instance { static bool map_id_to_worker(int id); static int get_num_threads(void) { return _num_threads; }; std::string map_addr_to_name(apex_function_address function_address); - static profiler * restore_children_profilers(task_wrapper * tt_ptr); + static profiler * restore_children_profilers(std::shared_ptr &tt_ptr); static void set_current_profiler(profiler * the_profiler); static profiler * get_current_profiler(void); - static void clear_current_profiler(profiler * the_profiler, bool save_children, task_wrapper * tt_ptr); + static void clear_current_profiler(profiler * the_profiler, bool save_children, std::shared_ptr &tt_ptr); static const char * program_path(void); static bool is_worker() { return instance()._is_worker; } static uint64_t get_guid() { return instance()._get_guid(); } @@ -97,7 +97,7 @@ class thread_instance { static void add_open_profiler(profiler* p) { std::unique_lock l(_open_profiler_mutex); std::stringstream ss; - ss << p->task_id->get_name(); + ss << p->get_task_id()->get_name(); ss << p->time_point_to_nanoseconds(p->start); open_profilers.insert(ss.str()); } @@ -105,7 +105,7 @@ class thread_instance { if (p == NULL) return; std::unique_lock l(_open_profiler_mutex); std::stringstream ss; - ss << p->task_id->get_name(); + ss << p->get_task_id()->get_name(); ss << p->time_point_to_nanoseconds(p->start); auto tmp = open_profilers.find(ss.str()); if (tmp != open_profilers.end()) { diff --git a/src/examples/Overhead/testOverhead.cpp b/src/examples/Overhead/testOverhead.cpp index 70a04d38..1d80025e 100644 --- a/src/examples/Overhead/testOverhead.cpp +++ b/src/examples/Overhead/testOverhead.cpp @@ -31,7 +31,7 @@ inline int foo (int i) { return dummy; } -inline int bar (int i, apex::task_wrapper * foo_ptr) { +inline int bar (int i, std::shared_ptr foo_ptr) { static int limit = sqrt(INT_MAX >> 1); int j; int dummy = 1; @@ -86,7 +86,7 @@ void* someThread(void* tmp) { // only time this for loop apex::profiler * st = apex::start((apex_function_address)someThread); for (i = 0 ; i < ITERATIONS ; i++) { - apex::task_wrapper * foo_ptr = apex::new_task((apex_function_address)foo); + std::shared_ptr foo_ptr = apex::new_task((apex_function_address)foo); apex::start(foo_ptr); total += foo(i); total += bar(i, foo_ptr); diff --git a/src/unit_tests/C++/CMakeLists.txt b/src/unit_tests/C++/CMakeLists.txt index 6f3af4fc..f133cacd 100644 --- a/src/unit_tests/C++/CMakeLists.txt +++ b/src/unit_tests/C++/CMakeLists.txt @@ -35,6 +35,7 @@ set(example_programs apex_shutdown_throttling apex_hpx_direct_actions apex_hpx_task_wrapper_direct_actions + apex_hpx_annotated_functions apex_profiler_guids apex_fibonacci_std_async ) diff --git a/src/unit_tests/C++/apex_fibonacci_std_async.cpp b/src/unit_tests/C++/apex_fibonacci_std_async.cpp index abb42005..9a7417a7 100644 --- a/src/unit_tests/C++/apex_fibonacci_std_async.cpp +++ b/src/unit_tests/C++/apex_fibonacci_std_async.cpp @@ -7,7 +7,7 @@ #define FIB_RESULTS_PRE 41 int fib_results[FIB_RESULTS_PRE] = {0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765,10946,17711,28657,46368,75025,121393,196418,317811,514229,832040,1346269,2178309,3524578,5702887,9227465,14930352,24157817,39088169,63245986,102334155}; -int fib (int in, apex::task_wrapper * parent) { +int fib (int in, std::shared_ptr parent) { apex::scoped_thread st("fib thread"); apex::scoped_timer ast((uint64_t)&fib, parent); if (in == 0) { diff --git a/src/unit_tests/C++/apex_hpx_annotated_functions.cpp b/src/unit_tests/C++/apex_hpx_annotated_functions.cpp new file mode 100644 index 00000000..0d11b21d --- /dev/null +++ b/src/unit_tests/C++/apex_hpx_annotated_functions.cpp @@ -0,0 +1,136 @@ +#include +#include +#include +#include +#include +#include +#include "apex_api.hpp" + +uint32_t numthreads = 0; +int threads_per_core = 8; +__thread uint64_t guid = 0; +const int num_iterations = 10; + +#ifdef DEBUG +#define __DEBUG_PRINT__ 1 +#endif + +#ifndef __APPLE__ +pthread_barrier_t barrier; +#endif + +int nsleep(long miliseconds, int tid) +{ + struct timespec req, rem; + // add some variation + double randval = 1.0 + (((double)(rand())) / RAND_MAX); + miliseconds = (int)(miliseconds * randval); + + if(miliseconds > 999) + { + req.tv_sec = (int)(miliseconds / 1000); /* Must be Non-Negative */ + req.tv_nsec = (miliseconds - ((long)req.tv_sec * 1000)) * 1000000; /* Must be in range of 0 to 999999999 */ + } + else + { + req.tv_sec = 0; /* Must be Non-Negative */ + req.tv_nsec = miliseconds * 1000000; /* Must be in range of 0 to 999999999 */ + } + +#ifdef __DEBUG_PRINT__ + std::stringstream buf; + buf << "APP: " << tid << ": Computing " << miliseconds << " miliseconds\n"; std::cout << buf.str(); +#endif + return nanosleep(&req , &rem); +} + +static void init_guid(int tid) { + guid = ((UINT64_MAX/numthreads) * tid); +} + +void innerLoop(int *tid) { + std::shared_ptr tt_ptr = apex::new_task(__func__); + apex::update_task(tt_ptr, "foo"); + apex::start(tt_ptr); + + /* do some computation */ + int ret = nsleep(10, *tid); // after - t: 10, af: 0 + + /* stop the timer */ + apex::stop(tt_ptr); +} + +void* someThread(void* tmp) +{ + /* Register this thread with APEX */ + int* tid = (int*)tmp; + char name[32]; + sprintf(name, "worker-thread %d", *tid); +#ifndef __APPLE__ +#ifndef APEX_HAVE_OTF2 + pthread_barrier_wait(&barrier); +#endif +#endif + apex::register_thread(name); + init_guid(*tid); + + apex::profiler* p = apex::start(__func__); + for (int i = 0 ; i < num_iterations ; i++) { + innerLoop(tid); + } + apex::stop(p); + + /* tell APEX that this thread is exiting */ + apex::exit_thread(); + return NULL; +} + +int main (int argc, char** argv) { + /* initialize APEX */ + apex::init("apex::start unit test", 0, 1); + /* important, to make sure we get correct profiles at the end */ + apex::apex_options::use_screen_output(true); + /* start a timer */ + apex::profiler* p = apex::start("main"); + /* Spawn X threads */ + if (argc > 1) { + numthreads = strtoul(argv[1],NULL,0); + } else { + numthreads = apex::hardware_concurrency() * threads_per_core; // many threads per core. Stress it! + } +#ifndef __APPLE__ + pthread_barrier_init(&barrier, NULL, numthreads); +#endif + if (apex::apex_options::use_tau() || apex::apex_options::use_otf2()) { + numthreads = std::min(numthreads, apex::hardware_concurrency()); + } + int * tids = (int*)calloc(numthreads, sizeof(int)); + pthread_t * thread = (pthread_t*)calloc(numthreads, sizeof(pthread_t)); + for (uint32_t i = 0 ; i < numthreads ; i++) { + tids[i] = i; + pthread_create(&(thread[i]), NULL, someThread, &(tids[i])); + } + /* wait for the threads to finish */ + for (uint32_t i = 0 ; i < numthreads ; i++) { + pthread_join(thread[i], NULL); + } + free(tids); + free(thread); + /* stop our main timer */ + apex::stop(p); + /* finalize APEX */ + apex::finalize(); + apex_profile * profile1 = apex::get_profile("direct_action"); + apex_profile * profile2 = apex::get_profile("innerLoop"); + if (profile1 && profile2) { + std::cout << "direct_action reported calls : " << profile1->calls << std::endl; + std::cout << "innerLoop reported calls : " << profile2->calls << std::endl; + if (profile1->calls == num_iterations * numthreads && + profile1->calls == profile1->calls) { + std::cout << "Test passed." << std::endl; + } + } + apex::cleanup(); + return 0; +} + diff --git a/src/unit_tests/C++/apex_hpx_direct_actions.cpp b/src/unit_tests/C++/apex_hpx_direct_actions.cpp index 6769fa07..241dd6dc 100644 --- a/src/unit_tests/C++/apex_hpx_direct_actions.cpp +++ b/src/unit_tests/C++/apex_hpx_direct_actions.cpp @@ -44,7 +44,7 @@ int nsleep(long miliseconds, int tid) } void innerLoop(int *tid) { - apex::task_wrapper * tt_ptr = apex::new_task(__func__); + std::shared_ptr tt_ptr = apex::new_task(__func__); #ifdef __DEBUG_PRINT__ std::stringstream buf; buf << "APP: " << *tid << ": Starting thread " << tt_ptr->guid << "\n"; std::cout << buf.str(); @@ -55,7 +55,7 @@ void innerLoop(int *tid) { int ret = nsleep(10, *tid); // after - t: 10, af: 0 /* Start a timer like an "direct_action" */ - apex::task_wrapper * af = apex::new_task("direct_action", UINTMAX_MAX, tt_ptr); + std::shared_ptr af = apex::new_task("direct_action", UINTMAX_MAX, tt_ptr); #ifdef __DEBUG_PRINT__ buf.str(""); buf.clear(); buf << "APP: " << *tid << ": Starting direct_action " << af->guid << "\n"; std::cout << buf.str(); diff --git a/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp b/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp index 1137fb56..bb137c10 100644 --- a/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp +++ b/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp @@ -49,7 +49,7 @@ static void init_guid(int tid) { } void innerLoop(int *tid) { - apex::task_wrapper * tt_ptr = apex::new_task(__func__); + std::shared_ptr tt_ptr = apex::new_task(__func__); #ifdef __DEBUG_PRINT__ std::stringstream buf; buf << "APP: " << *tid << ": Starting thread " << tt_ptr->guid << "\n"; std::cout << buf.str(); @@ -60,7 +60,7 @@ void innerLoop(int *tid) { int ret = nsleep(10, *tid); // after - t: 10, af: 0 /* Start a timer like an "direct_action" */ - apex::task_wrapper * af = apex::new_task("direct_action"); + std::shared_ptr af = apex::new_task("direct_action"); #ifdef __DEBUG_PRINT__ buf.str(""); buf.clear(); buf << "APP: " << *tid << ": Starting direct_action " << af->guid << "\n"; std::cout << buf.str(); diff --git a/src/unit_tests/C++/apex_task_wrapper.cpp b/src/unit_tests/C++/apex_task_wrapper.cpp index f340b30b..507ab480 100644 --- a/src/unit_tests/C++/apex_task_wrapper.cpp +++ b/src/unit_tests/C++/apex_task_wrapper.cpp @@ -8,7 +8,7 @@ #define FIB_RESULTS_PRE 41 int fib_results[FIB_RESULTS_PRE] = {0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765,10946,17711,28657,46368,75025,121393,196418,317811,514229,832040,1346269,2178309,3524578,5702887,9227465,14930352,24157817,39088169,63245986,102334155}; -int fib (int in, apex::task_wrapper * tt) { +int fib (int in, std::shared_ptr tt) { apex::start(tt); if (in == 0) { apex::stop(tt); @@ -21,13 +21,13 @@ int fib (int in, apex::task_wrapper * tt) { int a = in-1; // called from the parent, not when the child is spawned! // specify all arguments - apex::task_wrapper * att = apex::new_task((apex_function_address)&fib, UINTMAX_MAX, tt); + std::shared_ptr att = apex::new_task((apex_function_address)&fib, UINTMAX_MAX, tt); auto future_a = std::async(std::launch::async, fib, a, att); int b = in-2; // called from the parent, not when the child is spawned! // use default arguments - apex::task_wrapper * btt = apex::new_task((apex_function_address)&fib); + std::shared_ptr btt = apex::new_task((apex_function_address)&fib); auto future_b = std::async(std::launch::async, fib, b, btt); int result_a = future_a.get(); @@ -59,7 +59,7 @@ int main(int argc, char *argv[]) { } // called from the parent, not when the child is spawned! - apex::task_wrapper * tt = apex::new_task((apex_function_address)&fib); + std::shared_ptr tt = apex::new_task((apex_function_address)&fib); auto future = std::async(std::launch::async, fib, i, tt); int result = future.get(); std::cout << "fib of " << i << " is " << result << " (valid value: " << fib_results[i] << ")" << std::endl;