diff --git a/cpp/include/cudf/contiguous_split.hpp b/cpp/include/cudf/contiguous_split.hpp
index 2756a8963cc..ce995a5c32a 100644
--- a/cpp/include/cudf/contiguous_split.hpp
+++ b/cpp/include/cudf/contiguous_split.hpp
@@ -141,8 +141,8 @@ struct contiguous_split_state;
  * all thrust and scratch memory allocations are using the passed-in memory resource exclusively,
  * not a per-device memory resource.
  *
- * The caller has two methods it can use to carry out the chunked_pack: has_next and next.
- * Here is an example:
+ * This class defines two methods that must be used in concert to carry out the chunked_pack:
+ * has_next and next. Here is an example:
  *
  * @code{.pseudo}
  * // Create a table_view
@@ -156,7 +156,7 @@ struct contiguous_split_state;
  * // Define a buffer size for each chunk: the larger the buffer is, the more SMs can be
  * // occupied by this algorithm.
  * //
- * // Internally, the GPU unit-of-work is a 1MB batch. When we instantiate `cudf::chunked_pack`,
+ * // Internally, the GPU unit of work is a 1MB batch. When we instantiate `cudf::chunked_pack`,
  * // all the 1MB batches for the source table_view are computed up front. Additionally,
  * // chunked_pack calculates the number of iterations that are required to go through all those
  * // batches given a `user_buffer_size` buffer. The number of 1MB batches in each iteration (chunk)
@@ -196,15 +196,16 @@ class chunked_pack {
   * @param input source `table_view` to pack
   * @param user_buffer_size buffer size (in bytes) that will be passed on `next`. Must be
   * at least 1MB
-  * @param mr An optional memory resource to be used for temporary and scratch allocations only
+  * @param temp_mr An optional memory resource to be used for temporary and scratch allocations
+  * only
   */
  explicit chunked_pack(
    cudf::table_view const& input,
    std::size_t user_buffer_size,
-   rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+   rmm::mr::device_memory_resource* temp_mr = rmm::mr::get_current_device_resource());
 
  /**
-  * @brief Destructor that will be implemented as default, required because
+  * @brief Destructor that will be implemented as default. It is only declared here because
   * contiguous_split_state is incomplete at this stage.
   */
  ~chunked_pack();
@@ -224,7 +225,7 @@ class chunked_pack {
  [[nodiscard]] bool has_next() const;
 
  /**
-  * @brief Packs the next chunk into `user_buffer`. This should be call as long as
+  * @brief Packs the next chunk into `user_buffer`. This should be called as long as
   * `has_next` returns true. If `next` is called when `has_next` is false, an exception
   * is thrown.
   *
@@ -255,12 +256,13 @@ class chunked_pack {
   * @param input source `table_view` to pack
   * @param user_buffer_size buffer size (in bytes) that will be passed on `next`. Must be
   * at least 1MB
-  * @param mr RMM memory resource to be used for temporary and scratch allocations only
+  * @param temp_mr RMM memory resource to be used for temporary and scratch allocations only
   * @return a unique_ptr of chunked_pack
   */
-  [[nodiscard]] static std::unique_ptr<chunked_pack> create(cudf::table_view const& input,
-                                                            std::size_t user_buffer_size,
-                                                            rmm::mr::device_memory_resource* mr);
+  [[nodiscard]] static std::unique_ptr<chunked_pack> create(
+    cudf::table_view const& input,
+    std::size_t user_buffer_size,
+    rmm::mr::device_memory_resource* temp_mr);
 
 private:
  // internal state of contiguous split
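As a reviewer aid, here is a minimal sketch of the `has_next`/`next` loop that the header documentation above describes. The function name, the bounce buffer, and its size are illustrative and not part of this patch; only `chunked_pack::create`, `has_next`, `next`, and `build_metadata` come from the header being changed.

```cpp
// Sketch only: mirrors the @code{.pseudo} example in the header docs above.
#include <cudf/contiguous_split.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

void pack_in_chunks(cudf::table_view const& tv)
{
  std::size_t const user_buffer_size = 1 * 1024 * 1024;  // must be at least 1MB
  auto packer = cudf::chunked_pack::create(
    tv, user_buffer_size, rmm::mr::get_current_device_resource());

  // Caller-owned staging buffer, reused across iterations.
  rmm::device_buffer bounce_buffer(user_buffer_size, cudf::get_default_stream());

  while (packer->has_next()) {
    std::size_t const bytes_copied = packer->next(cudf::device_span<uint8_t>(
      static_cast<uint8_t*>(bounce_buffer.data()), user_buffer_size));
    // ... consume `bytes_copied` bytes from bounce_buffer (e.g. copy them to host)
  }

  auto metadata = packer->build_metadata();  // needed later to unpack the chunks
}
```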
diff --git a/cpp/src/copying/contiguous_split.cu b/cpp/src/copying/contiguous_split.cu
index ce7b7bf0ad1..d982c11aee8 100644
--- a/cpp/src/copying/contiguous_split.cu
+++ b/cpp/src/copying/contiguous_split.cu
@@ -956,7 +956,7 @@ struct packed_split_indices_and_src_buf_info {
     std::size_t num_partitions,
     cudf::size_type num_src_bufs,
     rmm::cuda_stream_view stream,
-    rmm::mr::device_memory_resource* mr)
+    rmm::mr::device_memory_resource* temp_mr)
     : indices_size(
         cudf::util::round_up_safe((num_partitions + 1) * sizeof(size_type), split_align)),
       src_buf_info_size(
@@ -981,7 +981,7 @@ struct packed_split_indices_and_src_buf_info {
     // device-side
     // gpu-only : stack space needed for nested list offset calculation
     d_indices_and_source_info =
-      rmm::device_buffer(indices_size + src_buf_info_size + offset_stack_size, stream, mr);
+      rmm::device_buffer(indices_size + src_buf_info_size + offset_stack_size, stream, temp_mr);
     d_indices      = reinterpret_cast<size_type*>(d_indices_and_source_info.data());
     d_src_buf_info = reinterpret_cast<src_buf_info*>(
       reinterpret_cast<uint8_t*>(d_indices_and_source_info.data()) + indices_size);
@@ -1017,7 +1017,7 @@ struct packed_partition_buf_size_and_dst_buf_info {
     cudf::size_type num_src_bufs,
     std::size_t num_bufs,
     rmm::cuda_stream_view stream,
-    rmm::mr::device_memory_resource* mr)
+    rmm::mr::device_memory_resource* temp_mr)
     : stream(stream),
       buf_sizes_size{cudf::util::round_up_safe(num_partitions * sizeof(std::size_t), split_align)},
       dst_buf_info_size{cudf::util::round_up_safe(num_bufs * sizeof(dst_buf_info), split_align)},
@@ -1027,7 +1027,7 @@ struct packed_partition_buf_size_and_dst_buf_info {
       h_dst_buf_info{
         reinterpret_cast<dst_buf_info*>(h_buf_sizes_and_dst_info.data() + buf_sizes_size)},
       // device-side
-      d_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size, stream, mr),
+      d_buf_sizes_and_dst_info(buf_sizes_size + dst_buf_info_size, stream, temp_mr),
       d_buf_sizes{reinterpret_cast<std::size_t*>(d_buf_sizes_and_dst_info.data())},
       // destination buffer info
       d_dst_buf_info{reinterpret_cast<dst_buf_info*>(
@@ -1068,7 +1068,7 @@ struct packed_src_and_dst_pointers {
     std::size_t num_partitions,
     cudf::size_type num_src_bufs,
     rmm::cuda_stream_view stream,
-    rmm::mr::device_memory_resource* mr)
+    rmm::mr::device_memory_resource* temp_mr)
     : stream(stream),
       src_bufs_size{cudf::util::round_up_safe(num_src_bufs * sizeof(uint8_t*), split_align)},
       dst_bufs_size{cudf::util::round_up_safe(num_partitions * sizeof(uint8_t*), split_align)},
@@ -1077,7 +1077,7 @@ struct packed_src_and_dst_pointers {
       h_src_bufs{reinterpret_cast<uint8_t const**>(h_src_and_dst_buffers.data())},
       h_dst_bufs{reinterpret_cast<uint8_t**>(h_src_and_dst_buffers.data() + src_bufs_size)},
       // device-side
-      d_src_and_dst_buffers{rmm::device_buffer(src_bufs_size + dst_bufs_size, stream, mr)},
+      d_src_and_dst_buffers{rmm::device_buffer(src_bufs_size + dst_bufs_size, stream, temp_mr)},
       d_src_bufs{reinterpret_cast<uint8_t const**>(d_src_and_dst_buffers.data())},
       d_dst_bufs{reinterpret_cast<uint8_t**>(
         reinterpret_cast<uint8_t*>(d_src_and_dst_buffers.data()) + src_bufs_size)}
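The three helper structs patched above share one pattern: a single allocation carved into aligned sections via `round_up_safe` and `reinterpret_cast`, so one host (or device) buffer serves several typed arrays. A standalone sketch of that carving, with illustrative names and without the overflow checks and pinned/device buffers the real code uses:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Round `v` up to a multiple of `align` (cudf::util::round_up_safe additionally
// checks for overflow; this illustrative version does not).
constexpr std::size_t round_up(std::size_t v, std::size_t align)
{
  return ((v + align - 1) / align) * align;
}

struct sizes_then_infos {
  static constexpr std::size_t split_align = 64;

  sizes_then_infos(std::size_t num_sizes, std::size_t num_infos)
    : sizes_bytes{round_up(num_sizes * sizeof(std::size_t), split_align)},
      infos_bytes{round_up(num_infos * sizeof(int), split_align)},
      storage(sizes_bytes + infos_bytes),  // one allocation instead of two
      sizes{reinterpret_cast<std::size_t*>(storage.data())},
      infos{reinterpret_cast<int*>(storage.data() + sizes_bytes)}
  {
  }

  std::size_t sizes_bytes;
  std::size_t infos_bytes;
  std::vector<uint8_t> storage;  // real code: pinned host or rmm::device_buffer
  std::size_t* sizes;            // section 1: starts at offset 0
  int* infos;                    // section 2: starts at the aligned offset
};
```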
@@ -1119,7 +1119,7 @@ struct packed_src_and_dst_pointers {
 * @param num_src_bufs number of buffers for the source columns including children
 * @param out_buffers the destination buffers per partition if in the non-chunked case
 * @param stream Optional CUDA stream on which to execute kernels
- * @param mr RMM memory resource
+ * @param temp_mr A memory resource for temporary and scratch space
 *
 * @returns new unique pointer to packed_src_and_dst_pointers
 */
@@ -1129,10 +1129,10 @@ std::unique_ptr<packed_src_and_dst_pointers> setup_src_and_dst_pointers(
   cudf::size_type num_src_bufs,
   std::vector<rmm::device_buffer>& out_buffers,
   rmm::cuda_stream_view stream,
-  rmm::mr::device_memory_resource* mr)
+  rmm::mr::device_memory_resource* temp_mr)
 {
-  auto src_and_dst_pointers =
-    std::make_unique<packed_src_and_dst_pointers>(input, num_partitions, num_src_bufs, stream, mr);
+  auto src_and_dst_pointers = std::make_unique<packed_src_and_dst_pointers>(
+    input, num_partitions, num_src_bufs, stream, temp_mr);
 
   std::transform(
     out_buffers.begin(), out_buffers.end(), src_and_dst_pointers->h_dst_bufs, [](auto& buf) {
@@ -1155,7 +1155,7 @@ std::unique_ptr<packed_src_and_dst_pointers> setup_src_and_dst_pointers(
 * @param num_src_bufs number of buffers for the source columns including children
 * @param num_bufs num_src_bufs times the number of partitions
 * @param stream Optional CUDA stream on which to execute kernels
- * @param mr RMM memory resource
+ * @param temp_mr A memory resource for temporary and scratch space
 *
 * @returns new unique pointer to `packed_partition_buf_size_and_dst_buf_info`
 */
@@ -1166,18 +1166,18 @@ std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> compute_splits(
   cudf::size_type num_src_bufs,
   std::size_t num_bufs,
   rmm::cuda_stream_view stream,
-  rmm::mr::device_memory_resource* mr)
+  rmm::mr::device_memory_resource* temp_mr)
 {
   auto partition_buf_size_and_dst_buf_info =
     std::make_unique<packed_partition_buf_size_and_dst_buf_info>(
-      input, splits, num_partitions, num_src_bufs, num_bufs, stream, mr);
+      input, splits, num_partitions, num_src_bufs, num_bufs, stream, temp_mr);
 
   auto const d_dst_buf_info = partition_buf_size_and_dst_buf_info->d_dst_buf_info;
   auto const h_buf_sizes    = partition_buf_size_and_dst_buf_info->h_buf_sizes;
   auto const d_buf_sizes    = partition_buf_size_and_dst_buf_info->d_buf_sizes;
 
-  auto const split_indices_and_src_buf_info =
-    packed_split_indices_and_src_buf_info(input, splits, num_partitions, num_src_bufs, stream, mr);
+  auto const split_indices_and_src_buf_info = packed_split_indices_and_src_buf_info(
+    input, splits, num_partitions, num_src_bufs, stream, temp_mr);
 
   auto const d_src_buf_info = split_indices_and_src_buf_info.d_src_buf_info;
   auto const offset_stack_partition_size =
@@ -1187,7 +1187,7 @@ std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> compute_splits(
 
   // compute sizes of each column in each partition, including alignment.
   thrust::transform(
-    rmm::exec_policy(stream, mr),
+    rmm::exec_policy(stream, temp_mr),
     thrust::make_counting_iterator(0),
     thrust::make_counting_iterator(num_bufs),
     d_dst_buf_info,
@@ -1272,7 +1272,7 @@ std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> compute_splits(
     auto values =
       cudf::detail::make_counting_transform_iterator(0, buf_size_functor{d_dst_buf_info});
 
-    thrust::reduce_by_key(rmm::exec_policy(stream, mr),
+    thrust::reduce_by_key(rmm::exec_policy(stream, temp_mr),
                           keys,
                           keys + num_bufs,
                           values,
@@ -1287,7 +1287,7 @@ std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> compute_splits(
     auto values =
       cudf::detail::make_counting_transform_iterator(0, buf_size_functor{d_dst_buf_info});
 
-    thrust::exclusive_scan_by_key(rmm::exec_policy(stream, mr),
+    thrust::exclusive_scan_by_key(rmm::exec_policy(stream, temp_mr),
                                   keys,
                                   keys + num_bufs,
                                   values,
@@ -1307,7 +1307,7 @@ std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> compute_splits(
 * `copy_partitions` kernel and the number of iterations we need to carry out this copy.
 *
 * For the non-chunked contiguous_split case, this contains the batched dst_buf_infos and the
- * number of iterations are going to be 1, since the non-chunked case is single pass.
+ * number of iterations is going to be 1 since the non-chunked case is single pass.
 *
 * For the chunked_pack case, this also contains the batched dst_buf_infos for all
 * iterations in addition to helping keep the state about what batches have been copied so far
@@ -1320,8 +1320,8 @@ struct chunk_iteration_state {
                         std::vector<std::size_t>&& _h_size_of_buffs_per_iteration,
                         std::size_t total_size)
     : num_iterations(_h_num_buffs_per_iteration.size()),
-      current_iteration(0),
-      starting_batch(0),
+      current_iteration{0},
+      starting_batch{0},
       d_batched_dst_buf_info(std::move(_d_batched_dst_buf_info)),
       d_batch_offsets(std::move(_d_batch_offsets)),
       h_num_buffs_per_iteration(std::move(_h_num_buffs_per_iteration)),
@@ -1330,6 +1330,16 @@
   {
   }
 
+  static std::unique_ptr<chunk_iteration_state> create(
+    rmm::device_uvector<thrust::pair<std::size_t, std::size_t>> const& batches,
+    int num_bufs,
+    dst_buf_info* d_orig_dst_buf_info,
+    std::size_t const* const h_buf_sizes,
+    std::size_t num_partitions,
+    std::size_t user_buffer_size,
+    rmm::cuda_stream_view stream,
+    rmm::mr::device_memory_resource* temp_mr);
+
   /**
    * @brief As of the time of the call, return the starting 1MB batch index, and the
    * number of batches to copy.
@@ -1341,9 +1351,17 @@
     CUDF_EXPECTS(current_iteration < num_iterations,
                  "current_iteration cannot exceed num_iterations");
     auto count_for_current = h_num_buffs_per_iteration[current_iteration];
-    return std::make_pair(starting_batch, count_for_current);
+    return {starting_batch, count_for_current};
   }
 
+  /**
+   * @brief Advance the iteration state if there are iterations left, updating the
+   * starting batch and returning the number of bytes copied in the iteration
+   * just finished.
+   * @throws cudf::logic_error If the state was at the last iteration before entering
+   * this function.
+   * @return size in bytes that were copied in the finished iteration
+   */
   std::size_t advance_iteration()
   {
     CUDF_EXPECTS(current_iteration < num_iterations,
@@ -1354,6 +1372,9 @@
     return bytes_copied;
   }
 
+  /**
+   * @brief Returns true if there are iterations left.
+   */
   bool has_more_copies() const { return current_iteration < num_iterations; }
 
   rmm::device_uvector<dst_buf_info> d_batched_dst_buf_info;
@@ -1368,7 +1389,7 @@
   std::vector<std::size_t> const h_size_of_buffs_per_iteration;
 };
 
-std::unique_ptr<chunk_iteration_state> make_chunk_iteration_state(
+std::unique_ptr<chunk_iteration_state> chunk_iteration_state::create(
   rmm::device_uvector<thrust::pair<std::size_t, std::size_t>> const& batches,
   int num_bufs,
   dst_buf_info* d_orig_dst_buf_info,
@@ -1376,16 +1397,16 @@
   std::size_t num_partitions,
   std::size_t user_buffer_size,
   rmm::cuda_stream_view stream,
-  rmm::mr::device_memory_resource* mr)
+  rmm::mr::device_memory_resource* temp_mr)
 {
-  rmm::device_uvector<std::size_t> d_batch_offsets(num_bufs + 1, stream, mr);
+  rmm::device_uvector<std::size_t> d_batch_offsets(num_bufs + 1, stream, temp_mr);
 
   auto const buf_count_iter = cudf::detail::make_counting_transform_iterator(
     0, [num_bufs, num_batches = num_batches_func{batches.begin()}] __device__(size_type i) {
       return i == num_bufs ? 0 : num_batches(i);
     });
 
-  thrust::exclusive_scan(rmm::exec_policy(stream, mr),
+  thrust::exclusive_scan(rmm::exec_policy(stream, temp_mr),
                          buf_count_iter,
                          buf_count_iter + num_bufs + 1,
                          d_batch_offsets.begin(),
@@ -1394,17 +1415,17 @@
   auto const num_batches_iter =
     cudf::detail::make_counting_transform_iterator(0, num_batches_func{batches.begin()});
   size_type const num_batches = thrust::reduce(
-    rmm::exec_policy(stream, mr), num_batches_iter, num_batches_iter + batches.size());
+    rmm::exec_policy(stream, temp_mr), num_batches_iter, num_batches_iter + batches.size());
 
   auto out_to_in_index = out_to_in_index_function{d_batch_offsets.begin(), num_bufs};
 
   auto const iter = thrust::make_counting_iterator(0);
 
   // load up the batches as d_dst_buf_info
-  rmm::device_uvector<dst_buf_info> d_batched_dst_buf_info(num_batches, stream, mr);
+  rmm::device_uvector<dst_buf_info> d_batched_dst_buf_info(num_batches, stream, temp_mr);
 
   thrust::for_each(
-    rmm::exec_policy(stream, mr),
+    rmm::exec_policy(stream, temp_mr),
     iter,
     iter + num_batches,
     [d_orig_dst_buf_info,
@@ -1464,11 +1485,11 @@
   // copy the batch offsets back to host
   std::vector<std::size_t> h_offsets(num_batches + 1);
   {
-    rmm::device_uvector<std::size_t> offsets(h_offsets.size(), stream, mr);
+    rmm::device_uvector<std::size_t> offsets(h_offsets.size(), stream, temp_mr);
     auto const batch_byte_size_iter = cudf::detail::make_counting_transform_iterator(
       0, batch_byte_size_function{num_batches, d_batched_dst_buf_info.begin()});
 
-    thrust::exclusive_scan(rmm::exec_policy(stream, mr),
+    thrust::exclusive_scan(rmm::exec_policy(stream, temp_mr),
                            batch_byte_size_iter,
                            batch_byte_size_iter + num_batches + 1,
                            offsets.begin());
@@ -1526,7 +1547,7 @@
   // apply changed offset
   {
     rmm::device_uvector<std::size_t> d_accum_size_per_iteration(
-      accum_size_per_iteration.size(), stream, mr);
+      accum_size_per_iteration.size(), stream, temp_mr);
 
     CUDF_CUDA_TRY(cudaMemcpyAsync(d_accum_size_per_iteration.data(),
                                   accum_size_per_iteration.data(),
@@ -1540,7 +1561,7 @@
     auto const iter           = thrust::make_counting_iterator(num_batches_in_first_iteration);
     auto num_iterations       = accum_size_per_iteration.size();
     thrust::for_each(
-      rmm::exec_policy(stream, mr),
+      rmm::exec_policy(stream, temp_mr),
       iter,
       iter + num_batches - num_batches_in_first_iteration,
       [num_iterations,
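The exclusive scans in `chunk_iteration_state::create` above compute, per source buffer, where its 1MB batches begin in the flattened batch list, and `out_to_in_index_function` inverts that mapping on the device. A host-side analogue of the same index math, for reference (names are illustrative, no thrust):

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// offsets[i] = flat index of the first batch belonging to buffer i;
// offsets.back() = total number of batches. This is what the device-side
// exclusive_scan over buf_count_iter produces in d_batch_offsets.
std::vector<std::size_t> batch_offsets(std::vector<std::size_t> const& batches_per_buf)
{
  std::vector<std::size_t> offsets(batches_per_buf.size() + 1, 0);
  std::partial_sum(batches_per_buf.begin(), batches_per_buf.end(), offsets.begin() + 1);
  return offsets;
}

// Host analogue of out_to_in_index_function: map a flat batch index back to the
// source buffer that owns it, i.e. the last buffer whose starting offset is
// <= batch_index (buffers with zero batches are skipped naturally).
std::size_t owning_buffer(std::vector<std::size_t> const& offsets, std::size_t batch_index)
{
  return std::upper_bound(offsets.begin(), offsets.end(), batch_index) - offsets.begin() - 1;
}
```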
@@ -1640,10 +1661,10 @@ namespace detail {
 * is using the single-pass contiguous_split or chunked_pack.
 *
 * It exposes an iterator-like pattern where contiguous_split_state::has_next()
- * return true when there is work to be done, and false otherwise.
+ * returns true when there is work to be done, and false otherwise.
 *
 * contiguous_split_state::contiguous_split() performs a single-pass contiguous_split
- * and is only valid iff contiguous_split_state is instantiated with 0 for the user_buffer_size.
+ * and is valid iff contiguous_split_state is instantiated with 0 for the user_buffer_size.
 *
 * contiguous_split_state::contiguous_split_chunk(device_span) is only valid when
 * user_buffer_size > 0. It should be called as long as has_next() returns true. The
@@ -1656,16 +1677,18 @@
 struct contiguous_split_state {
   contiguous_split_state(cudf::table_view const& input,
                          std::size_t user_buffer_size,
                          rmm::cuda_stream_view stream,
-                         rmm::mr::device_memory_resource* mr)
-    : contiguous_split_state(input, {}, user_buffer_size, stream, mr)
+                         rmm::mr::device_memory_resource* mr,
+                         rmm::mr::device_memory_resource* temp_mr)
+    : contiguous_split_state(input, {}, user_buffer_size, stream, mr, temp_mr)
   {
   }
 
   contiguous_split_state(cudf::table_view const& input,
                          std::vector<size_type> const& splits,
                          rmm::cuda_stream_view stream,
-                         rmm::mr::device_memory_resource* mr)
-    : contiguous_split_state(input, splits, 0, stream, mr)
+                         rmm::mr::device_memory_resource* mr,
+                         rmm::mr::device_memory_resource* temp_mr)
+    : contiguous_split_state(input, splits, 0, stream, mr, temp_mr)
   {
   }
 
@@ -1673,14 +1696,18 @@
                          std::vector<size_type> const& splits,
                          std::size_t user_buffer_size,
                          rmm::cuda_stream_view stream,
-                         rmm::mr::device_memory_resource* mr)
-    : input(input), user_buffer_size(user_buffer_size), stream(stream), mr(mr)
+                         rmm::mr::device_memory_resource* mr,
+                         rmm::mr::device_memory_resource* temp_mr)
+    : input(input),
+      user_buffer_size(user_buffer_size),
+      stream(stream),
+      mr(mr),
+      temp_mr(temp_mr),
+      is_empty{check_inputs(input, splits)},
+      num_partitions{splits.size() + 1},
+      num_src_bufs{count_src_bufs(input.begin(), input.end())},
+      num_bufs{num_src_bufs * num_partitions}
   {
-    is_empty       = check_inputs(input, splits);
-    num_partitions = splits.size() + 1;
-    num_src_bufs   = count_src_bufs(input.begin(), input.end());
-    num_bufs       = num_src_bufs * num_partitions;
-
     // if the table we are about to contig split is empty, we have special
     // handling where metadata is produced and a 0-byte contiguous buffer
     // is the result.
@@ -1689,8 +1716,8 @@
     // First pass over the source tables to generate a `dst_buf_info` per split and column buffer
     // (`num_bufs`). After this, contiguous_split uses `dst_buf_info` to further subdivide the work
    // into 1MB batches in `compute_batches`
-    partition_buf_size_and_dst_buf_info =
-      std::move(compute_splits(input, splits, num_partitions, num_src_bufs, num_bufs, stream, mr));
+    partition_buf_size_and_dst_buf_info = std::move(
+      compute_splits(input, splits, num_partitions, num_src_bufs, num_bufs, stream, temp_mr));
 
     // Second pass: uses `dst_buf_info` to break down the work into 1MB batches.
     compute_batches();
@@ -1707,8 +1734,8 @@
       });
     }
 
-    src_and_dst_pointers = std::move(
-      setup_src_and_dst_pointers(input, num_partitions, num_src_bufs, out_buffers, stream, mr));
+    src_and_dst_pointers = std::move(setup_src_and_dst_pointers(
+      input, num_partitions, num_src_bufs, out_buffers, stream, temp_mr));
   }
 
   bool has_next() const { return !is_empty && chunk_iter_state->has_more_copies(); }
@@ -1720,16 +1747,16 @@
   void compute_batches()
   {
-    // Since we parallelize at one block per copy, we are vulnerable to situations where we
+    // Since we parallelize at one block per copy, performance is vulnerable to situations where we
     // have small numbers of copies to do (a combination of small numbers of splits and/or columns),
     // so we will take the actual set of outgoing source/destination buffers and further partition
     // them into much smaller batches in order to drive up the number of blocks and overall
     // occupancy.
-    rmm::device_uvector<thrust::pair<std::size_t, std::size_t>> batches(num_bufs, stream, mr);
+    rmm::device_uvector<thrust::pair<std::size_t, std::size_t>> batches(num_bufs, stream, temp_mr);
     auto d_dst_buf_info = partition_buf_size_and_dst_buf_info->d_dst_buf_info;
     auto h_buf_sizes    = partition_buf_size_and_dst_buf_info->h_buf_sizes;
 
     thrust::transform(
-      rmm::exec_policy(stream, mr),
+      rmm::exec_policy(stream, temp_mr),
       d_dst_buf_info,
       d_dst_buf_info + num_bufs,
       batches.begin(),
@@ -1753,8 +1780,14 @@
         return {num_batches, desired_batch_size};
       });
 
-    chunk_iter_state = make_chunk_iteration_state(
-      batches, num_bufs, d_dst_buf_info, h_buf_sizes, num_partitions, user_buffer_size, stream, mr);
+    chunk_iter_state = chunk_iteration_state::create(batches,
+                                                     num_bufs,
+                                                     d_dst_buf_info,
+                                                     h_buf_sizes,
+                                                     num_partitions,
+                                                     user_buffer_size,
+                                                     stream,
+                                                     temp_mr);
   }
 
   std::vector<packed_table> contiguous_split()
@@ -1788,7 +1821,7 @@
       chunk_iter_state->d_batched_dst_buf_info.begin(),
       [] __device__(dst_buf_info const& info) { return info.valid_count; });
 
-    thrust::reduce_by_key(rmm::exec_policy(stream, mr),
+    thrust::reduce_by_key(rmm::exec_policy(stream, temp_mr),
                           keys,
                           keys + num_batches_total,
                           values,
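For reference, the 1MB batching rule that `compute_batches` applies per destination buffer reduces to a ceiling division. A host-side sketch of that rounding (illustrative only; the real device lambda also carries per-buffer metadata in `dst_buf_info`):

```cpp
#include <cstddef>
#include <utility>

constexpr std::size_t desired_batch_size = 1 * 1024 * 1024;  // 1MB, the GPU unit of work

// For one destination buffer of `buf_size` bytes, return {number of batches, batch size}.
// Buffers at or below 1MB (including empty ones) stay a single batch; larger buffers are
// split via ceiling division so the copy kernel can spread them across more blocks.
std::pair<std::size_t, std::size_t> batches_for(std::size_t buf_size)
{
  std::size_t const num_batches =
    buf_size <= desired_batch_size ? 1 : (buf_size + desired_batch_size - 1) / desired_batch_size;
  return {num_batches, desired_batch_size};
}
```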
@@ -1925,29 +1958,31 @@
   }
 
   cudf::table_view const input;
-  rmm::cuda_stream_view stream;
-  rmm::mr::device_memory_resource* mr;
+  std::size_t const user_buffer_size;
+  rmm::cuda_stream_view const stream;
+  rmm::mr::device_memory_resource* const mr;
+  rmm::mr::device_memory_resource* const temp_mr;
 
-  std::size_t num_partitions;
+  // whether the table was empty to begin with (0 rows or 0 columns) and should be metadata-only
+  bool const is_empty;
 
-  // number of source buffers including children * number of splits
-  std::size_t num_bufs;
+  std::size_t const num_partitions;
 
   // number of source buffers including children
-  size_type num_src_bufs;
+  size_type const num_src_bufs;
+
+  // number of source buffers including children * number of splits
+  std::size_t const num_bufs;
 
   std::unique_ptr<packed_partition_buf_size_and_dst_buf_info> partition_buf_size_and_dst_buf_info;
 
   std::unique_ptr<packed_src_and_dst_pointers> src_and_dst_pointers;
 
-  // whether the table was empty to begin with (0 rows or 0 columns) and should be metadata-only
-  bool is_empty;
-
   //
   // State around the chunked pattern
   //
-  // chunked_pack will 1 or more "chunks" to iterate on, defined in chunk_iter_state
+  // chunked_pack will have 1 or more "chunks" to iterate on, defined in chunk_iter_state
   // contiguous_split will have a single "chunk" in chunk_iter_state, so no iteration.
   std::unique_ptr<chunk_iteration_state> chunk_iter_state;
@@ -1964,8 +1999,6 @@
   // each buffer.
   //
   std::vector<rmm::device_buffer> out_buffers;
-
-  std::size_t user_buffer_size;
 };
 
 std::vector<packed_table> contiguous_split(cudf::table_view const& input,
@@ -1973,7 +2006,10 @@
                                            rmm::cuda_stream_view stream,
                                            rmm::mr::device_memory_resource* mr)
 {
-  auto state = contiguous_split_state(input, splits, stream, mr);
+  // For contiguous_split, `temp_mr` is the same as `mr`: temporary and output
+  // allocations both come from the caller-provided memory resource
+  auto temp_mr = mr;
+  auto state   = contiguous_split_state(input, splits, stream, mr, temp_mr);
   return state.contiguous_split();
 }
 
@@ -1989,10 +2025,14 @@
 chunked_pack::chunked_pack(cudf::table_view const& input,
                            std::size_t user_buffer_size,
-                           rmm::mr::device_memory_resource* mr)
+                           rmm::mr::device_memory_resource* temp_mr)
 {
+  CUDF_EXPECTS(user_buffer_size >= desired_batch_size,
+               "The output buffer size must be at least 1MB in size");
+  // We pass `nullptr` for the first `mr` in `contiguous_split_state` to indicate
+  // that it does not allocate any user-bound data for the `chunked_pack` case.
   state = std::make_unique<contiguous_split_state>(
-    input, user_buffer_size, cudf::get_default_stream(), mr);
+    input, user_buffer_size, cudf::get_default_stream(), nullptr, temp_mr);
 }
 
 // required for the unique_ptr to work with a non-complete type (contiguous_split_state)
@@ -2017,11 +2057,12 @@ std::unique_ptr<std::vector<uint8_t>> chunked_pack::build_metadata() const
 std::unique_ptr<chunked_pack> chunked_pack::create(cudf::table_view const& input,
                                                    std::size_t user_buffer_size,
-                                                   rmm::mr::device_memory_resource* mr)
+                                                   rmm::mr::device_memory_resource* temp_mr)
 {
-  CUDF_EXPECTS(user_buffer_size >= desired_batch_size,
-               "The output buffer size must be at least 1MB in size");
-  return std::make_unique<chunked_pack>(input, user_buffer_size, mr);
+  // `temp_mr` could be a special memory resource to be used in situations when
+  // GPU memory is low and we want scratch and temporary allocations to happen from
+  // a small reserved pool of memory.
+  return std::make_unique<chunked_pack>(input, user_buffer_size, temp_mr);
 }
 
 }  // namespace cudf
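To illustrate the motivation spelled out in the final comment, here is one way a caller might wire up a dedicated scratch pool as `temp_mr`. The pool type, sizes, and helper function are illustrative assumptions, not part of this change; only `chunked_pack::create`'s signature comes from the diff.

```cpp
#include <cudf/contiguous_split.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

// Reserve a small, fixed pool up front so chunked_pack's thrust/scratch
// allocations never compete with (or fail because of) large user allocations.
std::unique_ptr<cudf::chunked_pack> make_packer(cudf::table_view const& tv)
{
  // static so the pool outlives the returned packer; sizes are illustrative
  static rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> scratch_pool{
    rmm::mr::get_current_device_resource(),
    64 * 1024 * 1024,   // initial pool size: 64MB
    64 * 1024 * 1024};  // maximum pool size: keep the reservation fixed
  return cudf::chunked_pack::create(tv, 1024 * 1024, &scratch_pool);
}
```

The separation this PR introduces is what makes that pattern possible: output bytes land in the caller's bounce buffer, while every temporary lives in the reserved pool.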