
Commit

Fixing deadlock in policy shutdown, and segfault when profiles aren't processed before exit
khuck committed Mar 25, 2020
1 parent 7b2f200 commit d037ea4
Showing 2 changed files with 29 additions and 10 deletions.
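
For context on the deadlock fix: every policy callback used to acquire shutdown_mutex before testing the apex_final flag, so a callback that fired while the finalizing thread held that mutex would block forever. The commit reorders the two steps so callbacks bail out during shutdown without ever touching the lock. Below is a minimal, self-contained sketch of the hazard and the fix; the names mirror the APEX code, but the scaffolding around them is invented for illustration.

```cpp
#include <mutex>

// Illustrative stand-ins for APEX's globals, not the real declarations.
std::mutex shutdown_mutex;
bool apex_final = false;   // set to true by the finalizing thread

// BEFORE: lock first, then check. If the finalizer already holds
// shutdown_mutex when this callback fires, the callback blocks forever.
int policy_before() {
    std::unique_lock<std::mutex> l{shutdown_mutex};
    if (apex_final) { return 0; }
    // ... policy work ...
    return 0;
}

// AFTER: check first, then lock. A callback arriving during shutdown
// returns immediately and never contends for the mutex.
int policy_after() {
    if (apex_final) { return 0; }   // unlocked fast-path
    std::unique_lock<std::mutex> l{shutdown_mutex};
    // ... policy work ...
    return 0;
}
```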
20 changes: 10 additions & 10 deletions src/apex/apex_policies.cpp
@@ -98,17 +98,17 @@ bool apex_final = false; // When do we stop?
 apex_tuning_session * thread_cap_tuning_session = nullptr;
 
 inline int __get_thread_cap(void) {
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) { return 1; } // we terminated, RCR has shut down.
     if (apex::apex_options::disable() == true) { return 1; }
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     return (int)(thread_cap_tuning_session->thread_cap);
     //return (int)*(tuning_session->__ah_inputs[0]);
 }
 
 inline void __set_thread_cap(int new_cap) {
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) { return; } // we terminated, RCR has shut down.
     if (apex::apex_options::disable() == true) { return; }
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     thread_cap_tuning_session->thread_cap = (long int)new_cap;
     return;
 }
@@ -194,10 +194,10 @@ inline void __increase_cap() {
 inline int apex_power_throttling_policy(apex_context const context)
 {
     APEX_UNUSED(context);
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) return APEX_NOERROR; // we terminated, RCR has shut down.
     //if (apex::apex::instance()->get_node_id() == 0) return APEX_NOERROR;
     // read energy counter and memory concurrency to determine system status
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     double power = apex::current_power_high();
     thread_cap_tuning_session->moving_average =
         ((thread_cap_tuning_session->moving_average *
@@ -275,8 +275,8 @@ inline int apex_power_throttling_policy(apex_context const context)
 
 int apex_throughput_throttling_policy(apex_context const context) {
     APEX_UNUSED(context);
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) return APEX_NOERROR; // we terminated, RCR has shut down.
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     // Do we have a function of interest?
     // No: do nothing, return.
     // Yes: Get its profile, continue.
@@ -463,8 +463,8 @@ int apex_throughput_throttling_policy(apex_context const context) {
 /* Discrete Space Hill Climbing Algorithm */
 int apex_throughput_throttling_dhc_policy(apex_context const context) {
     APEX_UNUSED(context);
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) return APEX_NOERROR; // we terminated, RCR has shut down.
+    std::unique_lock<std::mutex> l{shutdown_mutex};
 
 #ifdef APEX_DEBUG_THROTTLE
     printf("Throttling on name: %s\n",
@@ -636,7 +636,6 @@ int apex_throughput_throttling_dhc_policy(apex_context const context) {
 #ifdef APEX_HAVE_ACTIVEHARMONY
 int apex_throughput_throttling_ah_policy(apex_context const context) {
     APEX_UNUSED(context);
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if(apex_final) {
         // Already finished.
         return APEX_NOERROR;
@@ -645,6 +644,7 @@ int apex_throughput_throttling_ah_policy(apex_context const context) {
         // Tuning session wasn't initialized?
         return APEX_ERROR;
     }
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     static double previous_value = 0.0; // instead of resetting.
     static bool _converged_message = false;
     if (ah_converged(thread_cap_tuning_session->htask)) {
@@ -734,8 +734,8 @@ int apex_throughput_throttling_ah_policy(apex_context const context) {
 int apex_throughput_tuning_policy(apex_context const context) {
     // do something.
     APEX_UNUSED(context);
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) return APEX_NOERROR; // we terminated, RCR has shut down.
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     static double previous_value = 0.0; // instead of resetting.
     static bool _converged_message = false;
     if (ah_converged(thread_cap_tuning_session->htask)) {
@@ -807,22 +807,22 @@ int apex_throughput_tuning_policy(apex_context const context) {
 int apex_custom_tuning_policy(shared_ptr<apex_tuning_session> tuning_session,
     apex_context const context) {
     APEX_UNUSED(context);
-    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (apex_final) return APEX_NOERROR; // we terminated, RCR has shut down.
+    std::unique_lock<std::mutex> l{shutdown_mutex};
     if (ah_converged(tuning_session->htask)) {
         if (!tuning_session->converged_message) {
             tuning_session->converged_message = true;
             cout << "Tuning has converged for session " << tuning_session->id
                 << "." << endl;
 
             if (ah_best(tuning_session->htask) < 0) {
                 cerr << "Error retrieving best tuning point." << endl;
                 return APEX_ERROR;
             }
         }
         return APEX_NOERROR;
     }
 
     // get a measurement of our current setting
     double new_value = tuning_session->metric_of_interest();
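One side effect of the reordering above is that apex_final is now read before the lock is taken, i.e. without synchronization. As a one-way shutdown latch that is harmless in practice, but if a well-defined unlocked read were ever wanted, std::atomic<bool> expresses it directly. A sketch of that variant (hypothetical, not part of this commit; APEX declares apex_final as a plain bool):

```cpp
#include <atomic>
#include <mutex>

// Hypothetical atomic variant of the shutdown latch.
std::atomic<bool> apex_final{false};
std::mutex shutdown_mutex;
long thread_cap = 8;   // placeholder for the tuning session's field

int get_thread_cap_sketch() {
    // Unlocked fast-path: once shutdown begins, never block on the
    // mutex, which the finalizing thread may already hold.
    if (apex_final.load(std::memory_order_acquire)) { return 1; }
    std::unique_lock<std::mutex> l{shutdown_mutex};
    return static_cast<int>(thread_cap);
}
```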
19 changes: 19 additions & 0 deletions src/apex/profiler_listener.cpp
@@ -610,6 +610,18 @@ std::unordered_set<profile*> free_profiles;
     int num_worker_threads = thread_instance::get_num_threads();
     task_identifier main_id(APEX_MAIN);
     profile * total_time = get_profile(main_id);
+    /* The profiles haven't been processed yet. */
+    while (total_time == nullptr) {
+#ifdef APEX_HAVE_HPX
+        // schedule an HPX action
+        apex_schedule_process_profiles();
+#else
+        queue_signal.post();
+#endif
+        // wait for profiles to update
+        usleep(100);
+        total_time = get_profile(main_id);
+    }
     double wall_clock_main = total_time->get_accumulated() *
         profiler::get_cpu_mhz();
 #ifdef APEX_HAVE_HPX
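
The loop above is what fixes the segfault: instead of dereferencing a null total_time, the finalizing thread repeatedly pokes the consumer thread (an HPX action or a semaphore post), naps for 100 microseconds, and re-checks until the APEX_MAIN profile exists. Distilled to a standalone handshake, under the assumption of a C++20 toolchain and with invented stand-ins for queue_signal and the profile lookup:

```cpp
#include <atomic>
#include <chrono>
#include <semaphore>   // C++20
#include <thread>

std::counting_semaphore<> queue_signal{0};       // stand-in for APEX's signal
std::atomic<bool> main_profile_ready{false};     // stand-in for get_profile(main_id)

void consumer_thread() {
    queue_signal.acquire();                      // sleep until poked
    // ... drain queued profiler events, build the profiles ...
    main_profile_ready.store(true, std::memory_order_release);
}

void finalize() {
    // Same shape as the committed loop: signal, nap briefly, re-check.
    while (!main_profile_ready.load(std::memory_order_acquire)) {
        queue_signal.release();                  // the "queue_signal.post()"
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }
    // The main profile is now safe to read.
}

int main() {
    std::thread consumer(consumer_thread);
    finalize();
    consumer.join();
}
```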
@@ -1300,6 +1312,13 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
 void profiler_listener::on_dump(dump_event_data &data) {
     if (_done) { return; }
 
+    // trigger statistics updating
+#ifdef APEX_HAVE_HPX
+    // schedule an HPX action
+    apex_schedule_process_profiles();
+#else
+    queue_signal.post();
+#endif
     // wait until any other threads are done processing dependencies
     while(consumer_task_running.test_and_set(memory_order_acq_rel)) { }
 
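The pre-existing spin that follows the new signalling code is a bare std::atomic_flag spinlock: test_and_set returns the previous value, so looping while it returns true waits until the current owner clears the flag. A minimal standalone rendering of that idiom (illustrative only, not APEX code):

```cpp
#include <atomic>

std::atomic_flag consumer_task_running = ATOMIC_FLAG_INIT;

void lock_consumer() {
    // Spin until the flag was previously clear; getting false back from
    // test_and_set means this thread just took ownership.
    while (consumer_task_running.test_and_set(std::memory_order_acq_rel)) { }
}

void unlock_consumer() {
    consumer_task_running.clear(std::memory_order_release);
}
```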