Skip to content

Commit

Permalink
Merge branch 'develop' of git.nic.uoregon.edu:/gitroot/xpress-apex into develop
Browse files: browse the repository at this point in the history
  • Loading branch information
khuck committed Nov 10, 2018
2 parents 3a0ccbc + 8eb6feb commit 8d210fe
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 18 deletions.
39 changes: 24 additions & 15 deletions src/apex/otf2_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ namespace apex {
if (evt_writer == nullptr && create) {
// should already be locked by the "new thread" event.
uint64_t my_node_id = my_saved_node_id;
my_node_id = (my_node_id << 32) + thread_instance::get_id();
//my_node_id = (my_node_id << 32) + thread_instance::get_id();
my_node_id = (my_node_id << 32) + _event_threads.size();
evt_writer = OTF2_Archive_GetEvtWriter( archive, my_node_id );
if (thread_instance::get_id() == 0) {
comm_evt_writer = evt_writer;
Expand Down Expand Up @@ -570,11 +571,12 @@ namespace apex {
uint64_t map_size = global_region_indices.size();
OTF2_IdMap * my_map = OTF2_IdMap_CreateFromUint64Array(map_size,
mappings, false);
for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
if (event_file_exists(i)) {
//for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
for (size_t i = 0 ; i < _event_threads.size() ; i++) {
//if (event_file_exists(i)) {
OTF2_DefWriter_WriteMappingTable(getDefWriter(i),
OTF2_MAPPING_REGION, my_map);
}
//}
}
// free the map
OTF2_IdMap_Free(my_map);
Expand All @@ -598,11 +600,12 @@ namespace apex {
uint64_t map_size = global_metric_indices.size();
OTF2_IdMap * my_map = OTF2_IdMap_CreateFromUint64Array(map_size,
mappings, false);
for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
if (event_file_exists(i)) {
//for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
for (size_t i = 0 ; i < _event_threads.size() ; i++) {
//if (event_file_exists(i)) {
OTF2_DefWriter_WriteMappingTable(getDefWriter(i),
OTF2_MAPPING_METRIC, my_map);
}
//}
}
// free the map
OTF2_IdMap_Free(my_map);
Expand Down Expand Up @@ -717,7 +720,8 @@ namespace apex {
/* if we are node 0, write the global definitions */
if (my_saved_node_id == 0) {
// save my number of threads
rank_thread_map[0] = thread_instance::get_num_threads();
//rank_thread_map[0] = thread_instance::get_num_threads();
rank_thread_map[0] = _event_threads.size();
std::cout << "Writing OTF2 definition files..." << std::endl;
// make a common list of regions and metrics across all nodes...
reduce_regions();
Expand Down Expand Up @@ -800,12 +804,13 @@ namespace apex {
// write out the counter names we saw
reduce_metrics();
}
for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
//for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
for (size_t i = 0 ; i < _event_threads.size() ; i++) {
/* close (and possibly create) the definition files */
if (event_file_exists(i)) {
//if (event_file_exists(i)) {
OTF2_EC(OTF2_Archive_CloseDefWriter( archive,
getDefWriter(i) ));
}
//}
}
if (my_saved_node_id == 0) {
std::cout << "Closing the archive..." << std::endl;
Expand Down Expand Up @@ -1185,7 +1190,8 @@ namespace apex {
std::string otf2_listener::write_my_regions(void) {
stringstream region_file;
// first, output our number of threads.
region_file << thread_instance::get_num_threads() << endl;
//region_file << thread_instance::get_num_threads() << endl;
region_file << _event_threads.size() << endl;
// then iterate over the regions and write them out.
for (auto const &i : global_region_indices) {
task_identifier id = i.first;
Expand Down Expand Up @@ -1324,7 +1330,8 @@ namespace apex {
std::string otf2_listener::write_my_metrics(void) {
stringstream metric_file;
// first, output our number of threads.
metric_file << thread_instance::get_num_threads() << endl;
//metric_file << thread_instance::get_num_threads() << endl;
metric_file << _event_threads.size() << endl;
// then iterate over the metrics and write them out.
for (auto const &i : global_metric_indices) {
string id = i.first;
Expand Down Expand Up @@ -1513,7 +1520,8 @@ namespace apex {
region_filename << region_filename_prefix << my_saved_node_id;
ofstream region_file(region_filename.str(), ios::out | ios::trunc );
// first, output our number of threads.
region_file << thread_instance::get_num_threads() << endl;
//region_file << thread_instance::get_num_threads() << endl;
region_file << _event_threads.size() << endl;
// then iterate over the regions and write them out.
for (auto const &i : global_region_indices) {
task_identifier id = i.first;
Expand Down Expand Up @@ -1650,7 +1658,8 @@ namespace apex {
metric_filename << metric_filename_prefix << my_saved_node_id;
ofstream metric_file(metric_filename.str(), ios::out | ios::trunc );
// first, output our number of threads.
metric_file << thread_instance::get_num_threads() << endl;
//metric_file << thread_instance::get_num_threads() << endl;
metric_file << _event_threads.size() << endl;
// then iterate over the metrics and write them out.
for (auto const &i : global_metric_indices) {
string id = i.first;
Expand Down
4 changes: 2 additions & 2 deletions src/apex/otf2_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "event_listener.hpp"
#include <otf2/otf2.h>
#include <map>
#include <unordered_set>
#include <set>
#include <string>
#include <tuple>
#include <memory>
Expand All @@ -29,7 +29,7 @@ namespace apex {
std::mutex _metric_mutex;
std::mutex _comm_mutex;
std::mutex _event_set_mutex;
std::unordered_set<int> _event_threads;
std::set<int> _event_threads;
/* this is a reader/writer lock. Don't close the archive
* if other threads are writing to it. but allow concurrent
* access from the writer threads. */
Expand Down
2 changes: 1 addition & 1 deletion src/apex/thread_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class thread_instance {
static APEX_NATIVE_TLS thread_instance * _instance;
// constructor
thread_instance (bool is_worker) :
_id(0), _id_reversed(UINTMAX_MAX), _runtime_id(-1),
_id(-1), _id_reversed(UINTMAX_MAX), _runtime_id(-1),
_top_level_timer_name(), _is_worker(is_worker), _task_count(0) {
_instance = nullptr;
};
Expand Down
1 change: 1 addition & 0 deletions src/unit_tests/C++/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(example_programs
apex_hpx_task_wrapper_direct_actions
apex_hpx_annotated_functions
apex_profiler_guids
apex_non_worker_thread
)
#apex_set_thread_cap
#apex_setup_power_cap_throttling
Expand Down
73 changes: 73 additions & 0 deletions src/unit_tests/C++/apex_non_worker_thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include "apex_api.hpp"
#include <unistd.h>
#include <stdio.h>
#include <thread>
#include <string>
#include "thread_instance.hpp"

#define MAX_OUTER 50
#define MAX_INNER 50
#define MAX_THREADS 8

/**
 * Square a value while timing the work under a per-argument APEX timer.
 *
 * A timer named "func <i>" is started, i * i is computed, and the timer is
 * stopped before returning.
 *
 * @param i value to square; also used to build the timer name.
 * @return i * i (unsigned arithmetic, so it wraps modulo 2^64).
 */
uint64_t func(uint64_t i) {
    // Build the name with std::to_string rather than sprintf("%lu"): "%lu"
    // only matches uint64_t on platforms where it is unsigned long, and
    // sprintf does no bounds checking on the destination buffer.
    const std::string name = "func " + std::to_string(i);
    apex::profiler* p = apex::start(name);
    const uint64_t j = i * i;
    apex::stop(p);
    return j;
}

uint64_t foo(uint64_t i) {
uint64_t j=0;
apex::register_thread(__func__);
apex::profiler* p = apex::start((apex_function_address)(&foo));
for (uint64_t x = 0 ; x < MAX_OUTER ; x++) {
for (uint64_t y = 0 ; y < MAX_INNER ; y++) {
j += func(x) * func(y) + i;
}
}
apex::stop(p);
return j;
}

/**
 * Thread body that never starts a timer.
 *
 * Unlike foo(), this function does not register the thread and does not
 * start or stop a profiler; it only asks APEX to create a task for itself
 * and then does the arithmetic inline.
 *
 * @param i offset added to every accumulated term.
 * @return the sum of (x*x) * (y*y) + i over [0, MAX_OUTER) x [0, MAX_INNER).
 */
uint64_t bar(uint64_t i) {
    // Explicitly requesting a thread instance is left disabled:
    //apex::thread_instance::instance(false);

    // Create the task only -- deliberately no apex::start() for it.
    apex::new_task((apex_function_address)&bar);

    uint64_t total = 0;
    for (uint64_t outer = 0; outer < MAX_OUTER; ++outer) {
        const uint64_t outer_sq = outer * outer;
        for (uint64_t inner = 0; inner < MAX_INNER; ++inner) {
            total += outer_sq * (inner * inner) + i;
        }
    }
    return total;
}

/**
 * Test driver: mixes threads that register with APEX and threads that do not.
 *
 * Initializes APEX, starts a top-level timer, launches MAX_THREADS threads
 * (even indices run foo(), odd indices run bar()), joins them all, then stops
 * the timer and shuts APEX down.
 *
 * @return 0 always.
 */
int main(int /*argc*/, char** /*argv*/) {
    apex::init("apex_non_worker_thread unit test", 0, 1);
    // Start the timer by name: ISO C++ forbids taking the address of main,
    // so it cannot be used as an apex_function_address.
    apex::profiler* p = apex::start(std::string(__func__));

    std::thread threads[MAX_THREADS];
    for (uint64_t i = 0; i < MAX_THREADS; i++) {
        if (i % 2 == 0) {
            // create a worker thread (registers itself and uses timers)
            threads[i] = std::thread(foo, i);
        } else {
            // create a non-worker thread (no registration, no timer)
            threads[i] = std::thread(bar, i);
        }
    }
    for (std::thread& t : threads) {
        t.join();
    }

    apex::stop(p);
    apex::finalize();
    apex::cleanup();
    return 0;
}

0 comments on commit 8d210fe

Please sign in to comment.