Skip to content

Commit

Permalink
Fixing the task dependency tracking
Browse files Browse the repository at this point in the history
Because HPX uses annotated tasks to "change" the name of
tasks when they execute, we track the "aliases" that a task has.
Then, when we track the dependencies, we use the alias rather
than the HPX thread name.  This requires keeping track of the
task_wrapper objects inside the profiler object, and it also
means that we might either leak task_wrapper
objects or, if a parent finishes before a child, find that the
task_wrapper is no longer available to generate the dependency
correctly.  Either would be bad.  Need more testing to see how
it behaves with HPX.
  • Loading branch information
khuck committed May 9, 2018
1 parent 2e48b47 commit 498dfeb
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 88 deletions.
81 changes: 45 additions & 36 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,16 @@ inline task_wrapper * _new_task(task_identifier * id, uint64_t task_id,
// was a parent passed in?
if (parent_task != nullptr) {
tt_ptr->parent_guid = parent_task->guid;
tt_ptr->parent = parent_task;
} else {
// if not, is there a current timer?
profiler * p = thread_instance::instance().get_current_profiler();
if (p != nullptr) {
tt_ptr->parent_guid = p->guid;
tt_ptr->parent = p->tt_ptr;
} else {
tt_ptr->parent = task_wrapper::get_apex_main_wrapper();
// tt_ptr->parent_guid is 0 by default
}
}
if (task_id == UINTMAX_MAX) {
Expand All @@ -438,12 +443,6 @@ inline task_wrapper * _new_task(task_identifier * id, uint64_t task_id,
// use the runtime provided GUID
tt_ptr->guid = task_id;
}
if (_notify_listeners) {
//read_lock_type l(instance->listener_mutex);
for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) {
instance->listeners[i]->on_new_task(tt_ptr, parent_task);
}
}
return tt_ptr;
}

Expand Down Expand Up @@ -560,7 +559,7 @@ void debug_print(const char * event, task_wrapper * tt_ptr) {
<< endl; fflush(stdout);
} else {
cout << thread_instance::get_id() << " " << event << " : " << tt_ptr->guid << " : " <<
tt_ptr->task_id->get_name() << endl; fflush(stdout);
tt_ptr->get_task_id()->get_name() << endl; fflush(stdout);
}
}

Expand Down Expand Up @@ -712,22 +711,24 @@ profiler* resume(profiler * p) {
p->restart();
if (_notify_listeners) {
try {
/*
task_wrapper * tt_ptr = new task_wrapper();
tt_ptr->task_id = p->task_id;
tt_ptr->prof = p;
tt_ptr->guid = p->guid;
*/
// skip the profiler_listener - we are restoring a child timer
// for a parent that was yielded.
for (unsigned int i = 1 ; i < instance->listeners.size() ; i++) {
instance->listeners[i]->on_resume(tt_ptr);
instance->listeners[i]->on_resume(p->tt_ptr);
}
} catch (disabled_profiler_exception e) {
APEX_UTIL_REF_COUNT_FAILED_RESUME
return profiler::get_disabled_profiler();
}
}
static std::string apex_process_profile_str("apex::process_profiles");
if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) {
if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) {
APEX_UTIL_REF_COUNT_APEX_INTERNAL_RESUME
} else {
APEX_UTIL_REF_COUNT_RESUME
Expand Down Expand Up @@ -774,7 +775,7 @@ void set_state(apex_thread_state state) {
instance->set_state(thread_instance::get_id(), state);
}

void stop(profiler* the_profiler) {
void stop(profiler* the_profiler, bool cleanup) {
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_DISABLED_STOP
Expand Down Expand Up @@ -806,13 +807,24 @@ void stop(profiler* the_profiler) {
instance->listeners[i]->on_stop(p);
}
}
//cout << thread_instance::get_id() << " Stop : " << the_profiler->task_id->get_name() << endl; fflush(stdout);
//cout << thread_instance::get_id() << " Stop : " << the_profiler->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout);
static std::string apex_process_profile_str("apex::process_profiles");
if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) {
if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) {
APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP
} else {
APEX_UTIL_REF_COUNT_STOP
}
if (cleanup) {
if (_notify_listeners) {
//read_lock_type l(instance->listener_mutex);
for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) {
instance->listeners[i]->on_task_complete(p->tt_ptr);
}
}
// TODO - need to think about how to safely do this. The parent may finish before the child.
delete(p->tt_ptr);
p->tt_ptr = nullptr;
}
}

