Skip to content

Commit

Permalink
Working OpenMP Target Offload with tracing to GTrace and OTF2
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed May 4, 2022
1 parent 337816c commit 92f8b36
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 78 deletions.
36 changes: 25 additions & 11 deletions src/apex/apex_ompt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,28 @@ class Globals{

};

std::shared_ptr<apex::task_wrapper> start_async_task(const std::string &name, uint32_t correlationId) {
std::shared_ptr<apex::task_wrapper> start_async_task(const std::string &name, uint32_t correlationId, long unsigned int& parent_thread) {
apex::in_apex prevent_deadlocks;
// get the parent GUID, then erase the correlation from the map
std::shared_ptr<apex::task_wrapper> parent = nullptr;
if (correlationId > 0) {
parent = Globals::find_timer(correlationId);
if (parent != nullptr) {
parent_thread = parent->thread_id;
}
}
// create a task_wrapper, as a GPU child of the parent on the CPU side
std::shared_ptr<apex::task_wrapper> tt = apex::new_task(name, UINT64_MAX, parent);
return tt;
}

void stop_async_task(std::shared_ptr<apex::task_wrapper> tt, uint64_t start, uint64_t end,
uint32_t correlationId, apex::base_thread_node &node) {
uint32_t correlationId, apex::ompt_thread_node &node) {
// create an APEX profiler to store this data - we can't start
// then stop because we have timestamps already.
auto prof = std::make_shared<apex::profiler>(tt);
prof->set_start(start + Globals::delta());
prof->set_end(end + Globals::delta());
std::cout << __func__ << prof->get_start_ns() << " " << prof->get_stop_ns() << std::endl;
// important! Otherwise we might get the wrong end timestamp.
prof->stopped = true;
// Get the singleton APEX instance
Expand Down Expand Up @@ -190,7 +192,7 @@ void stop_async_task(std::shared_ptr<apex::task_wrapper> tt, uint64_t start, uin
}

void store_profiler_data(const std::string &name,
uint64_t start, uint64_t end, apex::base_thread_node &node,
uint64_t start, uint64_t end, apex::ompt_thread_node &node,
std::shared_ptr<apex::task_wrapper> parent, bool otf2_trace = true) {
apex::in_apex prevent_deadlocks;
apex::async_event_data as_data;
Expand Down Expand Up @@ -229,7 +231,7 @@ void store_profiler_data(const std::string &name,

/* Handle counters from asynchronous activity */
void store_counter_data(const char * name, const std::string& ctx,
uint64_t end, double value, apex::base_thread_node &node) {
uint64_t end, double value, apex::ompt_thread_node &node) {
apex::in_apex prevent_deadlocks;
std::stringstream ss;
if (name == nullptr) {
Expand Down Expand Up @@ -263,7 +265,7 @@ void store_counter_data(const char * name, const std::string& ctx,
}

void store_counter_data(const char * name, const std::string& ctx,
uint64_t end, size_t value, apex::base_thread_node &node) {
uint64_t end, size_t value, apex::ompt_thread_node &node) {
store_counter_data(name, ctx, end, (double)(value), node);
}

Expand All @@ -279,7 +281,9 @@ static void print_record_ompt(ompt_record_ompt_t *rec) {
static std::unordered_map<ompt_id_t,int> active_target_devices;
static std::unordered_map<uint32_t, std::shared_ptr<apex::task_wrapper>> target_map;
static std::unordered_map<uint32_t, uint64_t> target_start_times;
static std::unordered_map<uint32_t, uint64_t> target_parent_thread_ids;
static std::mutex target_lock;
long unsigned int parent_thread = 0;

switch (rec->type) {
case ompt_callback_target:
Expand All @@ -296,28 +300,31 @@ static void print_record_ompt(ompt_record_ompt_t *rec) {
ss << ": UNRESOLVED ADDR " << target_rec.codeptr_ra;
}
std::string name{ss.str()};
auto tt = start_async_task(name, rec->target_id);
auto tt = start_async_task(name, rec->target_id, parent_thread);
std::unique_lock<std::mutex> l(target_lock);
target_map[rec->target_id] = tt;
target_start_times[rec->target_id] = rec->time;
active_target_addrs[rec->target_id] = target_rec.codeptr_ra;
active_target_devices[rec->target_id] = target_rec.device_num;
target_parent_thread_ids[rec->target_id] = parent_thread;
} else if (target_rec.endpoint == ompt_scope_end) {
std::shared_ptr<apex::task_wrapper> tt;
uint64_t start;
{
std::unique_lock<std::mutex> l(target_lock);
tt = target_map[rec->target_id];
start = target_start_times[rec->target_id];
parent_thread = target_parent_thread_ids[rec->target_id];
active_target_addrs.erase(rec->target_id);
active_target_devices.erase(rec->target_id);
target_map.erase(rec->target_id);
target_start_times.erase(rec->target_id);
target_parent_thread_ids.erase(rec->target_id);
}
/* If we have a target region with a device id of -1, we might not get
a target region start event - so ignore this end event for now. */
if (tt != nullptr) {
apex::base_thread_node node(target_rec.device_num, APEX_ASYNC_KERNEL);
apex::ompt_thread_node node(target_rec.device_num, parent_thread, APEX_ASYNC_KERNEL);
stop_async_task(tt, start, rec->time, rec->target_id, node);
}
}
Expand All @@ -335,7 +342,6 @@ static void print_record_ompt(ompt_record_ompt_t *rec) {
target_data_op_rec.bytes, target_data_op_rec.end_time,
target_data_op_rec.end_time - rec->time,
target_data_op_rec.codeptr_ra);
apex::base_thread_node node(target_data_op_rec.dest_device_num, APEX_ASYNC_MEMORY);
std::stringstream ss;
ss << "GPU: OpenMP Target DataOp";
switch (target_data_op_rec.optype) {
Expand Down Expand Up @@ -386,11 +392,14 @@ static void print_record_ompt(ompt_record_ompt_t *rec) {
std::unique_lock<std::mutex> l(target_lock);
tt = target_map[rec->target_id];
codeptr_ra = active_target_addrs[rec->target_id];
parent_thread = target_parent_thread_ids[rec->target_id];
}
if (codeptr_ra != nullptr) {
ss << ": UNRESOLVED ADDR " << codeptr_ra;
}
std::string name{ss.str()};
apex::ompt_thread_node node(target_data_op_rec.dest_device_num,
parent_thread, APEX_ASYNC_MEMORY);
store_profiler_data(name, rec->time,
target_data_op_rec.end_time, node, tt);
store_counter_data("OpenMP Target DataOp", "Bytes", target_data_op_rec.end_time,
Expand All @@ -416,12 +425,14 @@ static void print_record_ompt(ompt_record_ompt_t *rec) {
tt = target_map[rec->target_id];
codeptr_ra = active_target_addrs[rec->target_id];
device_num = active_target_devices[rec->target_id];
parent_thread = target_parent_thread_ids[rec->target_id];
}
if (codeptr_ra != nullptr) {
ss << ": UNRESOLVED ADDR " << codeptr_ra;
}
std::string name{ss.str()};
apex::base_thread_node node(device_num, APEX_ASYNC_KERNEL);
apex::ompt_thread_node node(device_num, parent_thread,
APEX_ASYNC_KERNEL);
store_profiler_data(name, rec->time,
target_kernel_rec.end_time, node, tt);
break;
Expand All @@ -438,7 +449,7 @@ static void print_record_ompt(ompt_record_ompt_t *rec) {
// free is used for corresponding malloc
// Buffer-completion callback handed to the OMPT runtime; releases the
// trace buffer allocated by our buffer-request callback.
static void delete_buffer_ompt(ompt_buffer_t *buffer) {
    free(buffer);
    // DEBUG_PRINT (not printf) so release builds stay quiet
    DEBUG_PRINT("Deallocated %p\n", buffer);
}

/* Function pointers. These are all queried from the runtime during
Expand Down Expand Up @@ -1228,12 +1239,15 @@ extern "C" void apex_ompt_work (
sprintf(regionIDstr, "OpenMP Work %s", tmp_str);
apex_ompt_start(regionIDstr, task_data, parallel_data, true);
}
APEX_UNUSED(count_type);
/*
if (apex::apex_options::ompt_high_overhead_events()) {
std::stringstream ss;
ss << count_type << ": " << regionIDstr;
std::string tmp{ss.str()};
apex::sample_value(tmp, count);
}
*/
} else {
DEBUG_PRINT("%" PRId64 ": %s End task: %p, region: %p\n", apex_threadid, tmp_str,
(void*)task_data, (void*)parallel_data);
Expand Down
111 changes: 58 additions & 53 deletions src/apex/async_thread_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,19 @@ namespace apex {
class base_thread_node {
public:
uint32_t _device;
apex_async_activity_t _activity;
base_thread_node(uint32_t device, apex_async_activity_t activity) :
_device(device), _activity(activity) { }
virtual bool operator==(const base_thread_node &rhs) const {
return (_device == rhs._device && _activity == rhs._activity);
}
virtual bool operator<(const base_thread_node &rhs) const {
if (_device<rhs._device) {
return true;
} else if (_device == rhs._device &&
_activity < rhs._activity && apex_options::use_otf2()) {
return true;
}
return false;
}
virtual std::string name () {
std::stringstream ss;
ss << "\"GPU [" << _device << "]";
std::string tmp{ss.str()};
return tmp;
}
};


class cuda_thread_node : public base_thread_node {
public:
uint32_t _context;
uint32_t _stream;
cuda_thread_node(uint32_t device, uint32_t context, uint32_t stream,
apex_async_activity_t _activity;
base_thread_node(uint32_t device, uint32_t context, uint32_t stream,
apex_async_activity_t activity) :
base_thread_node(device, activity),
_context(context), _stream(stream) { }
virtual bool operator==(const cuda_thread_node &rhs) const {
_device(device), _context(context), _stream(stream), _activity(activity) { }
virtual bool operator==(const base_thread_node &rhs) const {
return (_device == rhs._device &&
_context == rhs._context &&
_stream == rhs._stream &&
_activity == rhs._activity);
}
virtual bool operator<(const cuda_thread_node &rhs) const {
virtual bool operator<(const base_thread_node &rhs) const {
if (_device<rhs._device) {
return true;
} else if (_device == rhs._device && _context < rhs._context) {
Expand All @@ -72,43 +46,74 @@ namespace apex {
}
virtual std::string name () {
std::stringstream ss;
ss << "\"CUDA [" << _device << ":" << _context
ss << "GPU [" << _device << "]";
std::string tmp{ss.str()};
return tmp;
}
virtual uint32_t sortable_tid () {
uint32_t tid = ((_device+1) << 28);
return tid;
}
};

class ompt_thread_node : public base_thread_node {
public:
    // Virtual "thread" identity for OpenMP target offload activity.
    // The base node's stream slot carries the host thread id that
    // launched the work (context is unused, passed as 0).
    ompt_thread_node(uint32_t device, uint32_t thread,
        apex_async_activity_t activity) :
        base_thread_node(device, 0, thread, activity) { }
    virtual std::string name () {
        std::stringstream ss;
        ss << "GPU [" << _device << ":" << _stream << "]";
        std::string tmp{ss.str()};
        // NOTE: removed leftover debug printf to stdout; use DEBUG_PRINT
        // if diagnostics are needed here.
        return tmp;
    }
    // Sortable id: device in the high bits, launching thread in the low
    // bits, so trace viewers order GPU rows after CPU threads.
    virtual uint32_t sortable_tid () {
        uint32_t tid = ((_device+1) << 28);
        tid = tid + _stream;
        return tid;
    }
};


class cuda_thread_node : public base_thread_node {
public:
    // Virtual "thread" identity for a CUDA device/context/stream triple.
    cuda_thread_node(uint32_t device, uint32_t context, uint32_t stream,
        apex_async_activity_t activity) :
        base_thread_node(device, context, stream, activity) { }
    // Human-readable row label, e.g. "CUDA [0:1:00002]".
    virtual std::string name () {
        std::stringstream label;
        label << "CUDA [" << _device << ":" << _context << ":"
              << std::setfill('0') << std::setw(5) << _stream << "]";
        return label.str();
    }
    // Sortable id: device in the top bits, then context, then stream,
    // so trace rows group by device and context.
    virtual uint32_t sortable_tid () {
        uint32_t sort_key = ((_device+1) << 28);
        sort_key = sort_key + (_context << 22);
        sort_key = sort_key + _stream;
        return sort_key;
    }
};

class hip_thread_node : public base_thread_node {
public:
    // Virtual "thread" identity for a HIP device/command-queue pair.
    // The base node's stream slot carries the command queue id
    // (context is unused, passed as 0); the dedicated _queue member and
    // hand-rolled comparison operators are gone — the base class
    // comparisons cover device/context/stream/activity.
    hip_thread_node(uint32_t device, uint32_t command_queue,
        apex_async_activity_t activity) :
        base_thread_node(device, 0, command_queue, activity) { }
    // Human-readable row label, e.g. "HIP [0:00003]".
    virtual std::string name () {
        std::stringstream ss;
        ss << "HIP [" << _device
           << ":" << std::setfill('0') << std::setw(5) << _stream << "]";
        std::string tmp{ss.str()};
        return tmp;
    }
    // Sortable id: device in the high bits, queue in the low bits.
    virtual uint32_t sortable_tid () {
        uint32_t tid = ((_device+1) << 28);
        tid = tid + _stream;
        return tid;
    }
};

}
Expand Down
23 changes: 14 additions & 9 deletions src/apex/otf2_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,12 +632,6 @@ namespace apex {
role = OTF2_REGION_ROLE_ARTIFICIAL;
return;
}
found = uppercase.find(string("OPENMP"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_OPENMP;
role = OTF2_REGION_ROLE_FUNCTION;
return;
}
found = uppercase.find(string("PTHREAD"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_PTHREAD;
Expand All @@ -655,26 +649,37 @@ namespace apex {
paradigm = OTF2_PARADIGM_CUDA;
role = OTF2_REGION_ROLE_FUNCTION;
found = uppercase.find(string("MEMCPY"));
if (found != std::string::npos) {
size_t found2 = uppercase.find(string("OPENMP TARGET DATAOP XFER"));
if (found != std::string::npos || found2 != std::string::npos) {
role = OTF2_REGION_ROLE_DATA_TRANSFER;
} else {
found = uppercase.find(string("SYNC"));
if (found != std::string::npos) {
role = OTF2_REGION_ROLE_TASK_WAIT;
} else {
found = uppercase.find(string("MEMSET"));
if (found != std::string::npos) {
found2 = uppercase.find(string("OPENMP TARGET DATAOP ALLOC"));
size_t found3 = uppercase.find(string("OPENMP TARGET DATAOP DELETE"));
if (found != std::string::npos ||
found2 != std::string::npos || found3 != std::string::npos) {
role = OTF2_REGION_ROLE_DATA_TRANSFER;
} else {
found = uppercase.find(string("STREAM WAIT"));
if (found != std::string::npos) {
found2 = uppercase.find(string("OPENMP TARGET:"));
if (found != std::string::npos || found2 != std::string::npos) {
role = OTF2_REGION_ROLE_TASK_WAIT;
}
}
}
}
return;
}
found = uppercase.find(string("OPENMP"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_OPENMP;
role = OTF2_REGION_ROLE_FUNCTION;
return;
}
// does the original string start with cu or cuda?
if (apex_options::use_cuda_driver_api()) {
found = id.rfind(string("cu"),0);
Expand Down
8 changes: 3 additions & 5 deletions src/apex/trace_event_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,14 @@ std::string trace_event_listener::make_tid (base_thread_node &node) {
/* There is a potential for overlap here, but not a high potential. The CPU and the GPU
* would BOTH have to spawn 64k+ threads/streams for this to happen. */
if (vthread_map.count(node) == 0) {
size_t id = vthread_map.size()+1;
//uint32_t id_reversed = simple_reverse(id);
uint32_t id_shifted = id << 16;
uint32_t id_shifted = node.sortable_tid();
vthread_map.insert(std::pair<base_thread_node, size_t>(node,id_shifted));
std::stringstream ss;
ss << fixed;
ss << "{\"name\":\"thread_name\""
<< ",\"ph\":\"M\",\"pid\":" << saved_node_id
<< ",\"tid\":" << id_shifted
<< ",\"args\":{\"name\":";
<< ",\"args\":{\"name\":\"";
ss << node.name();
//ss << "" << activity_to_string(node._activity);
ss << "\"";
Expand All @@ -270,7 +268,7 @@ std::string trace_event_listener::make_tid (base_thread_node &node) {
ss << "{\"name\":\"thread_sort_index\""
<< ",\"ph\":\"M\",\"pid\":" << saved_node_id
<< ",\"tid\":" << id_shifted
<< ",\"args\":{\"sort_index\":" << UINT32_MAX << "}},\n";
<< ",\"args\":{\"sort_index\":" << id_shifted << "}},\n";
write_to_trace(ss);
}
tid = vthread_map[node];
Expand Down

0 comments on commit 92f8b36

Please sign in to comment.