Skip to content

Commit

Permalink
Adding OTF2 support for CUDA!
Browse files Browse the repository at this point in the history
CUDA offloaded events are now supported in APEX when writing
out to OTF2.  Still to do - the stream "threads" need to
be annotated as GPU threads, and given device/context/stream labels.
  • Loading branch information
khuck committed Jul 30, 2020
1 parent f606f26 commit ce512e9
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 58 deletions.
24 changes: 15 additions & 9 deletions src/apex/activity_trace_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "thread_instance.hpp"
#include "apex_options.hpp"
#include "trace_event_listener.hpp"
#include "otf2_listener.hpp"

static void __attribute__((constructor)) initTrace(void);
//static void __attribute__((destructor)) flushTrace(void);
Expand Down Expand Up @@ -105,6 +106,11 @@ void store_profiler_data(const std::string &name, uint32_t correlationId,
(apex::trace_event_listener*)instance->the_trace_event_listener;
tel->on_async_event(device, context, stream, prof);
}
if (apex::apex_options::use_otf2()) {
apex::otf2_listener * tol =
(apex::otf2_listener*)instance->the_otf2_listener;
tol->on_async_event(device, context, stream, prof);
}

// have the listeners handle the end of this task
instance->complete_task(tt);
Expand Down Expand Up @@ -144,23 +150,23 @@ static const char * getMemcpyKindString(uint8_t kind)
{
switch (kind) {
case CUPTI_ACTIVITY_MEMCPY_KIND_HTOD:
return "Memcpy HtoD";
return "Memory copy HtoD";
case CUPTI_ACTIVITY_MEMCPY_KIND_DTOH:
return "Memcpy DtoH";
return "Memory copy DtoH";
case CUPTI_ACTIVITY_MEMCPY_KIND_HTOA:
return "Memcpy HtoA";
return "Memory copy HtoA";
case CUPTI_ACTIVITY_MEMCPY_KIND_ATOH:
return "Memcpy AtoH";
return "Memory copy AtoH";
case CUPTI_ACTIVITY_MEMCPY_KIND_ATOA:
return "Memcpy AtoA";
return "Memory copy AtoA";
case CUPTI_ACTIVITY_MEMCPY_KIND_ATOD:
return "Memcpy AtoD";
return "Memory copy AtoD";
case CUPTI_ACTIVITY_MEMCPY_KIND_DTOA:
return "Memcpy DtoA";
return "Memory copy DtoA";
case CUPTI_ACTIVITY_MEMCPY_KIND_DTOD:
return "Memcpy DtoD";
return "Memory copy DtoD";
case CUPTI_ACTIVITY_MEMCPY_KIND_HTOH:
return "Memcpy HtoH";
return "Memory copy HtoH";
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ void apex::_initialize()
#ifdef APEX_HAVE_OTF2
if (apex_options::use_otf2())
{
listeners.push_back(new otf2_listener());
the_otf2_listener = new otf2_listener();
listeners.push_back(the_otf2_listener);
}
#endif
if (apex_options::use_trace_event())
Expand Down
35 changes: 35 additions & 0 deletions src/apex/cuda_thread_node.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2014 University of Oregon
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

namespace apex {

class cuda_thread_node {
public:
uint32_t _device;
uint32_t _context;
uint32_t _stream;
cuda_thread_node(uint32_t device, uint32_t context, uint32_t stream) :
_device(device), _context(context), _stream(stream) { }
bool operator==(const cuda_thread_node &rhs) const {
return (_device==rhs._device &&
_context==rhs._context &&
_stream==rhs._stream);
}
bool operator<(const cuda_thread_node &rhs) const {
if (_device<rhs._device) {
return true;
} else if (_device == rhs._device && _context<rhs._context) {
return true;
} else if (_device == rhs._device && _context==rhs._context && _stream<rhs._stream) {
return true;
}
return false;
}
};

}

152 changes: 134 additions & 18 deletions src/apex/otf2_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,39 @@ using namespace std;

namespace apex {

uint32_t otf2_listener::make_vtid (uint32_t device, uint32_t context, uint32_t stream) {
cuda_thread_node tmp(device, context, stream);
size_t tid;
/* There is a potential for overlap here, but not a high potential. The CPU and the GPU
* would BOTH have to spawn 64k+ threads/streams for this to happen. */
if (vthread_map.count(tmp) == 0) {
// lock the archive lock, we need to make an event writer
write_lock_type lock(_archive_mutex);
// lock the set of thread IDs
_event_set_mutex.lock();
// get the next ID
uint32_t id = (uint32_t)_event_threads.size();
// reverse it, so as to avoid collisions with CPU threads
// uint32_t id_reversed = simple_reverse(id);
// insert it.
//std::cout << "GPU Inserting " << _event_threads.size() << std::endl;
_event_threads.insert(_event_threads.size());
// done with the set of event threads, so unlock.
_event_set_mutex.unlock();
// use the OTF2 thread index (not reversed) for the vthread_map
vthread_map.insert(std::pair<cuda_thread_node, size_t>(tmp,id));
// construct a globally unique ID for this thread on this rank
uint64_t my_node_id = my_saved_node_id;
my_node_id = (my_node_id << 32) + id;
// construct the event writer
OTF2_EvtWriter* evt_writer = OTF2_Archive_GetEvtWriter( archive, my_node_id );
// add it to the map of virtual thread IDs to event writers
vthread_evt_writer_map.insert(std::pair<size_t, OTF2_EvtWriter*>(id, evt_writer));
}
tid = vthread_map[tmp];
return tid;
}

OTF2_FlushCallbacks otf2_listener::flush_callbacks;

/* Stupid Intel compiler CLAIMS to be C++14, but doesn't have support for
Expand Down Expand Up @@ -354,14 +387,16 @@ namespace apex {
if (evt_writer == nullptr && create) {
// should already be locked by the "new thread" event.
uint64_t my_node_id = my_saved_node_id;
std::unique_lock<std::mutex> l(_event_set_mutex);
//my_node_id = (my_node_id << 32) + thread_instance::get_id();
my_node_id = (my_node_id << 32) + _event_threads.size();
evt_writer = OTF2_Archive_GetEvtWriter( archive, my_node_id );
if (thread_instance::get_id() == 0) {
comm_evt_writer = evt_writer;
}
std::unique_lock<std::mutex> l(_event_set_mutex);
_event_threads.insert(thread_instance::get_id());
//_event_threads.insert(thread_instance::get_id());
//std::cout << "CPU Inserting " << _event_threads.size() << std::endl;
_event_threads.insert(_event_threads.size());
// Are we closing an event writer?
} else if (!create) {
if (thread_instance::get_id() == 0) {
Expand All @@ -381,14 +416,14 @@ namespace apex {
return evt_writer;
}

bool otf2_listener::event_file_exists (int threadid) {
bool otf2_listener::event_file_exists (uint32_t threadid) {
// get exclusive access to the set - unlocks on exit
std::unique_lock<std::mutex> l(_event_set_mutex);
if (_event_threads.find(threadid) == _event_threads.end())
{return false;} else {return true;}
}

OTF2_DefWriter* otf2_listener::getDefWriter(int threadid) {
OTF2_DefWriter* otf2_listener::getDefWriter(uint32_t threadid) {
OTF2_DefWriter* def_writer;
//printf("creating definition writer for thread %d\n", threadid);
//fflush(stdout);
Expand All @@ -401,7 +436,7 @@ namespace apex {
/* constructor for the OTF2 listener class */
otf2_listener::otf2_listener (void) : _terminate(false),
_initialized(false), comm_evt_writer(nullptr),
global_def_writer(nullptr) {
global_def_writer(nullptr), dropped(0) {
/* get a start time for the trace */
globalOffset = get_time();
/* set the flusher */
Expand Down Expand Up @@ -539,19 +574,24 @@ namespace apex {
OTF2_GlobalDefWriter_WriteString( global_def_writer,
get_string_index(id), id.c_str() );
OTF2_Paradigm paradigm = OTF2_PARADIGM_USER;
OTF2_RegionRole role = OTF2_REGION_ROLE_TASK;
string uppercase;
convert_upper(id, uppercase);
size_t found = uppercase.find(string("APEX"));
// does the original string contain APEX?
size_t found = id.find(string("APEX"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_MEASUREMENT_SYSTEM;
role = OTF2_REGION_ROLE_ARTIFICIAL;
}
found = uppercase.find(string("UNRESOLVED"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_MEASUREMENT_SYSTEM;
role = OTF2_REGION_ROLE_ARTIFICIAL;
}
found = uppercase.find(string("OPENMP"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_OPENMP;
role = OTF2_REGION_ROLE_WRAPPER;
}
found = uppercase.find(string("PTHREAD"));
if (found != std::string::npos) {
Expand All @@ -560,14 +600,46 @@ namespace apex {
found = uppercase.find(string("MPI"));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_MPI;
role = OTF2_REGION_ROLE_WRAPPER;
}
// does the original string start with GPU:?
found = id.find(string("GPU: "));
if (found != std::string::npos) {
paradigm = OTF2_PARADIGM_CUDA;
role = OTF2_REGION_ROLE_FUNCTION;
found = id.find(string("Memory copy"));
if (found != std::string::npos) {
role = OTF2_REGION_ROLE_DATA_TRANSFER;
}
}
// does the original string start with cuda?
found = id.rfind(string("cuda"),0);
if (found == 0) {
paradigm = OTF2_PARADIGM_CUDA;
role = OTF2_REGION_ROLE_WRAPPER;
found = uppercase.find(string("MEMCPY"));
if (found != std::string::npos) {
role = OTF2_REGION_ROLE_DATA_TRANSFER;
}
found = uppercase.find(string("SYNC"));
if (found != std::string::npos) {
role = OTF2_REGION_ROLE_TASK_WAIT;
}
found = uppercase.find(string("MALLOC"));
if (found != std::string::npos) {
role = OTF2_REGION_ROLE_ALLOCATE;
}
found = uppercase.find(string("FREE"));
if (found != std::string::npos) {
role = OTF2_REGION_ROLE_DEALLOCATE;
}
}
OTF2_GlobalDefWriter_WriteRegion( global_def_writer,
idx /* id */,
get_string_index(id) /* region name */,
get_string_index(empty) /* alternative name */,
get_string_index(empty) /* description */,
(found != std::string::npos) ? OTF2_REGION_ROLE_ARTIFICIAL
: OTF2_REGION_ROLE_TASK,
role,
paradigm,
OTF2_REGION_FLAG_NONE,
get_string_index(empty) /* source file */,
Expand Down Expand Up @@ -625,12 +697,9 @@ namespace apex {
uint64_t map_size = global_region_indices.size();
OTF2_IdMap * my_map = OTF2_IdMap_CreateFromUint64Array(map_size,
mappings, false);
//for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
for (size_t i = 0 ; i < _event_threads.size() ; i++) {
//if (event_file_exists(i)) {
OTF2_DefWriter_WriteMappingTable(getDefWriter(i),
OTF2_MAPPING_REGION, my_map);
//}
OTF2_DefWriter_WriteMappingTable(getDefWriter(i),
OTF2_MAPPING_REGION, my_map);
}
// free the map
OTF2_IdMap_Free(my_map);
Expand All @@ -654,12 +723,9 @@ namespace apex {
uint64_t map_size = global_metric_indices.size();
OTF2_IdMap * my_map = OTF2_IdMap_CreateFromUint64Array(map_size,
mappings, false);
//for (int i = 0 ; i < thread_instance::get_num_threads() ; i++) {
for (size_t i = 0 ; i < _event_threads.size() ; i++) {
//if (event_file_exists(i)) {
OTF2_DefWriter_WriteMappingTable(getDefWriter(i),
OTF2_MAPPING_METRIC, my_map);
//}
OTF2_DefWriter_WriteMappingTable(getDefWriter(i),
OTF2_MAPPING_METRIC, my_map);
}
// free the map
OTF2_IdMap_Free(my_map);
Expand Down Expand Up @@ -1845,5 +1911,55 @@ namespace apex {

#endif

void otf2_listener::on_async_event(uint32_t device, uint32_t context,
uint32_t stream, std::shared_ptr<profiler> &p) {
uint32_t tid{make_vtid(device, context, stream)};
task_identifier * id = p->tt_ptr->get_task_id();
uint64_t idx = get_region_index(id);
//static map<uint32_t,std::string> last_p;
if (last_ts.count(tid) == 0) {
last_ts[tid] = 0ULL;
}
uint64_t last = last_ts[tid];
// not likely, but just in case...
if (_terminate) { return; }
/* validate the time stamp. CUPTI is notorious for giving out-of-order
* events, so make sure this one isn't before the previous. */
uint64_t stamp = 0L;
stamp = p->get_start_ns() - globalOffset;
if(stamp < last) {
dropped++;
/*
std::cerr << "APEX: Warning - Events delivered out of order on Device "
<< device << ", Context " << context << ", Stream " << stream
<< ".\nIgnoring event " << p->tt_ptr->task_id->get_name()
<< " with timestamp of " << stamp << " after last event "
<< "with timestamp of " << last << std::endl;
*/
return;
}
// don't close the archive on us!
read_lock_type lock(_archive_mutex);
// before we process the event, make sure the event write is open
OTF2_EvtWriter* local_evt_writer = vthread_evt_writer_map[tid];
if (local_evt_writer != nullptr) {
// create an attribute list
OTF2_AttributeList * al = OTF2_AttributeList_New();
// create an attribute
OTF2_AttributeList_AddUint64( al, 0, p->tt_ptr->guid );
OTF2_AttributeList_AddUint64( al, 1, p->tt_ptr->parent_guid );
OTF2_EC(OTF2_EvtWriter_Enter( local_evt_writer, al,
stamp, idx /* region */ ));
stamp = p->get_stop_ns() - globalOffset;
OTF2_EC(OTF2_EvtWriter_Leave( local_evt_writer, al,
stamp, idx /* region */ ));
last_ts[tid] = stamp;
//last_p[tid] = std::string(p->tt_ptr->task_id->get_name());
// delete the attribute list
OTF2_AttributeList_Delete(al);
}
return;

}

}
Loading

0 comments on commit ce512e9

Please sign in to comment.