void stop(task_wrapper * tt_ptr) {
Expand All @@ -822,7 +834,7 @@ void stop(task_wrapper * tt_ptr) {
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_DISABLED_STOP
free(tt_ptr);
delete(tt_ptr);
return;
}
if (tt_ptr == nullptr || tt_ptr->prof == nullptr) {
Expand All @@ -831,20 +843,20 @@ void stop(task_wrapper * tt_ptr) {
}
if (tt_ptr->prof == profiler::get_disabled_profiler()) {
APEX_UTIL_REF_COUNT_DISABLED_STOP
free(tt_ptr);
delete(tt_ptr);
return; // profiler was throttled.
}
if (tt_ptr->prof->stopped) {
APEX_UTIL_REF_COUNT_DOUBLE_STOP
free(tt_ptr);
delete(tt_ptr);
return;
}
thread_instance::instance().clear_current_profiler(tt_ptr->prof, false, nullptr);
apex* instance = apex::instance(); // get the Apex static instance
// protect against calls after finalization
if (!instance || _exited || _measurement_stopped) {
APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE
free(tt_ptr);
delete(tt_ptr);
return;
}
std::shared_ptr<profiler> p{tt_ptr->prof};
Expand All @@ -854,14 +866,21 @@ void stop(task_wrapper * tt_ptr) {
instance->listeners[i]->on_stop(p);
}
}
//cout << thread_instance::get_id() << " Stop : " << tt_ptr->task_id->get_name() << endl; fflush(stdout);
//cout << thread_instance::get_id() << " Stop : " << tt_ptr->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout);
static std::string apex_process_profile_str("apex::process_profiles");
if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) {
if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) {
APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP
} else {
APEX_UTIL_REF_COUNT_STOP
}
free(tt_ptr);
if (_notify_listeners) {
//read_lock_type l(instance->listener_mutex);
for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) {
instance->listeners[i]->on_task_complete(tt_ptr);
}
}
// TODO - need to think about how to safely do this. The parent may finish before the child.
delete(tt_ptr);
}

void yield(profiler* the_profiler)
Expand Down Expand Up @@ -897,9 +916,9 @@ void yield(profiler* the_profiler)
instance->listeners[i]->on_yield(p);
}
}
//cout << thread_instance::get_id() << " Yield : " << the_profiler->task_id->get_name() << endl; fflush(stdout);
//cout << thread_instance::get_id() << " Yield : " << the_profiler->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout);
static std::string apex_process_profile_str("apex::process_profiles");
if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) {
if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) {
APEX_UTIL_REF_COUNT_APEX_INTERNAL_YIELD
} else {
APEX_UTIL_REF_COUNT_YIELD
Expand Down Expand Up @@ -942,9 +961,9 @@ void yield(task_wrapper * tt_ptr)
instance->listeners[i]->on_yield(p);
}
}
//cout << thread_instance::get_id() << " Yield : " << tt_ptr->prof->task_id->get_name() << endl; fflush(stdout);
//cout << thread_instance::get_id() << " Yield : " << tt_ptr->prof->tt_ptr->get_task_id()->get_name() << endl; fflush(stdout);
static std::string apex_process_profile_str("apex::process_profiles");
if (p->task_id->get_name(false).compare(apex_process_profile_str) == 0) {
if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) == 0) {
APEX_UTIL_REF_COUNT_APEX_INTERNAL_YIELD
} else {
APEX_UTIL_REF_COUNT_YIELD
Expand Down Expand Up @@ -1033,19 +1052,9 @@ task_wrapper * update_task(task_wrapper * wrapper, const std::string &timer_name
if (apex_options::disable() == true) { return nullptr; }
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) { return nullptr; }
if (wrapper == nullptr) {
apex* instance = apex::instance(); // get the Apex static instance
if (!instance || _exited) { return nullptr; } // protect against calls after finalization
task_identifier * id = task_identifier::get_task_id(timer_name);
wrapper = _new_task(id, UINTMAX_MAX, nullptr, instance);
} else {
wrapper->task_id = task_identifier::get_task_id(timer_name);
/* Oh, help us if a profile has started already.
* This could be a horrible idea. */
if (wrapper->prof != nullptr) {
wrapper->prof->task_id = wrapper->task_id;
}
}
assert(wrapper);
task_identifier * id = task_identifier::get_task_id(timer_name);
wrapper->aliases.insert(id);
return wrapper;
}

Expand Down
10 changes: 5 additions & 5 deletions src/apex/apex_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
#include <hpx/config.hpp>
#endif

#include <string>
#include <set>
#include <vector>
#include <stdint.h>
#include "apex_types.h"
#include "apex_options.hpp"
#include "apex_export.h"
Expand All @@ -26,6 +22,10 @@
#include "task_wrapper.hpp"
#include <functional>
#include <stdio.h>
#include <string>
#include <set>
#include <vector>
#include <stdint.h>

#endif /* DOXYGEN_SHOULD_SKIP_THIS */

Expand Down Expand Up @@ -166,7 +166,7 @@ APEX_EXPORT profiler * start(task_wrapper * task_wrapper_ptr);
\return No return value.
\sa @ref apex::start, @ref apex::yield, @ref apex::resume
*/
APEX_EXPORT void stop(profiler * the_profiler);
APEX_EXPORT void stop(profiler * the_profiler, bool cleanup=true);

