From e4d13527c70fd2f4f312508416d7a7bef6ded60c Mon Sep 17 00:00:00 2001
From: Nghia Truong
Date: Wed, 17 Jul 2024 00:01:21 -0700
Subject: [PATCH] Implement heuristic to determine which kernel to call

Signed-off-by: Nghia Truong
---
 src/main/cpp/src/get_json_object.cu | 174 +++++++++++++++++++---------
 1 file changed, 117 insertions(+), 57 deletions(-)

diff --git a/src/main/cpp/src/get_json_object.cu b/src/main/cpp/src/get_json_object.cu
index 67a290b784..f0f695ef83 100644
--- a/src/main/cpp/src/get_json_object.cu
+++ b/src/main/cpp/src/get_json_object.cu
@@ -789,7 +789,7 @@ __device__ thrust::pair<bool, cudf::size_type> evaluate_path(
 }
 
 /**
- * @brief The json_path_query_data class
+ * @brief Data for evaluating a single JSONPath query and storing its output
  */
 struct json_path_query_data {
   json_path_query_data(cudf::device_span<path_instruction const> _path,
@@ -811,6 +811,58 @@ struct json_path_query_data {
   int8_t* has_out_of_bound;
 };
 
+__device__ __forceinline__ void process_row(cudf::column_device_view input,
+                                            cudf::device_span<json_path_query_data> query_data,
+                                            int64_t idx)
+{
+  auto const row_idx   = idx / query_data.size();
+  auto const query_idx = idx % query_data.size();
+  auto const& query    = query_data[query_idx];
+
+  char* const dst          = query.out_buf + query.offsets[row_idx];
+  bool is_valid            = false;
+  cudf::size_type out_size = 0;
+
+  auto const str = input.element<cudf::string_view>(row_idx);
+  if (str.size_bytes() > 0) {
+    thrust::tie(is_valid, out_size) = evaluate_path(char_range{str}, query.path_commands, dst);
+
+    auto const max_size = query.offsets[row_idx + 1] - query.offsets[row_idx];
+    if (out_size > max_size) { *(query.has_out_of_bound) = 1; }
+  }
+
+  // Write out `nullptr` in the output string_view to indicate that the output is a null.
+  // The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
+  // second time due to out-of-bound write in the first launch.
+  if (query.out_stringviews) {
+    query.out_stringviews[row_idx] = {is_valid ? dst : nullptr, out_size};
+  }
+}
+
+/**
+ * @brief Kernel for running the JSONPath query, processing one row per thread.
+ *
+ * This kernel writes out the output strings and their lengths at the same time. If any output
+ * length exceeds its buffer size limit, a boolean flag is turned on to inform the caller.
+ * In such a situation, another (larger) output buffer will be generated and the kernel is
+ * launched again. Otherwise, launching this kernel once is sufficient to produce the output.
+ *
+ * @param input The input JSON strings stored in a strings column
+ * @param query_data Data of all the JSONPath queries to evaluate on the input
+ */
+template <int block_size, int min_block_per_sm>
+__launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL
+  void get_json_object_kernel_row_per_thread(cudf::column_device_view input,
+                                             cudf::device_span<json_path_query_data> query_data)
+{
+  auto const max_tid = static_cast<int64_t>(input.size()) * query_data.size();
+  auto const stride  = cudf::detail::grid_1d::grid_stride();
+
+  for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < max_tid; tid += stride) {
+    process_row(input, query_data, tid);
+  }
+}
+
 /**
  * @brief Kernel for running the JSONPath query.
  *
@@ -827,16 +879,9 @@ struct json_path_query_data {
  * @param has_out_of_bound Flag to indicate if any output string has length exceeds its buffer size
  */
 template <int block_size, int min_block_per_sm>
-// We have 1 for the minBlocksPerMultiprocessor in the launch bounds to avoid spilling from
-// the kernel itself. By default NVCC uses a heuristic to find a balance between the
-// maximum number of registers used by a kernel and the parallelism of the kernel.
-// If lots of registers are used the parallelism may suffer. But in our case
-// NVCC gets this wrong and we want to avoid spilling all the time or else
-// the performance is really bad. This essentially tells NVCC to prefer using lots
-// of registers over spilling.
 __launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL
-  void get_json_object_kernel(cudf::column_device_view input,
-                              cudf::device_span<json_path_query_data> query_data)
+  void get_json_object_kernel_row_per_warp(cudf::column_device_view input,
+                                           cudf::device_span<json_path_query_data> query_data)
 {
   auto const max_tid =
     static_cast<int64_t>(input.size()) * cudf::detail::warp_size * query_data.size();
@@ -845,29 +890,8 @@
   for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < max_tid; tid += stride) {
     if (lane_id == 0) {
-      auto const warp_idx  = tid / cudf::detail::warp_size;
-      auto const row_idx   = warp_idx / query_data.size();
-      auto const query_idx = warp_idx % query_data.size();
-      auto const& query    = query_data[query_idx];
-
-      char* const dst          = query.out_buf + query.offsets[row_idx];
-      bool is_valid            = false;
-      cudf::size_type out_size = 0;
-
-      auto const str = input.element<cudf::string_view>(row_idx);
-      if (str.size_bytes() > 0) {
-        thrust::tie(is_valid, out_size) = evaluate_path(char_range{str}, query.path_commands, dst);
-
-        auto const max_size = query.offsets[row_idx + 1] - query.offsets[row_idx];
-        if (out_size > max_size) { *(query.has_out_of_bound) = 1; }
-      }
-
-      // Write out `nullptr` in the output string_view to indicate that the output is a null.
-      // The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
-      // second time due to out-of-bound write in the first launch.
-      if (query.out_stringviews) {
-        query.out_stringviews[row_idx] = {is_valid ? dst : nullptr, out_size};
-      }
+      auto const warp_idx = tid / cudf::detail::warp_size;
+      process_row(input, query_data, warp_idx);
     }  // done lane_id == 0
     __syncwarp();
   }
 }
@@ -938,6 +962,43 @@ construct_path_commands(
     std::move(h_inst_names)};
 }
 
+void launch_kernel(bool exec_row_per_thread,
+                   cudf::column_device_view const& input,
+                   cudf::device_span<json_path_query_data> query_data,
+                   rmm::cuda_stream_view stream)
+{
+  auto const get_SM_count = []() {
+    int device_id{};
+    cudaDeviceProp props{};
+    CUDF_CUDA_TRY(cudaGetDevice(&device_id));
+    CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
+    return props.multiProcessorCount;
+  };
+
+  // We explicitly set the minBlocksPerMultiprocessor parameter in the launch bounds to avoid
+  // spilling from the kernel itself. By default NVCC uses a heuristic to find a balance between
+  // the maximum number of registers used by a kernel and the parallelism of the kernel.
+  // If lots of registers are used the parallelism may suffer. But in our case
+  // NVCC gets this wrong and we want to avoid spilling all the time or else
+  // the performance is really bad. This essentially tells NVCC to prefer using lots
+  // of registers over spilling.
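+  //
+  // Both configurations below size their grid from the SM count of the current device; the
+  // grid-stride loops in the kernels then cover any rows beyond the launched grid.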
+  if (exec_row_per_thread) {
+    constexpr int block_size             = 256;
+    constexpr int min_block_per_sm       = 1;
+    constexpr int block_count_multiplier = 1;
+    static auto const num_blocks         = get_SM_count() * block_count_multiplier;
+    get_json_object_kernel_row_per_thread<block_size, min_block_per_sm>
+      <<<num_blocks, block_size, 0, stream.value()>>>(input, query_data);
+  } else {
+    constexpr int block_size             = 512;
+    constexpr int min_block_per_sm       = 2;
+    constexpr int block_count_multiplier = 8;
+    static auto const num_blocks         = get_SM_count() * block_count_multiplier;
+    get_json_object_kernel_row_per_warp<block_size, min_block_per_sm>
+      <<<num_blocks, block_size, 0, stream.value()>>>(input, query_data);
+  }
+}
+
 std::vector<std::unique_ptr<cudf::column>> get_json_object(
   cudf::strings_column_view const& input,
   std::vector<std::vector<std::tuple<path_instruction_type, std::string, int32_t>>> const&
@@ -958,21 +1019,28 @@ std::vector<std::unique_ptr<cudf::column>> get_json_object(
   auto const in_offsets =
     cudf::detail::offsetalator_factory::make_input_iterator(input.offsets(), input.offset());
 
+  auto const [max_row_size, sum_row_size] =
+    thrust::transform_reduce(rmm::exec_policy(stream),
+                             thrust::make_counting_iterator(0),
+                             thrust::make_counting_iterator(input.size()),
+                             cuda::proclaim_return_type<thrust::pair<int64_t, int64_t>>(
+                               [in_offsets] __device__(auto const idx) {
+                                 auto const size = in_offsets[idx + 1] - in_offsets[idx];
+                                 return thrust::pair<int64_t, int64_t>{size, size};
+                               }),
+                             thrust::pair<int64_t, int64_t>{0, 0},
+                             cuda::proclaim_return_type<thrust::pair<int64_t, int64_t>>(
+                               [] __device__(auto const& lhs, auto const& rhs) {
+                                 return thrust::pair<int64_t, int64_t>{
+                                   std::max(lhs.first, rhs.first), lhs.second + rhs.second};
+                               }));
+
   // A buffer to store the output strings without knowing their sizes.
   // Since we do not know their sizes, we need to allocate the buffer a bit larger than the input
   // size so that we will not write output strings into an out-of-bound position.
   // Checking out-of-bound needs to be performed in the main kernel to make sure we will not have
   // data corruption.
-  auto const scratch_size = [&] {
-    auto const max_row_size = thrust::transform_reduce(
-      rmm::exec_policy(stream),
-      thrust::make_counting_iterator(0),
-      thrust::make_counting_iterator(input.size()),
-      cuda::proclaim_return_type<int64_t>(
-        [in_offsets] __device__(auto const idx) { return in_offsets[idx + 1] - in_offsets[idx]; }),
-      int64_t{0},
-      thrust::maximum{});
-
+  auto const scratch_size = [&, max_row_size = max_row_size] {
     // Pad the scratch buffer by an additional size that is a multiple of max row size.
     auto constexpr padding_rows = 10;
     return input.chars_size(stream) + max_row_size * padding_rows;
@@ -1009,18 +1077,11 @@ std::vector<std::unique_ptr<cudf::column>> get_json_object(
   thrust::uninitialized_fill(
     rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0);
 
-  constexpr int block_size              = 512;
-  constexpr int min_block_per_sm        = 2;
-  constexpr int blocks_count_multiplier = 8;
-  auto const num_blocks                 = [&] {
-    int device_id{};
-    cudaDeviceProp props{};
-    CUDF_CUDA_TRY(cudaGetDevice(&device_id));
-    CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
-    return props.multiProcessorCount * blocks_count_multiplier;
-  }();
-  get_json_object_kernel<block_size, min_block_per_sm>
-    <<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr, d_query_data);
+  // Threshold to decide on using row per thread or row per warp functions.
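+  // The presumed trade-off: short rows are cheap enough that packing one row per thread gives
+  // more parallelism, while long rows run faster spaced one per warp, where each active thread
+  // keeps the warp's cache lines and memory bandwidth to itself. The average below is taken
+  // over non-null rows only, and assumes the input contains at least one non-null row.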
+  constexpr int64_t AVG_CHAR_BYTES_THRESHOLD = 128;
+  auto const exec_row_per_thread =
+    (sum_row_size / (input.size() - input.null_count())) < AVG_CHAR_BYTES_THRESHOLD;
+  launch_kernel(exec_row_per_thread, *d_input_ptr, d_query_data, stream);
 
   auto h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream);
   auto has_no_oob         = std::none_of(
@@ -1090,8 +1151,7 @@ std::vector<std::unique_ptr<cudf::column>> get_json_object(
     thrust::uninitialized_fill(
       rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0);
 
-    get_json_object_kernel<block_size, min_block_per_sm>
-      <<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr, d_query_data);
+    launch_kernel(exec_row_per_thread, *d_input_ptr, d_query_data, stream);
 
     // Check out of bound again for sure.
     h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream);