diff --git a/src/apex/apex_ompt.cpp b/src/apex/apex_ompt.cpp index 26982128..d07e01c0 100644 --- a/src/apex/apex_ompt.cpp +++ b/src/apex/apex_ompt.cpp @@ -19,12 +19,19 @@ #include #include "apex_assert.h" #include "inttypes.h" +#include "event_listener.hpp" +#include "async_thread_node.hpp" +#include "apex.hpp" +#include "trace_event_listener.hpp" +#include "otf2_listener.hpp" std::mutex apex_apex_threadid_mutex; std::atomic apex_numthreads{0}; APEX_NATIVE_TLS int64_t apex_threadid{-1}; static std::atomic enabled{false}; +constexpr size_t apex_ompt_buffer_request_size{16*1024}; + class linked_timer { public: void * prev; @@ -58,10 +65,380 @@ class linked_timer { } }; +/* This class is necessary so we can clean up before our globals are destroyed at exit */ +class Globals{ +private: + Globals() : deltaTimestamp(0) {} // Disallow instantiation outside of the class. + // Timestamp at trace initialization time. Used to normalized other + // timestamps + int64_t deltaTimestamp; + std::mutex correlation_map_mutex; + std::unordered_map> timer_map; + std::unordered_map data_map; +public: + Globals(const Globals&) = delete; + Globals& operator=(const Globals &) = delete; + Globals(Globals &&) = delete; + Globals & operator=(Globals &&) = delete; + // if our globals are destroyed before APEX can finalize, then finalize now! */ + ~Globals() { + apex::finalize(); + } + + static auto& instance(){ + static Globals test; + return test; + } + static int64_t& delta() { + return instance().deltaTimestamp; + } + + void lock() { correlation_map_mutex.lock(); } + void unlock() { correlation_map_mutex.unlock(); } + + static void insert_timer(uint32_t id, std::shared_ptr timer) { + instance().correlation_map_mutex.lock(); + instance().timer_map[id] = timer; + instance().correlation_map_mutex.unlock(); + } + + static std::shared_ptr find_timer(uint32_t id) { + std::shared_ptr timer = nullptr; + Globals& g = instance(); + g.lock(); + auto iter = g.timer_map.find(id); + if (iter != g.timer_map.end()) { + timer = iter->second; + } + g.timer_map.erase(id); + g.unlock(); + return timer; + } + + static void insert_data(uint32_t id, apex::async_event_data& data) { + instance().correlation_map_mutex.lock(); + instance().data_map[id] = data; + instance().correlation_map_mutex.unlock(); + } + + static apex::async_event_data find_data(uint32_t id) { + apex::async_event_data as_data; + Globals& g = instance(); + g.lock(); + auto iter = g.data_map.find(id); + if (iter != g.data_map.end()) { + as_data = iter->second; + } + g.data_map.erase(id); + g.unlock(); + return as_data; + } + +}; + +std::shared_ptr start_async_task(const std::string &name, uint32_t correlationId) { + apex::in_apex prevent_deadlocks; + // get the parent GUID, then erase the correlation from the map + std::shared_ptr parent = nullptr; + if (correlationId > 0) { + parent = Globals::find_timer(correlationId); + } + // create a task_wrapper, as a GPU child of the parent on the CPU side + std::shared_ptr tt = apex::new_task(name, UINT64_MAX, parent); + return tt; +} + +void stop_async_task(std::shared_ptr tt, uint64_t start, uint64_t end, + uint32_t correlationId, apex::base_thread_node &node) { + // create an APEX profiler to store this data - we can't start + // then stop because we have timestamps already. + auto prof = std::make_shared(tt); + prof->set_start(start + Globals::delta()); + prof->set_end(end + Globals::delta()); + std::cout << __func__ << prof->get_start_ns() << " " << prof->get_stop_ns() << std::endl; + // important! Otherwise we might get the wrong end timestamp. + prof->stopped = true; + // Get the singleton APEX instance + static apex::apex* instance = apex::apex::instance(); + // fake out the profiler_listener + instance->the_profiler_listener->push_profiler_public(prof); + // Handle tracing, if necessary + apex::async_event_data as_data; + if (correlationId > 0) { + as_data = Globals::find_data(correlationId); + } + if (apex::apex_options::use_trace_event()) { + apex::trace_event_listener * tel = + (apex::trace_event_listener*)instance->the_trace_event_listener; + as_data.cat = "ControlFlow"; + as_data.reverse_flow = false; + tel->on_async_event(node, prof, as_data); + } +#ifdef APEX_HAVE_OTF2 + if (apex::apex_options::use_otf2()) { + apex::otf2_listener * tol = + (apex::otf2_listener*)instance->the_otf2_listener; + tol->on_async_event(node, prof); + } +#else + APEX_UNUSED(otf2_trace); +#endif + // have the listeners handle the end of this task + instance->complete_task(tt); +} + +void store_profiler_data(const std::string &name, + uint64_t start, uint64_t end, apex::base_thread_node &node, + std::shared_ptr parent, bool otf2_trace = true) { + apex::in_apex prevent_deadlocks; + apex::async_event_data as_data; + as_data.flow = false; + // create a task_wrapper, as a GPU child of the parent on the CPU side + auto tt = apex::new_task(name, UINT64_MAX, parent); + // create an APEX profiler to store this data - we can't start + // then stop because we have timestamps already. + auto prof = std::make_shared(tt); + prof->set_start(start + Globals::delta()); + prof->set_end(end + Globals::delta()); + // important! Otherwise we might get the wrong end timestamp. + prof->stopped = true; + // Get the singleton APEX instance + static apex::apex* instance = apex::apex::instance(); + // fake out the profiler_listener + instance->the_profiler_listener->push_profiler_public(prof); + // Handle tracing, if necessary + if (apex::apex_options::use_trace_event()) { + apex::trace_event_listener * tel = + (apex::trace_event_listener*)instance->the_trace_event_listener; + tel->on_async_event(node, prof, as_data); + } +#ifdef APEX_HAVE_OTF2 + if (apex::apex_options::use_otf2() && otf2_trace) { + apex::otf2_listener * tol = + (apex::otf2_listener*)instance->the_otf2_listener; + tol->on_async_event(node, prof); + } +#else + APEX_UNUSED(otf2_trace); +#endif + // have the listeners handle the end of this task + instance->complete_task(tt); +} + +/* Handle counters from asynchronous activity */ +void store_counter_data(const char * name, const std::string& ctx, + uint64_t end, double value, apex::base_thread_node &node) { + apex::in_apex prevent_deadlocks; + std::stringstream ss; + if (name == nullptr) { + ss << "GPU: " << ctx; + } else { + ss << "GPU: " << name << " " << ctx; + } + std::string tmp{ss.str()}; + auto task_id = apex::task_identifier::get_task_id(tmp); + std::shared_ptr prof = + std::make_shared(task_id, value); + prof->is_counter = true; + prof->set_end(end + Globals::delta()); + // Get the singleton APEX instance + static apex::apex* instance = apex::apex::instance(); + // fake out the profiler_listener + instance->the_profiler_listener->push_profiler_public(prof); + // Handle tracing, if necessary + if (apex::apex_options::use_trace_event()) { + apex::trace_event_listener * tel = + (apex::trace_event_listener*)instance->the_trace_event_listener; + tel->on_async_metric(node, prof); + } +#ifdef APEX_HAVE_OTF2 + if (apex::apex_options::use_otf2()) { + apex::otf2_listener * tol = + (apex::otf2_listener*)instance->the_otf2_listener; + tol->on_async_metric(node, prof); + } +#endif +} + +void store_counter_data(const char * name, const std::string& ctx, + uint64_t end, size_t value, apex::base_thread_node &node) { + store_counter_data(name, ctx, end, (double)(value), node); +} + +// Simple print routine that this example uses while traversing +// through the trace records returned as part of the buffer-completion callback +static void print_record_ompt(ompt_record_ompt_t *rec) { + if (rec == NULL) return; + + DEBUG_PRINT("rec=%p type=%d time=%lu thread_id=%lu target_id=%lu\n", + rec, rec->type, rec->time, rec->thread_id, rec->target_id); + + static std::unordered_map active_target_addrs; + static std::unordered_map active_target_devices; + static std::unordered_map> target_map; + static std::unordered_map target_start_times; + static std::mutex target_lock; + + switch (rec->type) { + case ompt_callback_target: + case ompt_callback_target_emi: + { + ompt_record_target_t target_rec = rec->record.target; + DEBUG_PRINT("\tRecord Target: kind=%d endpoint=%d device=%d task_id=%lu target_id=%lu codeptr=%p\n", + target_rec.kind, target_rec.endpoint, target_rec.device_num, + target_rec.task_id, target_rec.target_id, target_rec.codeptr_ra); + if (target_rec.endpoint == ompt_scope_begin) { + std::stringstream ss; + ss << "GPU: OpenMP Target"; + if (target_rec.codeptr_ra != nullptr) { + ss << ": UNRESOLVED ADDR " << target_rec.codeptr_ra; + } + std::string name{ss.str()}; + auto tt = start_async_task(name, rec->target_id); + std::unique_lock l(target_lock); + target_map[rec->target_id] = tt; + target_start_times[rec->target_id] = rec->time; + active_target_addrs[rec->target_id] = target_rec.codeptr_ra; + active_target_devices[rec->target_id] = target_rec.device_num; + } else if (target_rec.endpoint == ompt_scope_end) { + std::shared_ptr tt; + uint64_t start; + { + std::unique_lock l(target_lock); + tt = target_map[rec->target_id]; + start = target_start_times[rec->target_id]; + active_target_addrs.erase(rec->target_id); + active_target_devices.erase(rec->target_id); + target_map.erase(rec->target_id); + target_start_times.erase(rec->target_id); + } + apex::base_thread_node node(target_rec.device_num, APEX_ASYNC_KERNEL); + stop_async_task(tt, start, rec->time, rec->target_id, node); + } + break; + } + case ompt_callback_target_data_op: + case ompt_callback_target_data_op_emi: + { + ompt_record_target_data_op_t target_data_op_rec = rec->record.target_data_op; + DEBUG_PRINT("\tRecord DataOp: host_op_id=%lu optype=%d src_addr=%p src_device=%d " + "dest_addr=%p dest_device=%d bytes=%lu end_time=%lu duration=%lu ns codeptr=%p\n", + target_data_op_rec.host_op_id, target_data_op_rec.optype, + target_data_op_rec.src_addr, target_data_op_rec.src_device_num, + target_data_op_rec.dest_addr, target_data_op_rec.dest_device_num, + target_data_op_rec.bytes, target_data_op_rec.end_time, + target_data_op_rec.end_time - rec->time, + target_data_op_rec.codeptr_ra); + apex::base_thread_node node(target_data_op_rec.dest_device_num, APEX_ASYNC_MEMORY); + std::stringstream ss; + ss << "GPU: OpenMP Target DataOp"; + switch (target_data_op_rec.optype) { + case ompt_target_data_alloc: { + ss << " Alloc"; + break; + } + case ompt_target_data_transfer_to_device: { + ss << " Xfer to Dev"; + break; + } + case ompt_target_data_transfer_from_device: { + ss << " Xfer from Dev"; + break; + } + case ompt_target_data_delete: { + ss << " Delete"; + break; + } + case ompt_target_data_associate: { + ss << " Associate"; + break; + } + case ompt_target_data_disassociate: { + ss << " Disassociate"; + break; + } + case ompt_target_data_alloc_async: { + ss << " Alloc"; + break; + } + case ompt_target_data_transfer_to_device_async: { + ss << " Xfer to Dev Async"; + break; + } + case ompt_target_data_transfer_from_device_async: { + ss << " Xfer from Dev Async"; + break; + } + case ompt_target_data_delete_async: { + ss << " Delete Async"; + break; + } + } + std::shared_ptr tt; + const void* codeptr_ra; + { + std::unique_lock l(target_lock); + tt = target_map[rec->target_id]; + codeptr_ra = active_target_addrs[rec->target_id]; + } + if (codeptr_ra != nullptr) { + ss << ": UNRESOLVED ADDR " << codeptr_ra; + } + std::string name{ss.str()}; + store_profiler_data(name, rec->time, + target_data_op_rec.end_time, node, tt); + store_counter_data("OpenMP Target DataOp", "Bytes", target_data_op_rec.end_time, + target_data_op_rec.bytes, node); + break; + } + case ompt_callback_target_submit: + case ompt_callback_target_submit_emi: + { + ompt_record_target_kernel_t target_kernel_rec = rec->record.target_kernel; + DEBUG_PRINT("\tRecord Submit: host_op_id=%lu requested_num_teams=%u granted_num_teams=%u " + "end_time=%lu duration=%lu ns\n", + target_kernel_rec.host_op_id, target_kernel_rec.requested_num_teams, + target_kernel_rec.granted_num_teams, target_kernel_rec.end_time, + target_kernel_rec.end_time - rec->time); + std::stringstream ss; + ss << "GPU: OpenMP Target Submit"; + int device_num; + std::shared_ptr tt; + const void* codeptr_ra; + { + std::unique_lock l(target_lock); + tt = target_map[rec->target_id]; + codeptr_ra = active_target_addrs[rec->target_id]; + device_num = active_target_devices[rec->target_id]; + } + if (codeptr_ra != nullptr) { + ss << ": UNRESOLVED ADDR " << codeptr_ra; + } + std::string name{ss.str()}; + apex::base_thread_node node(device_num, APEX_ASYNC_KERNEL); + store_profiler_data(name, rec->time, + target_kernel_rec.end_time, node, tt); + break; + } + default: + APEX_ASSERT(false); + break; + } +} + +// Deallocation routine that will be called by the tool when a buffer +// previously allocated by the buffer-request callback is no longer required. +// The deallocation method must match the allocation routine. Here +// free is used for corresponding malloc +static void delete_buffer_ompt(ompt_buffer_t *buffer) { + free(buffer); + printf("Deallocated %p\n", buffer); +} + /* Function pointers. These are all queried from the runtime during * * ompt_initialize() */ -static ompt_set_callback_t ompt_set_callback; -static ompt_finalize_tool_t ompt_finalize_tool; +static ompt_set_callback_t ompt_set_callback = nullptr; +static ompt_finalize_tool_t ompt_finalize_tool = nullptr; /* static ompt_enumerate_states_t ompt_enumerate_states; static ompt_enumerate_mutex_impls_t ompt_enumerate_mutex_impls; @@ -81,6 +458,12 @@ static ompt_get_num_devices_t ompt_get_num_devices; static ompt_get_unique_id_t ompt_get_unique_id; static ompt_function_lookup_t ompt_function_lookup; */ +static ompt_set_trace_ompt_t ompt_set_trace_ompt = nullptr; +static ompt_start_trace_t ompt_start_trace = nullptr; +static ompt_flush_trace_t ompt_flush_trace = nullptr; +static ompt_stop_trace_t ompt_stop_trace = nullptr; +static ompt_get_record_ompt_t ompt_get_record_ompt = nullptr; +static ompt_advance_buffer_cursor_t ompt_advance_buffer_cursor = nullptr; // forward declare so we can stop the initial task when forced to shut down void apex_ompt_stop(ompt_data_t * ompt_data); @@ -397,16 +780,22 @@ extern "C" void apex_implicit_task( extern "C" void apex_target ( ompt_target_t kind, ompt_scope_endpoint_t endpoint, - uint64_t device_num, + int device_num, ompt_data_t *task_data, ompt_id_t target_id, const void *codeptr_ra ) { + APEX_UNUSED(kind); + APEX_UNUSED(device_num); + APEX_UNUSED(target_id); if (!enabled) { return; } + DEBUG_PRINT("Callback Target:\n" + "\ttarget_id=%lu kind=%d endpoint=%d device_num=%d code=%p,\n" + "\ttask_data->value=%" PRId64 ", task_data->ptr=%p\n", + target_id, kind, endpoint, device_num, codeptr_ra, + task_data->value, task_data->ptr); if (endpoint == ompt_scope_begin) { char regionIDstr[128] = {0}; - DEBUG_PRINT("%" PRId64 ": Begin target: %p, %p\n", apex_threadid, - (void*)task_data, codeptr_ra); if (codeptr_ra != nullptr) { sprintf(regionIDstr, "OpenMP Target: UNRESOLVED ADDR %p", codeptr_ra); @@ -416,9 +805,17 @@ extern "C" void apex_target ( apex_ompt_start(regionIDstr, task_data, nullptr, true); } } else { - DEBUG_PRINT("%" PRId64 ": End target: %p, %p\n", apex_threadid, - (void*)task_data, codeptr_ra); + // save a copy of the task wrapper + std::shared_ptr tw = ((linked_timer*)(task_data->ptr))->tw; + Globals::insert_timer(target_id, tw); + // stop the timer apex_ompt_stop(task_data); + // save the timer data to make flow events + apex::async_event_data as_data( + tw->prof->get_start_us(), + "ControlFlow", target_id, + apex::thread_instance::get_id(), "OpenMP Target"); + Globals::insert_data(target_id, as_data); } } @@ -434,9 +831,92 @@ extern "C" void apex_target_data_op ( size_t bytes, const void *codeptr_ra ) { + APEX_UNUSED(target_id); + APEX_UNUSED(host_op_id); + APEX_UNUSED(optype); + APEX_UNUSED(src_addr); + APEX_UNUSED(src_device_num); + APEX_UNUSED(dest_addr); + APEX_UNUSED(dest_device_num); + APEX_UNUSED(bytes); + APEX_UNUSED(codeptr_ra); + static std::unordered_map allocations; + static std::mutex allocation_lock; if (!enabled) { return; } - printf("%s of %" PRId64 " bytes\n", __func__, bytes); + DEBUG_PRINT("Callback DataOp:\n" + "\ttarget_id=%lu host_op_id=%lu optype=%d src=%p\n" + "\tsrc_device_num=%d dest=%p dest_device_num=%d bytes=%lu code=%p\n", + target_id, host_op_id, optype, src_addr, src_device_num, + dest_addr, dest_device_num, bytes, codeptr_ra); // get the address and save the bytes transferred + switch (optype) { + case ompt_target_data_alloc: { + apex::sample_value("GPU: OpenMP Target Data Alloc",bytes); + std::unique_lock l(allocation_lock); + allocations[src_addr] = bytes; + break; + } + case ompt_target_data_transfer_to_device: { + apex::sample_value("GPU: OpenMP Target Data Transfer to Device",bytes); + std::unique_lock l(allocation_lock); + allocations[dest_addr] = bytes; + break; + } + case ompt_target_data_transfer_from_device: { + apex::sample_value("GPU: OpenMP Target Data Transfer from Device",bytes); + std::unique_lock l(allocation_lock); + allocations[dest_addr] = bytes; + break; + } + case ompt_target_data_delete: { + size_t mybytes; + { + std::unique_lock l(allocation_lock); + mybytes = allocations[src_addr]; + allocations.erase(src_addr); + } + apex::sample_value("GPU: OpenMP Target Data Delete",mybytes); + break; + } + case ompt_target_data_associate: { + apex::sample_value("GPU: OpenMP Target Data Associate",bytes); + break; + } + case ompt_target_data_disassociate: { + apex::sample_value("GPU: OpenMP Target Data Disassociate",bytes); + break; + } + case ompt_target_data_alloc_async: { + apex::sample_value("GPU: OpenMP Target Data Alloc Async",bytes); + std::unique_lock l(allocation_lock); + allocations[src_addr] = bytes; + break; + } + case ompt_target_data_transfer_to_device_async: { + apex::sample_value("GPU: OpenMP Target Data Transfer to Device Async",bytes); + std::unique_lock l(allocation_lock); + allocations[dest_addr] = bytes; + break; + } + case ompt_target_data_transfer_from_device_async: { + apex::sample_value("GPU: OpenMP Target Data Transfer from Device Async",bytes); + std::unique_lock l(allocation_lock); + allocations[dest_addr] = bytes; + break; + } + case ompt_target_data_delete_async: { + size_t mybytes; + { + std::unique_lock l(allocation_lock); + mybytes = allocations[src_addr]; + allocations.erase(src_addr); + } + apex::sample_value("GPU: OpenMP Target Data Delete Async",mybytes); + break; + } + default: + break; + } } /* Event #10, target submit */ @@ -445,8 +925,12 @@ extern "C" void apex_target_submit ( ompt_id_t host_op_id, unsigned int requested_num_teams ) { + APEX_UNUSED(target_id); + APEX_UNUSED(host_op_id); if (!enabled) { return; } - printf("%s with %" PRId64 " teams\n", __func__, requested_num_teams); + DEBUG_PRINT("Callback Submit:\n" + "\ttarget_id=%lu host_op_id=%lu req_num_teams=%d\n", + target_id, host_op_id, requested_num_teams); } /* Event #11, tool control */ @@ -456,28 +940,65 @@ extern "C" void apex_control( void *arg, /* argument of control call */ const void *codeptr_ra /* return address of runtime call */ ) { + APEX_UNUSED(command); + APEX_UNUSED(modifier); + APEX_UNUSED(arg); + APEX_UNUSED(codeptr_ra); if (!enabled) { return; } - printf("%s\n", __func__); + DEBUG_PRINT("%s\n", __func__); } +// forward declare some functions +static ompt_set_result_t apex_set_trace_ompt(); +static int apex_ompt_start_trace(); + /* Event #12, device initialize */ extern "C" void apex_device_initialize ( - uint64_t device_num, + int device_num, const char *type, ompt_device_t *device, ompt_function_lookup_t lookup, const char *documentation ) { if (!enabled) { return; } - printf("%s\n", __func__); + DEBUG_PRINT("%s\n", __func__); + DEBUG_PRINT("Init: device_num=%d type=%s device=%p lookup=%p doc=%p\n", + device_num, type, device, lookup, documentation); + if (!lookup) { + DEBUG_PRINT("Trace collection disabled on device %d\n", device_num); + return; + } + + ompt_set_trace_ompt = (ompt_set_trace_ompt_t) lookup("ompt_set_trace_ompt"); + ompt_start_trace = (ompt_start_trace_t) lookup("ompt_start_trace"); + ompt_flush_trace = (ompt_flush_trace_t) lookup("ompt_flush_trace"); + ompt_stop_trace = (ompt_stop_trace_t) lookup("ompt_stop_trace"); + ompt_get_record_ompt = (ompt_get_record_ompt_t) lookup("ompt_get_record_ompt"); + ompt_advance_buffer_cursor = (ompt_advance_buffer_cursor_t) lookup("ompt_advance_buffer_cursor"); + + apex_set_trace_ompt(); + + // In many scenarios, this will be a good place to start the + // trace. If start_trace is called from the main program before this + // callback is dispatched, the start_trace handle will be null. This + // is because this device_init callback is invoked during the first + // target construct implementation. + + apex_ompt_start_trace(); } +static int apex_ompt_flush_trace(); +static int apex_ompt_stop_trace(); + /* Event #13, device finalize */ extern "C" void apex_device_finalize ( uint64_t device_num ) { + APEX_UNUSED(device_num); if (!enabled) { return; } - printf("%s\n", __func__); + DEBUG_PRINT("%s\n", __func__); + apex_ompt_flush_trace(); + apex_ompt_stop_trace(); } /* Event #14, device load */ @@ -491,8 +1012,16 @@ extern "C" void apex_device_load ( void * device_addr, uint64_t module_id ) { + APEX_UNUSED(device_num); + APEX_UNUSED(filename); + APEX_UNUSED(offset_in_file); + APEX_UNUSED(vma_in_file); + APEX_UNUSED(bytes); + APEX_UNUSED(host_addr); + APEX_UNUSED(device_addr); + APEX_UNUSED(module_id); if (!enabled) { return; } - printf("%s\n", __func__); + DEBUG_PRINT("%s\n", __func__); } /* Event #15, device load */ @@ -500,8 +1029,10 @@ extern "C" void apex_device_unload ( uint64_t device_num, uint64_t module_id ) { + APEX_UNUSED(device_num); + APEX_UNUSED(module_id); if (!enabled) { return; } - printf("%s\n", __func__); + DEBUG_PRINT("%s\n", __func__); } /* Event #22, target map */ @@ -514,8 +1045,15 @@ extern "C" void apex_target_map ( unsigned int *mapping_flags, const void *codeptr_ra ) { + APEX_UNUSED(target_id); + APEX_UNUSED(nitems); + APEX_UNUSED(host_addr); + APEX_UNUSED(device_addr); + APEX_UNUSED(bytes); + APEX_UNUSED(mapping_flags); + APEX_UNUSED(codeptr_ra); if (!enabled) { return; } - printf("%s\n", __func__); + DEBUG_PRINT("%s\n", __func__); // get the direction, and capture the bytes transferred. } @@ -683,14 +1221,12 @@ extern "C" void apex_ompt_work ( sprintf(regionIDstr, "OpenMP Work %s", tmp_str); apex_ompt_start(regionIDstr, task_data, parallel_data, true); } - /* if (apex::apex_options::ompt_high_overhead_events()) { std::stringstream ss; ss << count_type << ": " << regionIDstr; std::string tmp{ss.str()}; apex::sample_value(tmp, count); } - */ } else { DEBUG_PRINT("%" PRId64 ": %s End task: %p, region: %p\n", apex_threadid, tmp_str, (void*)task_data, (void*)parallel_data); @@ -904,6 +1440,85 @@ extern "C" void apex_ompt_idle ( /* End Optional events */ /**********************************************************************/ +/**********************************************************************/ +/* Start Asynchronous handling callbacks */ +/**********************************************************************/ + +// Trace record callbacks +// Allocation routine +static void on_ompt_callback_buffer_request ( + int device_num, + ompt_buffer_t **buffer, + size_t *bytes +) { + APEX_UNUSED(device_num); + APEX_UNUSED(buffer); + APEX_UNUSED(bytes); + *bytes = apex_ompt_buffer_request_size; + *buffer = malloc(*bytes); + //printf("Allocated %lu bytes at %p in buffer request callback\n", *bytes, *buffer); +} + +// This function is called by an OpenMP runtime helper thread for +// returning trace records from a buffer. +// Note: This callback must handle a null begin cursor. Currently, +// ompt_get_record_ompt, print_record_ompt, and +// ompt_advance_buffer_cursor handle a null cursor. +static void on_ompt_callback_buffer_complete ( + int device_num, + ompt_buffer_t *buffer, + size_t bytes, /* bytes returned in this callback */ + ompt_buffer_cursor_t begin, + int buffer_owned +) { + DEBUG_PRINT("Executing buffer complete callback: %d %p %lu %p %d\n", + device_num, buffer, bytes, (void*)begin, buffer_owned); + + int status = 1; + ompt_buffer_cursor_t current = begin; + while (status) { + ompt_record_ompt_t *rec = ompt_get_record_ompt(buffer, current); + print_record_ompt(rec); + status = ompt_advance_buffer_cursor(NULL, /* TODO device */ + buffer, + bytes, + current, + ¤t); + } + if (buffer_owned) delete_buffer_ompt(buffer); +} + +// Utility routine to enable the desired tracing modes +static ompt_set_result_t apex_set_trace_ompt() { + if (!ompt_set_trace_ompt) return ompt_set_error; + + ompt_set_trace_ompt(0, 1, ompt_callback_target); + ompt_set_trace_ompt(0, 1, ompt_callback_target_data_op); + ompt_set_trace_ompt(0, 1, ompt_callback_target_submit); + + return ompt_set_always; +} + +static int apex_ompt_start_trace() { + if (!ompt_start_trace) return 0; + return ompt_start_trace(0, &on_ompt_callback_buffer_request, + &on_ompt_callback_buffer_complete); +} + +static int apex_ompt_flush_trace() { + if (!ompt_flush_trace) return 0; + return ompt_flush_trace(0); +} + +static int apex_ompt_stop_trace() { + if (!ompt_stop_trace) return 0; + return ompt_stop_trace(0); +} + +/**********************************************************************/ +/* End Asynchronous handling callbacks */ +/**********************************************************************/ + // This function is for checking that the function registration worked. int apex_ompt_register(ompt_callbacks_t e, ompt_callback_t c , const char * name) { @@ -1115,6 +1730,7 @@ void ompt_finalize(ompt_data_t* tool_data) { APEX_UNUSED(tool_data); if (!apex::apex_options::use_ompt()) { return; } + apex_ompt_flush_trace(); DEBUG_PRINT("OpenMP runtime is shutting down...\n"); if (the_initial_task != nullptr) { apex_ompt_stop(the_initial_task); diff --git a/src/apex/apex_rocm_smi.cpp b/src/apex/apex_rocm_smi.cpp index c3f01596..41d513af 100644 --- a/src/apex/apex_rocm_smi.cpp +++ b/src/apex/apex_rocm_smi.cpp @@ -47,7 +47,7 @@ do { \ namespace apex { namespace rsmi { -std::set monitor::activeDeviceIndices; +//std::set monitor::activeDeviceIndices; std::mutex monitor::indexMutex; monitor::monitor (void) { diff --git a/src/apex/apex_rocm_smi.hpp b/src/apex/apex_rocm_smi.hpp index cffd14e9..50a1e6ce 100644 --- a/src/apex/apex_rocm_smi.hpp +++ b/src/apex/apex_rocm_smi.hpp @@ -61,14 +61,14 @@ class monitor { ~monitor (void); void query(); void stop(); - static void activateDeviceIndex(uint32_t index); + void activateDeviceIndex(uint32_t index); private: bool success; uint32_t deviceCount; std::vector devices; std::vector deviceInfos; std::vector queried_once; - static std::set activeDeviceIndices; + std::set activeDeviceIndices; static std::mutex indexMutex; //double convertValue(nvmlFieldValue_t &value); }; // class monitor diff --git a/src/apex/async_thread_node.hpp b/src/apex/async_thread_node.hpp index 9f62032b..1d85453d 100644 --- a/src/apex/async_thread_node.hpp +++ b/src/apex/async_thread_node.hpp @@ -8,25 +8,54 @@ #pragma once +#include +#include +#include + namespace apex { - class cuda_thread_node { + class base_thread_node { public: uint32_t _device; + apex_async_activity_t _activity; + base_thread_node(uint32_t device, apex_async_activity_t activity) : + _device(device), _activity(activity) { } + virtual bool operator==(const base_thread_node &rhs) const { + return (_device == rhs._device && _activity == rhs._activity); + } + virtual bool operator<(const base_thread_node &rhs) const { + if (_device(node,id)); + vthread_map.insert(std::pair(node,id)); // construct a globally unique ID for this thread on this rank uint64_t my_node_id = my_saved_node_id; my_node_id = (my_node_id << 32) + id; @@ -2678,7 +2667,7 @@ namespace apex { #endif - void otf2_listener::on_async_event(async_thread_node &node, + void otf2_listener::on_async_event(base_thread_node &node, std::shared_ptr &p) { // This could be a callback from a library before APEX is ready // Something like OpenMP or CUDA/CUPTI or...? @@ -2739,7 +2728,7 @@ namespace apex { } - void otf2_listener::on_async_metric(async_thread_node &node, + void otf2_listener::on_async_metric(base_thread_node &node, std::shared_ptr &p) { // This could be a callback from a library before APEX is ready // Something like OpenMP or CUDA/CUPTI or...? diff --git a/src/apex/otf2_listener.hpp b/src/apex/otf2_listener.hpp index 16ba9eb1..c55548d7 100644 --- a/src/apex/otf2_listener.hpp +++ b/src/apex/otf2_listener.hpp @@ -193,9 +193,9 @@ namespace apex { uint64_t stamp, bool is_enter); #endif std::mutex _vthread_mutex; - std::map vthread_map; + std::map vthread_map; std::map vthread_evt_writer_map; - uint32_t make_vtid (async_thread_node &node); + uint32_t make_vtid (base_thread_node &node); std::map last_ts; uint64_t dropped; int64_t synchronizeClocks(void); @@ -241,9 +241,9 @@ namespace apex { { APEX_UNUSED(data); }; void on_send(message_event_data &data); void on_recv(message_event_data &data); - void on_async_event(async_thread_node &node, + void on_async_event(base_thread_node &node, std::shared_ptr &p); - void on_async_metric(async_thread_node &node, + void on_async_metric(base_thread_node &node, std::shared_ptr &p); }; diff --git a/src/apex/trace_event_listener.cpp b/src/apex/trace_event_listener.cpp index 2d531822..4cf6f1c6 100644 --- a/src/apex/trace_event_listener.cpp +++ b/src/apex/trace_event_listener.cpp @@ -214,7 +214,7 @@ void trace_event_listener::set_metadata(const char * name, const char * value) { APEX_UNUSED(value); } -std::string trace_event_listener::make_tid (async_thread_node &node) { +std::string trace_event_listener::make_tid (base_thread_node &node) { size_t tid; /* There is a potential for overlap here, but not a high potential. The CPU and the GPU * would BOTH have to spawn 64k+ threads/streams for this to happen. */ @@ -222,24 +222,14 @@ std::string trace_event_listener::make_tid (async_thread_node &node) { size_t id = vthread_map.size()+1; //uint32_t id_reversed = simple_reverse(id); uint32_t id_shifted = id << 16; - vthread_map.insert(std::pair(node,id_shifted)); + vthread_map.insert(std::pair(node,id_shifted)); std::stringstream ss; ss << fixed; ss << "{\"name\":\"thread_name\"" << ",\"ph\":\"M\",\"pid\":" << saved_node_id << ",\"tid\":" << id_shifted << ",\"args\":{\"name\":"; -#ifdef APEX_WITH_CUDA - ss << "\"CUDA [" << node._device << ":" << node._context - << ":" << std::setfill('0') << setw(5) << node._stream << "]"; -#endif -#ifdef APEX_WITH_HIP - ss << "\"HIP [" << node._device - << ":" << std::setfill('0') << setw(5) << node._queue << "]"; -#endif -#if !defined(APEX_WITH_CUDA) && !defined(APEX_WITH_HIP) - ss << "\"GPU [" << node._device << "]"; -#endif + ss << node.name(); //ss << "" << activity_to_string(node._activity); ss << "\""; ss << "}},\n"; @@ -258,7 +248,7 @@ std::string trace_event_listener::make_tid (async_thread_node &node) { return label; } -void trace_event_listener::on_async_event(async_thread_node &node, +void trace_event_listener::on_async_event(base_thread_node &node, std::shared_ptr &p, const async_event_data& data) { if (!_terminate) { std::stringstream ss; @@ -277,6 +267,7 @@ void trace_event_listener::on_async_event(async_thread_node &node, << ",\"args\":{\"GUID\":" << p->guid << ",\"Parent GUID\":" << pguid << "}},\n"; // write a flow event pair! // make sure the start of the flow is before the end of the flow, ideally the middle of the parent + if (data.flow) { if (data.reverse_flow) { double begin_ts = (p->get_stop_us() + p->get_start_us()) * 0.5; double end_ts = std::min(p->get_stop_us(), data.parent_ts_stop); @@ -288,12 +279,13 @@ void trace_event_listener::on_async_event(async_thread_node &node, write_flow_event(ss, begin_ts, 's', data.cat, data.id, saved_node_id, data.parent_tid, data.name); write_flow_event(ss, end_ts, 't', data.cat, data.id, saved_node_id, atol(tid.c_str()), data.name); } + } write_to_trace(ss); flush_trace_if_necessary(); } } -void trace_event_listener::on_async_metric(async_thread_node &node, +void trace_event_listener::on_async_metric(base_thread_node &node, std::shared_ptr &p) { if (!_terminate) { std::stringstream ss; diff --git a/src/apex/trace_event_listener.hpp b/src/apex/trace_event_listener.hpp index 417d0f0d..8beb589f 100644 --- a/src/apex/trace_event_listener.hpp +++ b/src/apex/trace_event_listener.hpp @@ -53,9 +53,9 @@ class trace_event_listener : public event_listener { void on_recv(message_event_data &data) { APEX_UNUSED(data); }; void set_node_id(int node_id, int node_count); void set_metadata(const char * name, const char * value); - void on_async_event(async_thread_node &node, std::shared_ptr &p, + void on_async_event(base_thread_node &node, std::shared_ptr &p, const async_event_data& data); - void on_async_metric(async_thread_node &node, std::shared_ptr &p); + void on_async_metric(base_thread_node &node, std::shared_ptr &p); void end_trace_time(void); private: @@ -65,7 +65,7 @@ class trace_event_listener : public event_listener { void close_trace(void); void flush_trace_if_necessary(void); void _common_stop(std::shared_ptr &p); - std::string make_tid (async_thread_node &node); + std::string make_tid (base_thread_node &node); long unsigned int get_thread_id_metadata(); static bool _initialized; size_t get_thread_index(void); @@ -84,7 +84,7 @@ class trace_event_listener : public event_listener { std::map mutexes; std::map streams; std::mutex _vthread_mutex; - std::map vthread_map; + std::map vthread_map; double _end_time; };