/**
\brief Stop a timer.
Expand Down
4 changes: 2 additions & 2 deletions src/apex/concurrency_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ bool concurrency_handler::common_start(task_identifier *id) {
}

bool concurrency_handler::on_start(task_wrapper * tt_ptr) {
return common_start(tt_ptr->task_id);
return common_start(tt_ptr->get_task_id());
}

bool concurrency_handler::on_resume(task_wrapper * tt_ptr) {
return common_start(tt_ptr->task_id);
return common_start(tt_ptr->get_task_id());
}

void concurrency_handler::common_stop(std::shared_ptr<profiler> &p) {
Expand Down
3 changes: 1 addition & 2 deletions src/apex/concurrency_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@ class concurrency_handler : public handler, public event_listener {
void on_stop(std::shared_ptr<profiler> &p);
void on_yield(std::shared_ptr<profiler> &p);
bool on_resume(task_wrapper * tt_ptr);
void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) {
void on_task_complete(task_wrapper * tt_ptr) {
APEX_UNUSED(tt_ptr);
APEX_UNUSED(parent_ptr);
};
void on_sample_value(sample_value_event_data &data) { APEX_UNUSED(data); };
void on_periodic(periodic_event_data &data) { APEX_UNUSED(data); };
Expand Down
2 changes: 1 addition & 1 deletion src/apex/event_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ timer_event_data::timer_event_data(task_identifier * id) : task_id(id) {

/* this object never actually gets instantiated. too much overhead. */
timer_event_data::timer_event_data(std::shared_ptr<profiler> &the_profiler) : my_profiler(the_profiler) {
this->task_id = the_profiler->task_id;
this->task_id = the_profiler->tt_ptr->get_task_id();
}

timer_event_data::~timer_event_data() {
Expand Down
2 changes: 1 addition & 1 deletion src/apex/event_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class event_listener
virtual void on_stop(std::shared_ptr<profiler> &p) = 0;
virtual void on_yield(std::shared_ptr<profiler> &p) = 0;
virtual bool on_resume(task_wrapper * tt_ptr) = 0;
virtual void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) = 0;
virtual void on_task_complete(task_wrapper * tt_ptr) = 0;
virtual void on_sample_value(sample_value_event_data &data) = 0;
virtual void on_periodic(periodic_event_data &data) = 0;
virtual void on_custom_event(custom_event_data &data) = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/apex/otf2_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ namespace apex {
}

bool otf2_listener::on_start(task_wrapper * tt_ptr) {
task_identifier * id = tt_ptr->task_id;
task_identifier * id = tt_ptr->get_task_id();
// don't close the archive on us!
read_lock_type lock(_archive_mutex);
// not likely, but just in case...
Expand Down
3 changes: 1 addition & 2 deletions src/apex/otf2_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,8 @@ namespace apex {
void on_yield(std::shared_ptr<profiler> &p);
bool on_resume(task_wrapper * tt_ptr);
void on_sample_value(sample_value_event_data &data);
void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) {
void on_task_complete(task_wrapper * tt_ptr) {
APEX_UNUSED(tt_ptr);
APEX_UNUSED(parent_ptr);
};
void on_periodic(periodic_event_data &data)
{ APEX_UNUSED(data); };
Expand Down
8 changes: 4 additions & 4 deletions src/apex/policy_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,21 +443,21 @@ namespace apex {
}

bool policy_handler::on_start(task_wrapper * tt_ptr) {
call_policies(start_event_policies, (void *)tt_ptr->task_id, APEX_START_EVENT);
call_policies(start_event_policies, (void *)tt_ptr->get_task_id(), APEX_START_EVENT);
return true;
}

bool policy_handler::on_resume(task_wrapper * tt_ptr) {
call_policies(resume_event_policies, (void *)tt_ptr->task_id, APEX_RESUME_EVENT);
call_policies(resume_event_policies, (void *)tt_ptr->get_task_id(), APEX_RESUME_EVENT);
return true;
}

void policy_handler::on_stop(std::shared_ptr<profiler> &p) {
call_policies(stop_event_policies, (void *)p->task_id, APEX_STOP_EVENT);
call_policies(stop_event_policies, (void *)p->tt_ptr->get_task_id(), APEX_STOP_EVENT);
}

void policy_handler::on_yield(std::shared_ptr<profiler> &p) {
call_policies(yield_event_policies, (void *)p->task_id, APEX_YIELD_EVENT);
call_policies(yield_event_policies, (void *)p->tt_ptr->get_task_id(), APEX_YIELD_EVENT);
}

void policy_handler::on_sample_value(sample_value_event_data &data) {
Expand Down
3 changes: 1 addition & 2 deletions src/apex/policy_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ class policy_handler : public handler, public event_listener
void on_stop(std::shared_ptr<profiler> &p);
void on_yield(std::shared_ptr<profiler> &p);
bool on_resume(task_wrapper * tt_ptr);
void on_new_task(task_wrapper * tt_ptr, task_wrapper * parent_ptr) {
void on_task_complete(task_wrapper * tt_ptr) {
APEX_UNUSED(tt_ptr);
APEX_UNUSED(parent_ptr);
};
void on_sample_value(sample_value_event_data &data);
void on_custom_event(custom_event_data &data);
Expand Down
Loading

0 comments on commit 498dfeb

Please sign in to comment.