Implement heuristic to determine which kernel to call
Signed-off-by: Nghia Truong <nghiat@nvidia.com>
ttnghia committed Jul 17, 2024
1 parent 9a861a9 commit e4d1352
Showing 1 changed file with 117 additions and 57 deletions.
174 changes: 117 additions & 57 deletions src/main/cpp/src/get_json_object.cu
@@ -789,7 +789,7 @@ __device__ thrust::pair<bool, cudf::size_type> evaluate_path(
}

/**
* @brief The json_path_query_data class
* @brief Data for evaluating one JSONPath query: the parsed path commands together with the
* output buffer, offsets, string views, and out-of-bound flag that the kernels write to.
*/
struct json_path_query_data {
json_path_query_data(cudf::device_span<path_instruction const> _path,
@@ -811,6 +811,58 @@ struct json_path_query_data {
int8_t* has_out_of_bound;
};

__device__ __forceinline__ void process_row(cudf::column_device_view input,
cudf::device_span<json_path_query_data> query_data,
int64_t idx)
{
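// The flattened index encodes a (row, query) pair: consecutive indices cover all queries for
// one row before moving on to the next. For example, with 3 queries, idx 10 maps to row_idx 3
// and query_idx 1.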
auto const row_idx = idx / query_data.size();
auto const query_idx = idx % query_data.size();
auto const& query = query_data[query_idx];

char* const dst = query.out_buf + query.offsets[row_idx];
bool is_valid = false;
cudf::size_type out_size = 0;

auto const str = input.element<cudf::string_view>(row_idx);
if (str.size_bytes() > 0) {
thrust::tie(is_valid, out_size) = evaluate_path(char_range{str}, query.path_commands, dst);

auto const max_size = query.offsets[row_idx + 1] - query.offsets[row_idx];
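// Multiple threads may set this flag concurrently, but all of them write the same value, so
// the race is benign.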
if (out_size > max_size) { *(query.has_out_of_bound) = 1; }
}

// Write out `nullptr` in the output string_view to indicate that the output is null.
// The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
// second time due to an out-of-bound write in the first launch.
if (query.out_stringviews) {
query.out_stringviews[row_idx] = {is_valid ? dst : nullptr, out_size};
}
}

/**
* @brief Kernel for running the JSONPath query, processing one row per thread.
*
* This kernel writes out the output strings and their lengths at the same time. If any output
* length exceeds its buffer size limit, a boolean flag is set to inform the caller. In such
* a situation, another (larger) output buffer is allocated and the kernel is launched again.
* Otherwise, launching this kernel only once is sufficient to produce the desired output.
*
* @param input The input JSON strings stored in a strings column
* @param query_data Per-query data for the JSONPath queries to evaluate: the path commands and
* the output buffers to write to
*/
template <int block_size, int min_block_per_sm>
__launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL
void get_json_object_kernel_row_per_thread(cudf::column_device_view input,
cudf::device_span<json_path_query_data> query_data)
{
auto const max_tid = static_cast<int64_t>(input.size()) * query_data.size();
auto const stride = cudf::detail::grid_1d::grid_stride();

for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < max_tid; tid += stride) {
process_row(input, query_data, tid);
}
}
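
// A sketch of the host-side protocol around these kernels (see get_json_object further below):
//   1. Launch once with a scratch output buffer sized pessimistically from the input.
//   2. Copy the has_out_of_bound flags back to the host.
//   3. If any flag is set, allocate buffers of the exact sizes recorded in out_stringviews and
//      launch again (the second launch passes out_stringviews == nullptr).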

/**
* @brief Kernel for running the JSONPath query.
*
@@ -827,16 +879,9 @@ struct json_path_query_data {
* @param has_out_of_bound Flag to indicate if any output string has a length exceeding its buffer size
*/
template <int block_size, int min_block_per_sm>
// We have 1 for the minBlocksPerMultiprocessor in the launch bounds to avoid spilling from
// the kernel itself. By default NVCC uses a heuristic to find a balance between the
// maximum number of registers used by a kernel and the parallelism of the kernel.
// If lots of registers are used the parallelism may suffer. But in our case
// NVCC gets this wrong and we want to avoid spilling all the time or else
// the performance is really bad. This essentially tells NVCC to prefer using lots
// of registers over spilling.
__launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL
void get_json_object_kernel(cudf::column_device_view input,
cudf::device_span<json_path_query_data> query_data)
void get_json_object_kernel_row_per_warp(cudf::column_device_view input,
cudf::device_span<json_path_query_data> query_data)
{
auto const max_tid =
static_cast<int64_t>(input.size()) * cudf::detail::warp_size * query_data.size();
@@ -845,29 +890,8 @@ __launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL

for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < max_tid; tid += stride) {
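// Only lane 0 of each warp evaluates the path, so each (row, query) pair effectively owns a
// whole warp; the other 31 lanes idle until the trailing __syncwarp().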
if (lane_id == 0) {
auto const warp_idx = tid / cudf::detail::warp_size;
auto const row_idx = warp_idx / query_data.size();
auto const query_idx = warp_idx % query_data.size();
auto const& query = query_data[query_idx];

char* const dst = query.out_buf + query.offsets[row_idx];
bool is_valid = false;
cudf::size_type out_size = 0;

auto const str = input.element<cudf::string_view>(row_idx);
if (str.size_bytes() > 0) {
thrust::tie(is_valid, out_size) = evaluate_path(char_range{str}, query.path_commands, dst);

auto const max_size = query.offsets[row_idx + 1] - query.offsets[row_idx];
if (out_size > max_size) { *(query.has_out_of_bound) = 1; }
}

// Write out `nullptr` in the output string_view to indicate that the output is a null.
// The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
// second time due to out-of-bound write in the first launch.
if (query.out_stringviews) {
query.out_stringviews[row_idx] = {is_valid ? dst : nullptr, out_size};
}
auto const warp_idx = tid / cudf::detail::warp_size;
process_row(input, query_data, warp_idx);
} // done lane_id == 0
__syncwarp();
}
@@ -938,6 +962,43 @@ construct_path_commands(
std::move(h_inst_names)};
}

void launch_kernel(bool exec_row_per_thread,
cudf::column_device_view const& input,
cudf::device_span<json_path_query_data> query_data,
rmm::cuda_stream_view stream)
{
auto const get_SM_count = []() {
int device_id{};
cudaDeviceProp props{};
CUDF_CUDA_TRY(cudaGetDevice(&device_id));
CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
return props.multiProcessorCount;
};

// We explicitly set the minBlocksPerMultiprocessor parameter in the launch bounds to avoid
// spilling from the kernel itself. By default, NVCC uses a heuristic to find a balance between
// the maximum number of registers used by a kernel and the parallelism of the kernel: if many
// registers are used, parallelism may suffer. In our case, however, NVCC gets this wrong, and
// constant spilling makes performance very poor. These launch bounds essentially tell NVCC to
// prefer using lots of registers over spilling.
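// For example (assuming a GPU with 64K 32-bit registers per SM), __launch_bounds__(512, 2)
// caps register usage at 65536 / (512 * 2) = 64 registers per thread before spilling would be
// forced. The grids below are sized to a multiple of the SM count because both kernels use
// grid-stride loops, and the static num_blocks locals cache the device query, assuming the
// active device does not change between calls.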
if (exec_row_per_thread) {
constexpr int block_size = 256;
constexpr int min_block_per_sm = 1;
constexpr int block_count_multiplier = 1;
static auto const num_blocks = get_SM_count() * block_count_multiplier;
get_json_object_kernel_row_per_thread<block_size, min_block_per_sm>
<<<num_blocks, block_size, 0, stream.value()>>>(input, query_data);
} else {
constexpr int block_size = 512;
constexpr int min_block_per_sm = 2;
constexpr int block_count_multiplier = 8;
static auto const num_blocks = get_SM_count() * block_count_multiplier;
get_json_object_kernel_row_per_warp<block_size, min_block_per_sm>
<<<num_blocks, block_size, 0, stream.value()>>>(input, query_data);
}
}

std::vector<std::unique_ptr<cudf::column>> get_json_object(
cudf::strings_column_view const& input,
std::vector<std::vector<std::tuple<path_instruction_type, std::string, int64_t>>> const&
@@ -958,21 +1019,28 @@ std::vector<std::unique_ptr<cudf::column>> get_json_object(
auto const in_offsets =
cudf::detail::offsetalator_factory::make_input_iterator(input.offsets(), input.offset());

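// A single pass computes both the longest row and the total number of bytes by reducing
// (max, sum) pairs over the input offsets.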
auto const [max_row_size, sum_row_size] =
thrust::transform_reduce(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.size()),
cuda::proclaim_return_type<thrust::pair<int64_t, int64_t>>(
[in_offsets] __device__(auto const idx) {
auto const size = in_offsets[idx + 1] - in_offsets[idx];
return thrust::pair<int64_t, int64_t>{size, size};
}),
thrust::pair<int64_t, int64_t>{0, 0},
cuda::proclaim_return_type<thrust::pair<int64_t, int64_t>>(
[] __device__(auto const& lhs, auto const& rhs) {
return thrust::pair<int64_t, int64_t>{
std::max(lhs.first, rhs.first), lhs.second + rhs.second};
}));

// A buffer to store the output strings without knowing their sizes.
// Since we do not know their sizes, we allocate the buffer a bit larger than the input size
// so that we will not write output strings to out-of-bound positions.
// Out-of-bound checking must still be performed in the main kernel to make sure we do not
// corrupt data.
auto const scratch_size = [&] {
auto const max_row_size = thrust::transform_reduce(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.size()),
cuda::proclaim_return_type<int64_t>(
[in_offsets] __device__(auto const idx) { return in_offsets[idx + 1] - in_offsets[idx]; }),
int64_t{0},
thrust::maximum{});

auto const scratch_size = [&, max_row_size = max_row_size] {
// Pad the scratch buffer by an additional size that is a multiple of max row size.
auto constexpr padding_rows = 10;
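// E.g., with 1 MB of input chars and a 2 KB longest row (hypothetical numbers), the scratch
// buffer is 1 MB + 10 * 2 KB, i.e. about 2% larger than the input.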
return input.chars_size(stream) + max_row_size * padding_rows;
Expand Down Expand Up @@ -1009,18 +1077,11 @@ std::vector<std::unique_ptr<cudf::column>> get_json_object(
thrust::uninitialized_fill(
rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0);

constexpr int block_size = 512;
constexpr int min_block_per_sm = 2;
constexpr int blocks_count_multiplier = 8;
auto const num_blocks = [&] {
int device_id{};
cudaDeviceProp props{};
CUDF_CUDA_TRY(cudaGetDevice(&device_id));
CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
return props.multiProcessorCount * blocks_count_multiplier;
}();
get_json_object_kernel<block_size, min_block_per_sm>
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr, d_query_data);
// Threshold on the average bytes per non-null row, used to choose between the row-per-thread
// and row-per-warp kernels.
constexpr int64_t AVG_CHAR_BYTES_THRESHOLD = 128;
auto const exec_row_per_thread =
(sum_row_size / (input.size() - input.null_count())) < AVG_CHAR_BYTES_THRESHOLD;
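// For example (hypothetical input): 1,000,000 non-null rows totaling 50 MB of JSON average
// ~52 bytes per row, which is below the 128-byte threshold, so the row-per-thread kernel is
// chosen; inputs with longer rows take the row-per-warp path. (This computation assumes at
// least one non-null row.)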
launch_kernel(exec_row_per_thread, *d_input_ptr, d_query_data, stream);

auto h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream);
auto has_no_oob = std::none_of(
Expand Down Expand Up @@ -1090,8 +1151,7 @@ std::vector<std::unique_ptr<cudf::column>> get_json_object(
thrust::uninitialized_fill(
rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0);

get_json_object_kernel<block_size, min_block_per_sm>
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr, d_query_data);
launch_kernel(exec_row_per_thread, *d_input_ptr, d_query_data, stream);

// Check for out-of-bound writes again, just to be certain.
h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream);
