Skip to content

Commit

Permalink
Refactor Parquet kernel_error (#14464)
Browse files Browse the repository at this point in the history
While reviewing #14453 it became clear that the typing for error codes was inconsistent with their use as bitmasks. This PR changes the typing of `kernel_error::error_code` to unsigned, and adds type aliases to make any future changes to `kernel_error` easier to implement.

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #14464
  • Loading branch information
etseidl authored Nov 29, 2023
1 parent e696941 commit 75d5978
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 33 deletions.
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ namespace cudf::io::parquet {
* the object's lifetime.
*/
class kernel_error {
public:
using value_type = uint32_t;
using pointer = value_type*;

private:
rmm::device_scalar<int32_t> _error_code;
rmm::device_scalar<value_type> _error_code;

public:
/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
Expand Down Expand Up @@ -622,7 +622,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "error.hpp"
#include "parquet_gpu.hpp"
#include "rle_stream.cuh"

Expand Down Expand Up @@ -44,7 +45,7 @@ struct page_state_s {
int32_t dict_val{};
uint32_t initial_rle_run[NUM_LEVEL_TYPES]{}; // [def,rep]
int32_t initial_rle_value[NUM_LEVEL_TYPES]{}; // [def,rep]
int32_t error{};
kernel_error::value_type error{};
PageInfo page{};
ColumnChunkDesc col{};

Expand Down Expand Up @@ -73,13 +74,13 @@ struct page_state_s {

inline __device__ void set_error_code(decode_error err)
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{error};
ref.fetch_or(static_cast<int32_t>(err), cuda::std::memory_order_relaxed);
cuda::atomic_ref<kernel_error::value_type, cuda::thread_scope_block> ref{error};
ref.fetch_or(static_cast<kernel_error::value_type>(err), cuda::std::memory_order_relaxed);
}

inline __device__ void reset_error_code()
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{error};
cuda::atomic_ref<kernel_error::value_type, cuda::thread_scope_block> ref{error};
ref.store(0, cuda::std::memory_order_release);
}
};
Expand Down
22 changes: 11 additions & 11 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ __global__ void __launch_bounds__(96)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
Expand Down Expand Up @@ -346,7 +346,8 @@ __global__ void __launch_bounds__(96)

auto const batch_size = db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
set_error(static_cast<kernel_error::value_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
error_code);
return;
}

Expand Down Expand Up @@ -428,7 +429,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_byte_array_decoder db_state;
Expand Down Expand Up @@ -475,7 +476,8 @@ __global__ void __launch_bounds__(decode_block_size)
if (prefix_db->values_per_mb != suffix_db->values_per_mb or
prefix_db->block_size != suffix_db->block_size or
prefix_db->value_count != suffix_db->value_count) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAM_MISMATCH), error_code);
set_error(static_cast<kernel_error::value_type>(decode_error::DELTA_PARAM_MISMATCH),
error_code);
return;
}

Expand All @@ -485,7 +487,8 @@ __global__ void __launch_bounds__(decode_block_size)

auto const batch_size = prefix_db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
set_error(static_cast<kernel_error::value_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
error_code);
return;
}

Expand Down Expand Up @@ -567,10 +570,7 @@ __global__ void __launch_bounds__(decode_block_size)
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace
Expand All @@ -583,7 +583,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand All @@ -608,7 +608,7 @@ void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "error.hpp"
#include "parquet_gpu.hpp"
#include <io/utilities/block_utils.cuh>

Expand Down Expand Up @@ -345,14 +346,16 @@ struct gpuParsePageHeader {
* @param[in] num_chunks Number of column chunks
*/
// blockDim {128,1,1}
__global__ void __launch_bounds__(128)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, int32_t* error_code)
__global__ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
gpuParsePageHeader parse_page_header;
__shared__ byte_stream_s bs_g[4];

int32_t error[4] = {0};
kernel_error::value_type error[4] = {0};

auto const lane_id = threadIdx.x % warp_size;
auto const warp_id = threadIdx.x / warp_size;
auto const chunk = (blockIdx.x * 4) + warp_id;
Expand Down Expand Up @@ -440,7 +443,8 @@ __global__ void __launch_bounds__(128)
bs->page.page_data = const_cast<uint8_t*>(bs->cur);
bs->cur += bs->page.compressed_page_size;
if (bs->cur > bs->end) {
error[warp_id] |= static_cast<int32_t>(decode_error::DATA_STREAM_OVERRUN);
error[warp_id] |=
static_cast<kernel_error::value_type>(decode_error::DATA_STREAM_OVERRUN);
}
bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck);
} else {
Expand Down Expand Up @@ -513,7 +517,7 @@ __global__ void __launch_bounds__(128)

void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(128, 1);
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "delta_binary.cuh"
#include "error.hpp"
#include "page_decode.cuh"
#include "page_string_utils.cuh"

Expand Down Expand Up @@ -784,7 +785,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) page_state_s state_g;
Expand Down Expand Up @@ -1057,7 +1058,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include "error.hpp"

#include "io/comp/gpuinflate.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_common.hpp"
Expand Down Expand Up @@ -74,10 +76,10 @@ constexpr bool is_supported_encoding(Encoding enc)
/**
* @brief Atomically OR `error` into `error_code`.
*/
constexpr void set_error(int32_t error, int32_t* error_code)
constexpr void set_error(kernel_error::value_type error, kernel_error::pointer error_code)
{
if (error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
cuda::atomic_ref<kernel_error::value_type, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(error, cuda::std::memory_order_relaxed);
}
}
Expand All @@ -87,7 +89,7 @@ constexpr void set_error(int32_t error, int32_t* error_code)
*
* These values are used as bitmasks, so they must be powers of 2.
*/
enum class decode_error : int32_t {
enum class decode_error : kernel_error::value_type {
DATA_STREAM_OVERRUN = 0x1,
LEVEL_STREAM_OVERRUN = 0x2,
UNSUPPORTED_ENCODING = 0x4,
Expand Down Expand Up @@ -549,7 +551,7 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
*/
void DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
Expand Down Expand Up @@ -655,7 +657,7 @@ void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -677,7 +679,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -699,7 +701,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -721,7 +723,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

/**
Expand Down

0 comments on commit 75d5978

Please sign in to comment.