diff --git a/cpp/src/io/parquet/error.hpp b/cpp/src/io/parquet/error.hpp index 92b5eebe9fd..bff0713a1ef 100644 --- a/cpp/src/io/parquet/error.hpp +++ b/cpp/src/io/parquet/error.hpp @@ -32,8 +32,12 @@ namespace cudf::io::parquet { * the object's lifetime. */ class kernel_error { + public: + using value_type = uint32_t; + using pointer = value_type*; + private: - rmm::device_scalar _error_code; + rmm::device_scalar _error_code; public: /** diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 1a94f05498e..12bf5e860fe 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -426,7 +426,7 @@ __global__ void __launch_bounds__(decode_block_size) device_span chunks, size_t min_row, size_t num_rows, - int32_t* error_code) + kernel_error::pointer error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) @@ -622,7 +622,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index a521f4af039..c1f9cb87b43 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -16,6 +16,7 @@ #pragma once +#include "error.hpp" #include "parquet_gpu.hpp" #include "rle_stream.cuh" @@ -44,7 +45,7 @@ struct page_state_s { int32_t dict_val{}; uint32_t initial_rle_run[NUM_LEVEL_TYPES]{}; // [def,rep] int32_t initial_rle_value[NUM_LEVEL_TYPES]{}; // [def,rep] - int32_t error{}; + kernel_error::value_type error{}; PageInfo page{}; ColumnChunkDesc col{}; @@ -73,13 +74,13 @@ struct page_state_s { inline __device__ void set_error_code(decode_error err) { - cuda::atomic_ref ref{error}; - ref.fetch_or(static_cast(err), cuda::std::memory_order_relaxed); + cuda::atomic_ref ref{error}; + ref.fetch_or(static_cast(err), cuda::std::memory_order_relaxed); } inline __device__ void reset_error_code() { - cuda::atomic_ref ref{error}; + cuda::atomic_ref ref{error}; ref.store(0, cuda::std::memory_order_release); } }; diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index bc025c6fc3e..5c8dcaad422 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -308,7 +308,7 @@ __global__ void __launch_bounds__(96) device_span chunks, size_t min_row, size_t num_rows, - int32_t* error_code) + kernel_error::pointer error_code) { using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; @@ -346,7 +346,8 @@ __global__ void __launch_bounds__(96) auto const batch_size = db->values_per_mb; if (batch_size > max_delta_mini_block_size) { - set_error(static_cast(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code); + set_error(static_cast(decode_error::DELTA_PARAMS_UNSUPPORTED), + error_code); return; } @@ -428,7 +429,7 @@ __global__ void __launch_bounds__(decode_block_size) device_span chunks, size_t min_row, size_t num_rows, - int32_t* error_code) + kernel_error::pointer error_code) { using cudf::detail::warp_size; __shared__ __align__(16) delta_byte_array_decoder db_state; @@ -475,7 +476,8 @@ __global__ void __launch_bounds__(decode_block_size) if (prefix_db->values_per_mb != suffix_db->values_per_mb or prefix_db->block_size != suffix_db->block_size or prefix_db->value_count != suffix_db->value_count) { - set_error(static_cast(decode_error::DELTA_PARAM_MISMATCH), error_code); + set_error(static_cast(decode_error::DELTA_PARAM_MISMATCH), + error_code); return; } @@ -485,7 +487,8 @@ __global__ void __launch_bounds__(decode_block_size) auto const batch_size = prefix_db->values_per_mb; if (batch_size > max_delta_mini_block_size) { - set_error(static_cast(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code); + set_error(static_cast(decode_error::DELTA_PARAMS_UNSUPPORTED), + error_code); return; } @@ -567,10 +570,7 @@ __global__ void __launch_bounds__(decode_block_size) auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out); block_excl_sum(offptr, value_count, s->page.str_offset); - if (t == 0 and s->error != 0) { - cuda::atomic_ref ref{*error_code}; - ref.fetch_or(s->error, cuda::std::memory_order_relaxed); - } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } } } // anonymous namespace @@ -583,7 +583,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -608,7 +608,7 @@ void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector& pa size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 595dd40cdc2..114e47aa507 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "error.hpp" #include "parquet_gpu.hpp" #include @@ -345,14 +346,16 @@ struct gpuParsePageHeader { * @param[in] num_chunks Number of column chunks */ // blockDim {128,1,1} -__global__ void __launch_bounds__(128) - gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, int32_t* error_code) +__global__ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, + int32_t num_chunks, + kernel_error::pointer error_code) { using cudf::detail::warp_size; gpuParsePageHeader parse_page_header; __shared__ byte_stream_s bs_g[4]; - int32_t error[4] = {0}; + kernel_error::value_type error[4] = {0}; + auto const lane_id = threadIdx.x % warp_size; auto const warp_id = threadIdx.x / warp_size; auto const chunk = (blockIdx.x * 4) + warp_id; @@ -440,7 +443,8 @@ __global__ void __launch_bounds__(128) bs->page.page_data = const_cast(bs->cur); bs->cur += bs->page.compressed_page_size; if (bs->cur > bs->end) { - error[warp_id] |= static_cast(decode_error::DATA_STREAM_OVERRUN); + error[warp_id] |= + static_cast(decode_error::DATA_STREAM_OVERRUN); } bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck); } else { @@ -513,7 +517,7 @@ __global__ void __launch_bounds__(128) void __host__ DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 916eaa3d681..dbd69515d9a 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -15,6 +15,7 @@ */ #include "delta_binary.cuh" +#include "error.hpp" #include "page_decode.cuh" #include "page_string_utils.cuh" @@ -784,7 +785,7 @@ __global__ void __launch_bounds__(decode_block_size) device_span chunks, size_t min_row, size_t num_rows, - int32_t* error_code) + kernel_error::pointer error_code) { using cudf::detail::warp_size; __shared__ __align__(16) page_state_s state_g; @@ -1057,7 +1058,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector& pa size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 129d4e4d28c..25323cfaa9e 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -16,6 +16,8 @@ #pragma once +#include "error.hpp" + #include "io/comp/gpuinflate.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_common.hpp" @@ -74,10 +76,10 @@ constexpr bool is_supported_encoding(Encoding enc) /** * @brief Atomically OR `error` into `error_code`. */ -constexpr void set_error(int32_t error, int32_t* error_code) +constexpr void set_error(kernel_error::value_type error, kernel_error::pointer error_code) { if (error != 0) { - cuda::atomic_ref ref{*error_code}; + cuda::atomic_ref ref{*error_code}; ref.fetch_or(error, cuda::std::memory_order_relaxed); } } @@ -87,7 +89,7 @@ constexpr void set_error(int32_t error, int32_t* error_code) * * These values are used as bitmasks, so they must be powers of 2. */ -enum class decode_error : int32_t { +enum class decode_error : kernel_error::value_type { DATA_STREAM_OVERRUN = 0x1, LEVEL_STREAM_OVERRUN = 0x2, UNSUPPORTED_ENCODING = 0x4, @@ -549,7 +551,7 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk) */ void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream); /** @@ -655,7 +657,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream); /** @@ -677,7 +679,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream); /** @@ -699,7 +701,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream); /** @@ -721,7 +723,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int32_t* error_code, + kernel_error::pointer error_code, rmm::cuda_stream_view stream); /**