From e335d24ded6b5f497a1c814b6658870f8a2d8b61 Mon Sep 17 00:00:00 2001 From: Kevin Huck Date: Mon, 9 Aug 2021 14:12:25 -0700 Subject: [PATCH] Don't use shared pointers when doing synchronous processing --- src/apex/profiler_listener.cpp | 145 ++++++++++++++++++++------------- src/apex/profiler_listener.hpp | 3 +- 2 files changed, 90 insertions(+), 58 deletions(-) diff --git a/src/apex/profiler_listener.cpp b/src/apex/profiler_listener.cpp index e57b6b03..0595f38b 100644 --- a/src/apex/profiler_listener.cpp +++ b/src/apex/profiler_listener.cpp @@ -257,10 +257,15 @@ std::unordered_set free_profiles; unsigned int profiler_listener::process_profile( std::shared_ptr &p, unsigned int tid) { - APEX_UNUSED(tid); if(p == nullptr) return 0; + return process_profile(*p,tid); + } + + unsigned int profiler_listener::process_profile(profiler& p, unsigned int tid) + { + APEX_UNUSED(tid); profile * theprofile; - if(p->is_reset == reset_type::ALL) { + if(p.is_reset == reset_type::ALL) { reset_all(); return 0; } @@ -269,8 +274,8 @@ std::unordered_set free_profiles; #if APEX_HAVE_PAPI tmp_num_counters = num_papi_counters; for (int i = 0 ; i < num_papi_counters ; i++) { - if (p->papi_stop_values[i] > p->papi_start_values[i]) { - values[i] = p->papi_stop_values[i] - p->papi_start_values[i]; + if (p.papi_stop_values[i] > p.papi_start_values[i]) { + values[i] = p.papi_stop_values[i] - p.papi_start_values[i]; } else { values[i] = 0.0; } @@ -278,21 +283,21 @@ std::unordered_set free_profiles; #endif std::unique_lock task_map_lock(_task_map_mutex); unordered_map::const_iterator it = - task_map.find(*(p->get_task_id())); + task_map.find(*(p.get_task_id())); if (it != task_map.end()) { // A profile for this ID already exists. 
theprofile = (*it).second; task_map_lock.unlock(); - if(p->is_reset == reset_type::CURRENT) { + if(p.is_reset == reset_type::CURRENT) { theprofile->reset(); } else { if (apex_options::track_memory()) { - theprofile->increment(p->elapsed(), tmp_num_counters, - values, p->allocations, p->frees, p->bytes_allocated, - p->bytes_freed, p->is_resume); + theprofile->increment(p.elapsed(), tmp_num_counters, + values, p.allocations, p.frees, p.bytes_allocated, + p.bytes_freed, p.is_resume); } else { - theprofile->increment(p->elapsed(), tmp_num_counters, - values, p->is_resume); + theprofile->increment(p.elapsed(), tmp_num_counters, + values, p.is_resume); } } #if defined(APEX_THROTTLE) @@ -304,22 +309,22 @@ std::unordered_set free_profiles; unordered_set::const_iterator it2; { read_lock_type l(throttled_event_set_mutex); - it2 = throttled_tasks.find(*(p->get_task_id())); + it2 = throttled_tasks.find(*(p.get_task_id())); } if (it2 == throttled_tasks.end()) { // lock the set for insert { write_lock_type l(throttled_event_set_mutex); // was it inserted when we were waiting? - it2 = throttled_tasks.find(*(p->get_task_id())); + it2 = throttled_tasks.find(*(p.get_task_id())); // no? OK - insert it. if (it2 == throttled_tasks.end()) { - throttled_tasks.insert(*(p->get_task_id())); + throttled_tasks.insert(*(p.get_task_id())); } } if (apex_options::use_verbose()) { cout << "APEX: disabling lightweight timer " - << p->get_task_id()->get_name() + << p.get_task_id()->get_name() << endl; fflush(stdout); } @@ -329,33 +334,33 @@ std::unordered_set free_profiles; #endif } else { // Create a new profile for this name. - if (apex_options::track_memory() && !p->is_counter) { - theprofile = new profile(p->is_reset == - reset_type::CURRENT ? 
0.0 : p->elapsed(), - tmp_num_counters, values, p->is_resume, - p->allocations, p->frees, p->bytes_allocated, - p->bytes_freed); - task_map[*(p->get_task_id())] = theprofile; + if (apex_options::track_memory() && !p.is_counter) { + theprofile = new profile(p.is_reset == + reset_type::CURRENT ? 0.0 : p.elapsed(), + tmp_num_counters, values, p.is_resume, + p.allocations, p.frees, p.bytes_allocated, + p.bytes_freed); + task_map[*(p.get_task_id())] = theprofile; } else { - theprofile = new profile(p->is_reset == - reset_type::CURRENT ? 0.0 : p->elapsed(), - tmp_num_counters, values, p->is_resume, - p->is_counter ? APEX_COUNTER : APEX_TIMER); - task_map[*(p->get_task_id())] = theprofile; + theprofile = new profile(p.is_reset == + reset_type::CURRENT ? 0.0 : p.elapsed(), + tmp_num_counters, values, p.is_resume, + p.is_counter ? APEX_COUNTER : APEX_TIMER); + task_map[*(p.get_task_id())] = theprofile; } task_map_lock.unlock(); #ifdef APEX_HAVE_HPX #ifdef APEX_REGISTER_HPX3_COUNTERS if(!_done) { if(get_hpx_runtime_ptr() != nullptr && - p->get_task_id()->has_name()) { - std::string timer_name(p->get_task_id()->get_name()); + p.get_task_id()->has_name()) { + std::string timer_name(p.get_task_id()->get_name()); //Don't register timers containing "/" if(timer_name.find("/") == std::string::npos) { hpx::performance_counters::install_counter_type( std::string("/apex/") + timer_name, [p](bool r)->std::int64_t{ - std::int64_t value(p->elapsed()); + std::int64_t value(p.elapsed()); return value; }, std::string("APEX counter ") + timer_name, @@ -371,16 +376,16 @@ std::unordered_set free_profiles; } /* write the sample to the file */ if (apex_options::task_scatterplot()) { - if (!p->is_counter) { + if (!p.is_counter) { static int thresh = std::round((double)(RAND_MAX) * apex_options::scatterplot_fraction()); if (std::rand() < thresh) { - /* before calling p->get_task_id()->get_name(), make sure we create + /* before calling p.get_task_id()->get_name(), make sure we create * a 
thread_instance object that is NOT a worker. */ thread_instance::instance(false); std::unique_lock task_map_lock(_mtx); - task_scatterplot_samples << p->normalized_timestamp() << " " - << p->elapsed() << " " - << "'" << p->get_task_id()->get_name() << "'" << endl; + task_scatterplot_samples << p.normalized_timestamp() << " " + << p.elapsed() << " " + << "'" << p.get_task_id()->get_name() << "'" << endl; int loc0 = task_scatterplot_samples.tellp(); if (loc0 > 32768) { task_scatterplot_sample_file() << task_scatterplot_samples.rdbuf(); @@ -391,9 +396,9 @@ std::unordered_set free_profiles; } else { thread_instance::instance(false); std::unique_lock task_map_lock(_mtx); - counter_scatterplot_samples << p->normalized_timestamp() << " " - << p->elapsed() << " " - << "'" << p->get_task_id()->get_name() << "'" << endl; + counter_scatterplot_samples << p.normalized_timestamp() << " " + << p.elapsed() << " " + << "'" << p.get_task_id()->get_name() << "'" << endl; int loc0 = task_scatterplot_samples.tellp(); if (loc0 > 32768) { counter_scatterplot_sample_file() << counter_scatterplot_samples.rdbuf(); @@ -402,8 +407,8 @@ std::unordered_set free_profiles; } } } - if (apex_options::use_tasktree_output() && !p->is_counter && p->tt_ptr != nullptr) { - p->tt_ptr->tree_node->addAccumulated(p->elapsed_seconds(), p->is_resume); + if (apex_options::use_tasktree_output() && !p.is_counter && p.tt_ptr != nullptr) { + p.tt_ptr->tree_node->addAccumulated(p.elapsed_seconds(), p.is_resume); } return 1; } @@ -651,7 +656,7 @@ std::unordered_set free_profiles; #else queue_signal.post(); #endif -#endif +#endif // APEX_SYNCHRONOUS_PROCESSING // wait for profiles to update std::this_thread::sleep_for(std::chrono::microseconds(100)); total_time = get_profile(main_id); @@ -1392,7 +1397,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl // stop the main timer, and process that profile? 
yield_main_timer(); - push_profiler((unsigned int)thread_instance::get_id(), main_timer); + push_profiler((unsigned int)thread_instance::get_id(), *main_timer); // restart the main timer resume_main_timer(); @@ -1407,10 +1412,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl // synchronous_flush = false; #else queue_signal.post(); -#endif -#endif // wait until any other threads are done processing dependencies while(consumer_task_running.test_and_set(memory_order_acq_rel)) { } +#endif +#endif // APEX_SYNCHRONOUS_PROCESSING // output to screen? if ((apex_options::use_screen_output() && node_id == 0) || @@ -1483,8 +1488,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl if (data.reset) { reset_all(); } +#ifndef APEX_SYNCHRONOUS_PROCESSING // on_dump() releasing the "task_running" flag consumer_task_running.clear(memory_order_release); +#endif } void profiler_listener::on_reset(task_identifier * id) { @@ -1605,22 +1612,26 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl return true; } - inline void profiler_listener::push_profiler(int my_tid, - std::shared_ptr &p) { + inline void profiler_listener::push_profiler(int my_tid, profiler& p) { APEX_UNUSED(my_tid); // if we aren't processing profiler objects, just return. if (!apex_options::process_async_state()) { return; } #ifdef APEX_TRACE_APEX - if (p->get_task_id()->name == "apex::process_profiles") { return; } + if (p.get_task_id()->name == "apex::process_profiles_sync") { return; } #endif - -#ifdef APEX_SYNCHRONOUS_PROCESSING process_profile(p,0); - /* Now that we synchronously process, return! */ return; -#else - thequeue()->enqueue(p); + } + inline void profiler_listener::push_profiler(int my_tid, + std::shared_ptr &p) { + APEX_UNUSED(my_tid); + // if we aren't processing profiler objects, just return. 
+ if (!apex_options::process_async_state()) { return; } +#ifdef APEX_TRACE_APEX + if (p->get_task_id()->name == "apex::process_profiles_async") { return; } +#endif + thequeue()->enqueue(p); #ifndef APEX_HAVE_HPX // Check to see if the consumer is already running, to avoid calling // "post" too frequently - it is rather costly. @@ -1634,8 +1645,6 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl apex_schedule_process_profiles(); } #endif - -#endif //APEX_SYNCHRONOUS_PROCESSING } /* Stop the timer, if applicable, and queue the profiler object */ @@ -1695,10 +1704,17 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl /* When a sample value is processed, save it as a profiler object, and queue it. */ void profiler_listener::on_sample_value(sample_value_event_data &data) { if (!_done) { + // don't make a shared pointer if not necessary! +#ifdef APEX_SYNCHRONOUS_PROCESSING + profiler p(task_identifier::get_task_id( + *data.counter_name), data.counter_value); + p.is_counter = data.is_counter; +#else // APEX_SYNCHRONOUS_PROCESSING std::shared_ptr p = std::make_shared(task_identifier::get_task_id( *data.counter_name), data.counter_value); p->is_counter = data.is_counter; +#endif // APEX_SYNCHRONOUS_PROCESSING push_profiler(my_tid, p); } } @@ -1722,8 +1738,13 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl /* Communication send event. Save the number of bytes. */ void profiler_listener::on_send(message_event_data &data) { if (!_done) { + // don't make a shared pointer if not necessary! +#ifdef APEX_SYNCHRONOUS_PROCESSING + profiler p(task_identifier::get_task_id("Bytes Sent"), (double)data.size); +#else // APEX_SYNCHRONOUS_PROCESSING std::shared_ptr p = std::make_shared( task_identifier::get_task_id("Bytes Sent"), (double)data.size); +#endif // APEX_SYNCHRONOUS_PROCESSING push_profiler(0, p); } } @@ -1731,8 +1752,13 @@ if (rc != 0) cout << "PAPI error! 
" << name << ": " << PAPI_strerror(rc) << endl /* Communication recv event. Save the number of bytes. */ void profiler_listener::on_recv(message_event_data &data) { if (!_done) { + // don't make a shared pointer if not necessary! +#ifdef APEX_SYNCHRONOUS_PROCESSING + profiler p(task_identifier::get_task_id("Bytes Received"), (double)data.size); +#else // APEX_SYNCHRONOUS_PROCESSING std::shared_ptr p = std::make_shared( task_identifier::get_task_id("Bytes Received"), (double)data.size); +#endif // APEX_SYNCHRONOUS_PROCESSING push_profiler(0, p); } } @@ -1752,8 +1778,13 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl } void profiler_listener::reset(task_identifier * id) { - std::shared_ptr p; - p = std::make_shared(id, false, reset_type::CURRENT); + // don't make a shared pointer if not necessary! +#ifdef APEX_SYNCHRONOUS_PROCESSING + profiler p(id, false, reset_type::CURRENT); +#else // APEX_SYNCHRONOUS_PROCESSING + std::shared_ptr p = + std::make_shared(id, false, reset_type::CURRENT); +#endif // APEX_SYNCHRONOUS_PROCESSING push_profiler(my_tid, p); } @@ -1767,7 +1798,7 @@ if (rc != 0) cout << "PAPI error! 
" << name << ": " << PAPI_strerror(rc) << endl delete consumer_thread; #endif #endif -#endif +#endif // APEX_SYNCHRONOUS_PROCESSING std::unique_lock queue_lock(queue_mtx); while (allqueues.size() > 0) { auto tmp = allqueues.back(); diff --git a/src/apex/profiler_listener.hpp b/src/apex/profiler_listener.hpp index 6fe063ce..1bcd1ceb 100644 --- a/src/apex/profiler_listener.hpp +++ b/src/apex/profiler_listener.hpp @@ -108,7 +108,7 @@ class profiler_listener : public event_listener { void schedule_process_profiles(void); #endif unsigned int process_profile(std::shared_ptr &p, unsigned int tid); - unsigned int process_profile(profiler* p, unsigned int tid); + unsigned int process_profile(profiler& p, unsigned int tid); unsigned int process_dependency(task_dependency* td); int node_id; std::mutex _mtx; @@ -117,6 +117,7 @@ class profiler_listener : public event_listener { void _common_stop(std::shared_ptr &p, bool is_yield); // internal, inline function void push_profiler(int my_tid, std::shared_ptr &p); + void push_profiler(int my_tid, profiler &p); std::unordered_map task_map; std::mutex _task_map_mutex; std::unordered_map