Don't use shared pointers when doing synchronous processing
khuck committed Aug 9, 2021
1 parent 11d281c commit e335d24
Showing 2 changed files with 90 additions and 58 deletions.
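The idea behind the change: the commit adds process_profile(profiler&, ...) and push_profiler(int, profiler&) overloads, and under APEX_SYNCHRONOUS_PROCESSING the event handlers build the profiler object on the stack instead of in a std::shared_ptr. When the record is consumed immediately on the caller's stack, passing it by reference avoids the heap allocation and atomic reference counting that std::make_shared incurs; shared ownership is only needed on the asynchronous path, where the record is handed to a consumer queue and must outlive the caller. A minimal standalone sketch of that pattern follows (the type, function, and macro names are invented for illustration, not APEX's; APEX's actual switch is APEX_SYNCHRONOUS_PROCESSING):

// Minimal sketch of the pattern (not APEX source; names are illustrative).
// Compile with -DSYNCHRONOUS_PROCESSING for the stack-allocated path, or
// without it for the shared_ptr/queue path.
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <utility>

struct event_record {
    std::string name;
    double value;
};

// Synchronous path: the record lives on the caller's stack and is consumed
// before the call returns, so a reference is enough.
void process_record(const event_record& r) {
    std::cout << "processed " << r.name << " = " << r.value << '\n';
}

// Asynchronous path: the record must outlive the caller, so shared ownership
// (and its allocation/refcount cost) is justified.
std::queue<std::shared_ptr<event_record>> pending;
void queue_record(std::shared_ptr<event_record> r) {
    pending.push(std::move(r));
}

void on_sample(const std::string& name, double value) {
#ifdef SYNCHRONOUS_PROCESSING
    event_record r{name, value};    // no heap allocation, no refcounting
    process_record(r);
#else
    auto r = std::make_shared<event_record>(event_record{name, value});
    queue_record(std::move(r));     // handed off; processed later
#endif
}

int main() {
    on_sample("Bytes Sent", 1024.0);
#ifndef SYNCHRONOUS_PROCESSING
    // In APEX a consumer thread drains the queue; here it is drained inline.
    while (!pending.empty()) {
        process_record(*pending.front());
        pending.pop();
    }
#endif
    return 0;
}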
145 changes: 88 additions & 57 deletions src/apex/profiler_listener.cpp
@@ -257,10 +257,15 @@ std::unordered_set<profile*> free_profiles;
unsigned int profiler_listener::process_profile(
std::shared_ptr<profiler> &p, unsigned int tid)
{
- APEX_UNUSED(tid);
+ if(p == nullptr) return 0;
+ return process_profile(*p,tid);
+ }
+
+ unsigned int profiler_listener::process_profile(profiler& p, unsigned int tid)
+ {
+ APEX_UNUSED(tid);
profile * theprofile;
- if(p->is_reset == reset_type::ALL) {
+ if(p.is_reset == reset_type::ALL) {
reset_all();
return 0;
}
@@ -269,30 +274,30 @@ std::unordered_set<profile*> free_profiles;
#if APEX_HAVE_PAPI
tmp_num_counters = num_papi_counters;
for (int i = 0 ; i < num_papi_counters ; i++) {
- if (p->papi_stop_values[i] > p->papi_start_values[i]) {
- values[i] = p->papi_stop_values[i] - p->papi_start_values[i];
+ if (p.papi_stop_values[i] > p.papi_start_values[i]) {
+ values[i] = p.papi_stop_values[i] - p.papi_start_values[i];
} else {
values[i] = 0.0;
}
}
#endif
std::unique_lock<std::mutex> task_map_lock(_task_map_mutex);
unordered_map<task_identifier, profile*>::const_iterator it =
- task_map.find(*(p->get_task_id()));
+ task_map.find(*(p.get_task_id()));
if (it != task_map.end()) {
// A profile for this ID already exists.
theprofile = (*it).second;
task_map_lock.unlock();
- if(p->is_reset == reset_type::CURRENT) {
+ if(p.is_reset == reset_type::CURRENT) {
theprofile->reset();
} else {
if (apex_options::track_memory()) {
- theprofile->increment(p->elapsed(), tmp_num_counters,
- values, p->allocations, p->frees, p->bytes_allocated,
- p->bytes_freed, p->is_resume);
+ theprofile->increment(p.elapsed(), tmp_num_counters,
+ values, p.allocations, p.frees, p.bytes_allocated,
+ p.bytes_freed, p.is_resume);
} else {
- theprofile->increment(p->elapsed(), tmp_num_counters,
- values, p->is_resume);
+ theprofile->increment(p.elapsed(), tmp_num_counters,
+ values, p.is_resume);
}
}
#if defined(APEX_THROTTLE)
@@ -304,22 +309,22 @@ std::unordered_set<profile*> free_profiles;
unordered_set<task_identifier>::const_iterator it2;
{
read_lock_type l(throttled_event_set_mutex);
- it2 = throttled_tasks.find(*(p->get_task_id()));
+ it2 = throttled_tasks.find(*(p.get_task_id()));
}
if (it2 == throttled_tasks.end()) {
// lock the set for insert
{
write_lock_type l(throttled_event_set_mutex);
// was it inserted when we were waiting?
- it2 = throttled_tasks.find(*(p->get_task_id()));
+ it2 = throttled_tasks.find(*(p.get_task_id()));
// no? OK - insert it.
if (it2 == throttled_tasks.end()) {
- throttled_tasks.insert(*(p->get_task_id()));
+ throttled_tasks.insert(*(p.get_task_id()));
}
}
if (apex_options::use_verbose()) {
cout << "APEX: disabling lightweight timer "
- << p->get_task_id()->get_name()
+ << p.get_task_id()->get_name()
<< endl;
fflush(stdout);
}
@@ -329,33 +334,33 @@ std::unordered_set<profile*> free_profiles;
#endif
} else {
// Create a new profile for this name.
- if (apex_options::track_memory() && !p->is_counter) {
- theprofile = new profile(p->is_reset ==
- reset_type::CURRENT ? 0.0 : p->elapsed(),
- tmp_num_counters, values, p->is_resume,
- p->allocations, p->frees, p->bytes_allocated,
- p->bytes_freed);
- task_map[*(p->get_task_id())] = theprofile;
+ if (apex_options::track_memory() && !p.is_counter) {
+ theprofile = new profile(p.is_reset ==
+ reset_type::CURRENT ? 0.0 : p.elapsed(),
+ tmp_num_counters, values, p.is_resume,
+ p.allocations, p.frees, p.bytes_allocated,
+ p.bytes_freed);
+ task_map[*(p.get_task_id())] = theprofile;
} else {
- theprofile = new profile(p->is_reset ==
- reset_type::CURRENT ? 0.0 : p->elapsed(),
- tmp_num_counters, values, p->is_resume,
- p->is_counter ? APEX_COUNTER : APEX_TIMER);
- task_map[*(p->get_task_id())] = theprofile;
+ theprofile = new profile(p.is_reset ==
+ reset_type::CURRENT ? 0.0 : p.elapsed(),
+ tmp_num_counters, values, p.is_resume,
+ p.is_counter ? APEX_COUNTER : APEX_TIMER);
+ task_map[*(p.get_task_id())] = theprofile;
}
task_map_lock.unlock();
#ifdef APEX_HAVE_HPX
#ifdef APEX_REGISTER_HPX3_COUNTERS
if(!_done) {
if(get_hpx_runtime_ptr() != nullptr &&
- p->get_task_id()->has_name()) {
- std::string timer_name(p->get_task_id()->get_name());
+ p.get_task_id()->has_name()) {
+ std::string timer_name(p.get_task_id()->get_name());
//Don't register timers containing "/"
if(timer_name.find("/") == std::string::npos) {
hpx::performance_counters::install_counter_type(
std::string("/apex/") + timer_name,
[p](bool r)->std::int64_t{
- std::int64_t value(p->elapsed());
+ std::int64_t value(p.elapsed());
return value;
},
std::string("APEX counter ") + timer_name,
@@ -371,16 +376,16 @@ std::unordered_set<profile*> free_profiles;
}
/* write the sample to the file */
if (apex_options::task_scatterplot()) {
- if (!p->is_counter) {
+ if (!p.is_counter) {
static int thresh = std::round((double)(RAND_MAX) * apex_options::scatterplot_fraction());
if (std::rand() < thresh) {
- /* before calling p->get_task_id()->get_name(), make sure we create
+ /* before calling p.get_task_id()->get_name(), make sure we create
* a thread_instance object that is NOT a worker. */
thread_instance::instance(false);
std::unique_lock<std::mutex> task_map_lock(_mtx);
- task_scatterplot_samples << p->normalized_timestamp() << " "
- << p->elapsed() << " "
- << "'" << p->get_task_id()->get_name() << "'" << endl;
+ task_scatterplot_samples << p.normalized_timestamp() << " "
+ << p.elapsed() << " "
+ << "'" << p.get_task_id()->get_name() << "'" << endl;
int loc0 = task_scatterplot_samples.tellp();
if (loc0 > 32768) {
task_scatterplot_sample_file() << task_scatterplot_samples.rdbuf();
@@ -391,9 +396,9 @@ std::unordered_set<profile*> free_profiles;
} else {
thread_instance::instance(false);
std::unique_lock<std::mutex> task_map_lock(_mtx);
- counter_scatterplot_samples << p->normalized_timestamp() << " "
- << p->elapsed() << " "
- << "'" << p->get_task_id()->get_name() << "'" << endl;
+ counter_scatterplot_samples << p.normalized_timestamp() << " "
+ << p.elapsed() << " "
+ << "'" << p.get_task_id()->get_name() << "'" << endl;
int loc0 = task_scatterplot_samples.tellp();
if (loc0 > 32768) {
counter_scatterplot_sample_file() << counter_scatterplot_samples.rdbuf();
@@ -402,8 +407,8 @@ }
}
}
}
- if (apex_options::use_tasktree_output() && !p->is_counter && p->tt_ptr != nullptr) {
- p->tt_ptr->tree_node->addAccumulated(p->elapsed_seconds(), p->is_resume);
+ if (apex_options::use_tasktree_output() && !p.is_counter && p.tt_ptr != nullptr) {
+ p.tt_ptr->tree_node->addAccumulated(p.elapsed_seconds(), p.is_resume);
}
return 1;
}
@@ -651,7 +656,7 @@ std::unordered_set<profile*> free_profiles;
#else
queue_signal.post();
#endif
- #endif
+ #endif // APEX_SYNCHRONOUS_PROCESSING
// wait for profiles to update
std::this_thread::sleep_for(std::chrono::microseconds(100));
total_time = get_profile(main_id);
@@ -1392,7 +1397,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl

// stop the main timer, and process that profile?
yield_main_timer();
- push_profiler((unsigned int)thread_instance::get_id(), main_timer);
+ push_profiler((unsigned int)thread_instance::get_id(), *main_timer);
// restart the main timer
resume_main_timer();

@@ -1407,10 +1412,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
// synchronous_flush = false;
#else
queue_signal.post();
#endif
#endif
// wait until any other threads are done processing dependencies
while(consumer_task_running.test_and_set(memory_order_acq_rel)) { }
- #endif
+ #endif // APEX_SYNCHRONOUS_PROCESSING

// output to screen?
if ((apex_options::use_screen_output() && node_id == 0) ||
@@ -1483,8 +1488,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
if (data.reset) {
reset_all();
}
+ #ifndef APEX_SYNCHRONOUS_PROCESSING
// on_dump() releasing the "task_running" flag
consumer_task_running.clear(memory_order_release);
+ #endif
}

void profiler_listener::on_reset(task_identifier * id) {
@@ -1605,22 +1612,26 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
return true;
}

- inline void profiler_listener::push_profiler(int my_tid,
- std::shared_ptr<profiler> &p) {
+ inline void profiler_listener::push_profiler(int my_tid, profiler& p) {
APEX_UNUSED(my_tid);
// if we aren't processing profiler objects, just return.
if (!apex_options::process_async_state()) { return; }
#ifdef APEX_TRACE_APEX
- if (p->get_task_id()->name == "apex::process_profiles") { return; }
+ if (p->get_task_id()->name == "apex::process_profiles_sync") { return; }
#endif

- #ifdef APEX_SYNCHRONOUS_PROCESSING
process_profile(p,0);
- /* Now that we synchronously process, return! */
- return;
- #else
- thequeue()->enqueue(p);
+ }
+
+ inline void profiler_listener::push_profiler(int my_tid,
+ std::shared_ptr<profiler> &p) {
+ APEX_UNUSED(my_tid);
+ // if we aren't processing profiler objects, just return.
+ if (!apex_options::process_async_state()) { return; }
+ #ifdef APEX_TRACE_APEX
+ if (p->get_task_id()->name == "apex::process_profiles_async") { return; }
+ #endif
+ thequeue()->enqueue(p);
#ifndef APEX_HAVE_HPX
// Check to see if the consumer is already running, to avoid calling
// "post" too frequently - it is rather costly.
@@ -1634,8 +1645,6 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
apex_schedule_process_profiles();
}
#endif

- #endif //APEX_SYNCHRONOUS_PROCESSING
}

/* Stop the timer, if applicable, and queue the profiler object */
@@ -1695,10 +1704,17 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
/* When a sample value is processed, save it as a profiler object, and queue it. */
void profiler_listener::on_sample_value(sample_value_event_data &data) {
if (!_done) {
+ // don't make a shared pointer if not necessary!
+ #ifdef APEX_SYNCHRONOUS_PROCESSING
+ profiler p(task_identifier::get_task_id(
+ *data.counter_name), data.counter_value);
+ p.is_counter = data.is_counter;
+ #else // APEX_SYNCHRONOUS_PROCESSING
std::shared_ptr<profiler> p =
std::make_shared<profiler>(task_identifier::get_task_id(
*data.counter_name), data.counter_value);
p->is_counter = data.is_counter;
+ #endif // APEX_SYNCHRONOUS_PROCESSING
push_profiler(my_tid, p);
}
}
@@ -1722,17 +1738,27 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
/* Communication send event. Save the number of bytes. */
void profiler_listener::on_send(message_event_data &data) {
if (!_done) {
+ // don't make a shared pointer if not necessary!
+ #ifdef APEX_SYNCHRONOUS_PROCESSING
+ profiler p(task_identifier::get_task_id("Bytes Sent"), (double)data.size);
+ #else // APEX_SYNCHRONOUS_PROCESSING
std::shared_ptr<profiler> p = std::make_shared<profiler>(
task_identifier::get_task_id("Bytes Sent"), (double)data.size);
+ #endif // APEX_SYNCHRONOUS_PROCESSING
push_profiler(0, p);
}
}

/* Communication recv event. Save the number of bytes. */
void profiler_listener::on_recv(message_event_data &data) {
if (!_done) {
+ // don't make a shared pointer if not necessary!
+ #ifdef APEX_SYNCHRONOUS_PROCESSING
+ profiler p(task_identifier::get_task_id("Bytes Received"), (double)data.size);
+ #else // APEX_SYNCHRONOUS_PROCESSING
std::shared_ptr<profiler> p = std::make_shared<profiler>(
task_identifier::get_task_id("Bytes Received"), (double)data.size);
+ #endif // APEX_SYNCHRONOUS_PROCESSING
push_profiler(0, p);
}
}
@@ -1752,8 +1778,13 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
}

void profiler_listener::reset(task_identifier * id) {
- std::shared_ptr<profiler> p;
- p = std::make_shared<profiler>(id, false, reset_type::CURRENT);
+ // don't make a shared pointer if not necessary!
+ #ifdef APEX_SYNCHRONOUS_PROCESSING
+ profiler p(id, false, reset_type::CURRENT);
+ #else // APEX_SYNCHRONOUS_PROCESSING
+ std::shared_ptr<profiler> p =
+ std::make_shared<profiler>(id, false, reset_type::CURRENT);
+ #endif // APEX_SYNCHRONOUS_PROCESSING
push_profiler(my_tid, p);
}

@@ -1767,7 +1798,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
delete consumer_thread;
#endif
#endif
- #endif
+ #endif // APEX_SYNCHRONOUS_PROCESSING
std::unique_lock<std::mutex> queue_lock(queue_mtx);
while (allqueues.size() > 0) {
auto tmp = allqueues.back();
3 changes: 2 additions & 1 deletion src/apex/profiler_listener.hpp
@@ -108,7 +108,7 @@ class profiler_listener : public event_listener {
void schedule_process_profiles(void);
#endif
unsigned int process_profile(std::shared_ptr<profiler> &p, unsigned int tid);
- unsigned int process_profile(profiler* p, unsigned int tid);
+ unsigned int process_profile(profiler& p, unsigned int tid);
unsigned int process_dependency(task_dependency* td);
int node_id;
std::mutex _mtx;
@@ -117,6 +117,7 @@ class profiler_listener : public event_listener {
void _common_stop(std::shared_ptr<profiler> &p,
bool is_yield); // internal, inline function
void push_profiler(int my_tid, std::shared_ptr<profiler> &p);
+ void push_profiler(int my_tid, profiler &p);
std::unordered_map<task_identifier, profile*> task_map;
std::mutex _task_map_mutex;
std::unordered_map<task_identifier, std::unordered_map<task_identifier,
