From 433e959deab26ccf1eb9b75b8ea3e21659da4f0a Mon Sep 17 00:00:00 2001
From: David Wendt <45795991+davidwendt@users.noreply.github.com>
Date: Tue, 9 Jul 2024 10:45:05 -0400
Subject: [PATCH] Free temp memory no longer needed in multibyte_split processing (#16091)

Updates the `multibyte_split` logic to free temporary memory once the chars
and offsets have been resolved. This gives room to the remaining processing
if more temp memory is required. (Illustrative sketches of the
temporary-memory scoping, tile-state seeding, and dual-stream pipelining
involved here are appended after the diff below.)

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - https://github.com/nvdbaranec

URL: https://github.com/rapidsai/cudf/pull/16091
---
 cpp/src/io/text/multibyte_split.cu | 324 ++++++++++++++---------------
 1 file changed, 162 insertions(+), 162 deletions(-)

diff --git a/cpp/src/io/text/multibyte_split.cu b/cpp/src/io/text/multibyte_split.cu
index 51dc0ca90af..be2e2b9a79c 100644
--- a/cpp/src/io/text/multibyte_split.cu
+++ b/cpp/src/io/text/multibyte_split.cu
@@ -55,6 +55,8 @@
 #include
 #include

+namespace cudf::io::text {
+namespace detail {
 namespace {

 using cudf::io::text::detail::multistate;
@@ -299,11 +301,6 @@ CUDF_KERNEL __launch_bounds__(THREADS_PER_TILE) void byte_split_kernel(

 } // namespace

-namespace cudf {
-namespace io {
-namespace text {
-namespace detail {
-
 std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source const& source,
                                 std::string const& delimiter,
                                 byte_range_info byte_range,
@@ -336,173 +333,181 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source
   CUDF_EXPECTS(delimiter.size() < multistate::max_segment_value,
                "delimiter contains too many total tokens to produce a deterministic result.");

-  auto const concurrency = 2;
-
-  // must be at least 32 when using warp-reduce on partials
-  // must be at least 1 more than max possible concurrent tiles
-  // best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s
-  auto num_tile_states  = std::max(32, TILES_PER_CHUNK * concurrency + 32);
-  auto tile_multistates =
-    scan_tile_state(num_tile_states, stream, rmm::mr::get_current_device_resource());
-  auto tile_offsets =
-    scan_tile_state(num_tile_states, stream, rmm::mr::get_current_device_resource());
-
-  multibyte_split_init_kernel<<>>(  //
-    -TILES_PER_CHUNK,
-    TILES_PER_CHUNK,
-    tile_multistates,
-    tile_offsets,
-    cudf::io::text::detail::scan_tile_status::oob);
-
-  auto multistate_seed = multistate();
-  multistate_seed.enqueue(0, 0);  // this represents the first state in the pattern.
-
-  // Seeding the tile state with an identity value allows the 0th tile to follow the same logic as
-  // the Nth tile, assuming it can look up an inclusive prefix. Without this seed, the 0th block
-  // would have to follow separate logic.
-  cudf::detail::device_single_thread(
-    [tm = scan_tile_state_view(tile_multistates),
-     to = scan_tile_state_view(tile_offsets),
-     multistate_seed] __device__() mutable {
-      tm.set_inclusive_prefix(-1, multistate_seed);
-      to.set_inclusive_prefix(-1, 0);
-    },
-    stream);
-
-  auto reader       = source.create_reader();
-  auto chunk_offset = std::max(0, byte_range.offset() - delimiter.size());
-  auto const byte_range_end = byte_range.offset() + byte_range.size();
-  reader->skip_bytes(chunk_offset);
-  // amortize output chunk allocations over 8 worst-case outputs. This limits the overallocation
-  constexpr auto max_growth = 8;
-  output_builder row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
-  output_builder char_storage(ITEMS_PER_CHUNK, max_growth, stream);
-
-  auto streams = cudf::detail::fork_streams(stream, concurrency);
-
-  cudaEvent_t last_launch_event;
-  CUDF_CUDA_TRY(cudaEventCreate(&last_launch_event));
-
-  auto& read_stream = streams[0];
-  auto& scan_stream = streams[1];
-  auto chunk        = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
-  int64_t base_tile_idx = 0;
+  auto chunk_offset = std::max(0, byte_range.offset() - delimiter.size());
   std::optional first_row_offset;
-  std::optional last_row_offset;
-  bool found_last_offset = false;
   if (byte_range.offset() == 0) { first_row_offset = 0; }
-  std::swap(read_stream, scan_stream);
-
-  while (chunk->size() > 0) {
-    // if we found the last delimiter, or didn't find delimiters inside the byte range at all: abort
-    if (last_row_offset.has_value() or
-        (not first_row_offset.has_value() and chunk_offset >= byte_range_end)) {
-      break;
-    }
-
-    auto tiles_in_launch =
-      cudf::util::div_rounding_up_safe(chunk->size(), static_cast(ITEMS_PER_TILE));
-
-    auto row_offsets = row_offset_storage.next_output(scan_stream);
+  std::optional last_row_offset;
-    // reset the next chunk of tile state
-    multibyte_split_init_kernel<<(num_tile_states, stream, rmm::mr::get_current_device_resource());
+    auto tile_offsets = scan_tile_state(
+      num_tile_states, stream, rmm::mr::get_current_device_resource());
+
+    multibyte_split_init_kernel<<>>(  //
-      base_tile_idx,
-      tiles_in_launch,
+                                  stream.value()>>>(  //
+      -TILES_PER_CHUNK,
+      TILES_PER_CHUNK,
       tile_multistates,
-      tile_offsets);
+      tile_offsets,
+      cudf::io::text::detail::scan_tile_status::oob);

-    CUDF_CUDA_TRY(cudaStreamWaitEvent(scan_stream.value(), last_launch_event));
+    auto multistate_seed = multistate();
+    multistate_seed.enqueue(0, 0);  // this represents the first state in the pattern.

-    if (delimiter.size() == 1) {
-      // the single-byte case allows for a much more efficient kernel, so we special-case it
-      byte_split_kernel<<>>(  //
-        base_tile_idx,
-        chunk_offset,
-        row_offset_storage.size(),
-        tile_offsets,
-        delimiter[0],
-        *chunk,
-        row_offsets);
-    } else {
-      multibyte_split_kernel<<>>(  //
+    // Seeding the tile state with an identity value allows the 0th tile to follow the same logic as
+    // the Nth tile, assuming it can look up an inclusive prefix. Without this seed, the 0th block
+    // would have to follow separate logic.
+    cudf::detail::device_single_thread(
+      [tm = scan_tile_state_view(tile_multistates),
+       to = scan_tile_state_view(tile_offsets),
+       multistate_seed] __device__() mutable {
+        tm.set_inclusive_prefix(-1, multistate_seed);
+        to.set_inclusive_prefix(-1, 0);
+      },
+      stream);
+
+    auto reader               = source.create_reader();
+    auto const byte_range_end = byte_range.offset() + byte_range.size();
+    reader->skip_bytes(chunk_offset);
+    // amortize output chunk allocations over 8 worst-case outputs. This limits the overallocation
+    constexpr auto max_growth = 8;
+    output_builder row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
+    output_builder char_storage(ITEMS_PER_CHUNK, max_growth, stream);
+
+    auto streams = cudf::detail::fork_streams(stream, concurrency);
+
+    cudaEvent_t last_launch_event;
+    CUDF_CUDA_TRY(cudaEventCreate(&last_launch_event));
+
+    auto& read_stream = streams[0];
+    auto& scan_stream = streams[1];
+    auto chunk        = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
+    int64_t base_tile_idx  = 0;
+    bool found_last_offset = false;
+    std::swap(read_stream, scan_stream);
+
+    while (chunk->size() > 0) {
+      // if we found the last delimiter, or didn't find delimiters inside the byte range at all:
+      // abort
+      if (last_row_offset.has_value() or
+          (not first_row_offset.has_value() and chunk_offset >= byte_range_end)) {
+        break;
+      }
+
+      auto tiles_in_launch =
+        cudf::util::div_rounding_up_safe(chunk->size(), static_cast(ITEMS_PER_TILE));
+
+      auto row_offsets = row_offset_storage.next_output(scan_stream);
+
+      // reset the next chunk of tile state
+      multibyte_split_init_kernel<<>>(  //
        base_tile_idx,
-      chunk_offset,
-      row_offset_storage.size(),
+        tiles_in_launch,
        tile_multistates,
-      tile_offsets,
-      {device_delim.data(), static_cast(device_delim.size())},
-      *chunk,
-      row_offsets);
-    }
+        tile_offsets);
+
+      CUDF_CUDA_TRY(cudaStreamWaitEvent(scan_stream.value(), last_launch_event));
+
+      if (delimiter.size() == 1) {
+        // the single-byte case allows for a much more efficient kernel, so we special-case it
+        byte_split_kernel<<>>(  //
+          base_tile_idx,
+          chunk_offset,
+          row_offset_storage.size(),
+          tile_offsets,
+          delimiter[0],
+          *chunk,
+          row_offsets);
+      } else {
+        multibyte_split_kernel<<>>(  //
+          base_tile_idx,
+          chunk_offset,
+          row_offset_storage.size(),
+          tile_multistates,
+          tile_offsets,
+          {device_delim.data(), static_cast(device_delim.size())},
+          *chunk,
+          row_offsets);
+      }

-    // load the next chunk
-    auto next_chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
-    // while that is running, determine how many offsets we output (synchronizes)
-    auto const new_offsets = [&] {
-      auto const new_offsets_unclamped =
-        tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream) -
-        static_cast(row_offset_storage.size());
-      // if we are not in the last chunk, we can use all offsets
-      if (chunk_offset + static_cast(chunk->size()) < byte_range_end) {
-        return new_offsets_unclamped;
+      // load the next chunk
+      auto next_chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, read_stream);
+      // while that is running, determine how many offsets we output (synchronizes)
+      auto const new_offsets = [&] {
+        auto const new_offsets_unclamped =
+          tile_offsets.get_inclusive_prefix(base_tile_idx + tiles_in_launch - 1, scan_stream) -
+          static_cast(row_offset_storage.size());
+        // if we are not in the last chunk, we can use all offsets
+        if (chunk_offset + static_cast(chunk->size()) < byte_range_end) {
+          return new_offsets_unclamped;
+        }
+        // if we are in the last chunk, we need to find the first out-of-bounds offset
+        auto const it = thrust::make_counting_iterator(output_offset{});
+        auto const end_loc =
+          *thrust::find_if(rmm::exec_policy_nosync(scan_stream),
+                           it,
+                           it + new_offsets_unclamped,
+                           [row_offsets, byte_range_end] __device__(output_offset i) {
+                             return row_offsets[i] >= byte_range_end;
+                           });
+        // if we had no out-of-bounds offset, we copy all offsets
+        if (end_loc == new_offsets_unclamped) { return end_loc; }
+        // otherwise we copy only up to (including) the first out-of-bounds delimiter
+        found_last_offset = true;
+        return end_loc + 1;
+      }();
+      row_offset_storage.advance_output(new_offsets, scan_stream);
+      // determine if we found the first or last field offset for the byte range
+      if (new_offsets > 0 and not first_row_offset) {
+        first_row_offset = row_offset_storage.front_element(scan_stream);
+      }
+      if (found_last_offset) { last_row_offset = row_offset_storage.back_element(scan_stream); }
+      // copy over the characters we need, if we already encountered the first field delimiter
+      if (first_row_offset.has_value()) {
+        auto const begin =
+          chunk->data() + std::max(0, *first_row_offset - chunk_offset);
+        auto const sentinel = last_row_offset.value_or(std::numeric_limits::max());
+        auto const end =
+          chunk->data() + std::min(sentinel - chunk_offset, chunk->size());
+        auto const output_size = end - begin;
+        auto char_output       = char_storage.next_output(scan_stream);
+        thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, end, char_output.begin());
+        char_storage.advance_output(output_size, scan_stream);
      }
-      // if we are in the last chunk, we need to find the first out-of-bounds offset
-      auto const it = thrust::make_counting_iterator(output_offset{});
-      auto const end_loc =
-        *thrust::find_if(rmm::exec_policy_nosync(scan_stream),
-                         it,
-                         it + new_offsets_unclamped,
-                         [row_offsets, byte_range_end] __device__(output_offset i) {
-                           return row_offsets[i] >= byte_range_end;
-                         });
-      // if we had no out-of-bounds offset, we copy all offsets
-      if (end_loc == new_offsets_unclamped) { return end_loc; }
-      // otherwise we copy only up to (including) the first out-of-bounds delimiter
-      found_last_offset = true;
-      return end_loc + 1;
-    }();
-    row_offset_storage.advance_output(new_offsets, scan_stream);
-    // determine if we found the first or last field offset for the byte range
-    if (new_offsets > 0 and not first_row_offset) {
-      first_row_offset = row_offset_storage.front_element(scan_stream);
-    }
-    if (found_last_offset) { last_row_offset = row_offset_storage.back_element(scan_stream); }
-    // copy over the characters we need, if we already encountered the first field delimiter
-    if (first_row_offset.has_value()) {
-      auto const begin    = chunk->data() + std::max(0, *first_row_offset - chunk_offset);
-      auto const sentinel = last_row_offset.value_or(std::numeric_limits::max());
-      auto const end =
-        chunk->data() + std::min(sentinel - chunk_offset, chunk->size());
-      auto const output_size = end - begin;
-      auto char_output       = char_storage.next_output(scan_stream);
-      thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, end, char_output.begin());
-      char_storage.advance_output(output_size, scan_stream);
-    }

-    CUDF_CUDA_TRY(cudaEventRecord(last_launch_event, scan_stream.value()));
+      CUDF_CUDA_TRY(cudaEventRecord(last_launch_event, scan_stream.value()));

-    std::swap(read_stream, scan_stream);
-    base_tile_idx += tiles_in_launch;
-    chunk_offset += chunk->size();
-    chunk = std::move(next_chunk);
-  }
+      std::swap(read_stream, scan_stream);
+      base_tile_idx += tiles_in_launch;
+      chunk_offset += chunk->size();
+      chunk = std::move(next_chunk);
+    }
+
+    CUDF_CUDA_TRY(cudaEventDestroy(last_launch_event));

-  CUDF_CUDA_TRY(cudaEventDestroy(last_launch_event));
+    cudf::detail::join_streams(streams, stream);

-  cudf::detail::join_streams(streams, stream);
+    auto chars          = char_storage.gather(stream, mr);
+    auto global_offsets = row_offset_storage.gather(stream, mr);
+    return std::pair{std::move(global_offsets), std::move(chars)};
+  }();

   // if the input was empty, we didn't find a delimiter at all,
   // or the first delimiter was also the last: empty output
@@ -511,9 +516,6 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source
     return make_empty_column(type_id::STRING);
   }

-  auto chars          = char_storage.gather(stream, mr);
-  auto global_offsets = row_offset_storage.gather(stream, mr);
-
   // insert an offset at the beginning if we started at the beginning of the input
   bool const insert_begin = first_row_offset.value_or(0) == 0;
   // insert an offset at the end if we have not terminated the last row
@@ -591,6 +593,4 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source
   return result;
 }

-} // namespace text
-} // namespace io
-} // namespace cudf
+} // namespace cudf::io::text
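
The description at the top is the heart of the change: the scan's temporary state is scoped so that it is destroyed once the offsets and characters have been gathered, returning its device memory before the later, allocation-heavy column-assembly steps run. The following is a minimal editorial sketch of that scoping pattern under stated assumptions, not the PR's code: the function name resolve_rows, the buffer element types and sizes, and the placeholder comments are invented for illustration.

// Minimal sketch (assumptions noted above): scratch state lives only inside the
// immediately-invoked lambda, so its device memory is released before the caller
// continues with further allocations.
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <cstdint>
#include <utility>

// Hypothetical stand-in for the offsets/chars resolution phase.
std::pair<rmm::device_uvector<int64_t>, rmm::device_uvector<char>> resolve_rows(
  std::size_t num_tile_states, rmm::cuda_stream_view stream)
{
  auto [offsets, chars] = [&] {
    // Scratch needed only while scanning; freed when the lambda returns.
    rmm::device_uvector<int32_t> tile_state(num_tile_states, stream);

    rmm::device_uvector<int64_t> out_offsets(0, stream);
    rmm::device_uvector<char> out_chars(0, stream);
    // ... kernels would populate out_offsets/out_chars here, using tile_state as scratch ...
    return std::pair{std::move(out_offsets), std::move(out_chars)};
  }();  // tile_state is destroyed here, before any further allocations happen

  return {std::move(offsets), std::move(chars)};
}

Because the scratch buffer is owned by the lambda's scope, its memory goes back to the memory resource as soon as the pair is returned, which is what gives the remaining processing more headroom.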
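
The diff keeps the comment explaining that the tile state is seeded with an identity value so the 0th tile can use the same look-back logic as every other tile. The sketch below is a simplified, sequential host-side analogue of that idea only; it is not cudf's scan_tile_state (a concurrent device-side structure), and the function name and types are invented for illustration.

// Seeding idea in miniature: store the scan identity as the "inclusive prefix" of
// tile -1 so that every tile, including tile 0, applies the same rule
// prefix[t] = prefix[t - 1] + aggregate[t] with no special case.
#include <cstdint>
#include <vector>

std::vector<int64_t> tile_inclusive_prefixes(std::vector<int64_t> const& tile_aggregates)
{
  // Index 0 plays the role of tile -1 and holds the identity (0 for a sum).
  std::vector<int64_t> prefix(tile_aggregates.size() + 1, 0);
  for (std::size_t i = 0; i < tile_aggregates.size(); ++i) {
    prefix[i + 1] = prefix[i] + tile_aggregates[i];  // same rule for tile 0 and tile N
  }
  return prefix;  // prefix[t + 1] is the inclusive prefix of tile t
}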
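
The processing loop alternates a read stream and a scan stream, recording an event after each scan launch and waiting on it before the next one, so the next chunk can be loaded while the current one is scanned. The stripped-down CUDA sketch below shows only that pattern; process_chunk, the launch dimensions, and the buffer handling are hypothetical and not taken from the PR.

// Two-stream ping-pong with an event ordering successive scan launches
// (hypothetical kernel and sizes; chunk loading elided to a comment).
#include <cuda_runtime.h>

#include <utility>

__global__ void process_chunk(char const* data, int size)
{
  // real work would go here
}

void pipeline(char* buffers[2], int num_chunks, int chunk_size)
{
  cudaStream_t read_stream, scan_stream;
  cudaEvent_t last_launch_event;
  cudaStreamCreate(&read_stream);
  cudaStreamCreate(&scan_stream);
  cudaEventCreate(&last_launch_event);

  for (int i = 0; i < num_chunks; ++i) {
    // do not reuse shared scan state until the previous scan launch is ordered
    cudaStreamWaitEvent(scan_stream, last_launch_event, 0);
    process_chunk<<<64, 256, 0, scan_stream>>>(buffers[i % 2], chunk_size);
    // the next chunk would be read into buffers[(i + 1) % 2] on read_stream here
    cudaEventRecord(last_launch_event, scan_stream);
    std::swap(read_stream, scan_stream);  // swap roles for the next iteration
  }

  cudaEventDestroy(last_launch_event);
  cudaStreamDestroy(read_stream);
  cudaStreamDestroy(scan_stream);
}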