diff --git a/cpp/src/io/parquet/delta_enc.cuh b/cpp/src/io/parquet/delta_enc.cuh index b0a7493fcab..cbb44d30a56 100644 --- a/cpp/src/io/parquet/delta_enc.cuh +++ b/cpp/src/io/parquet/delta_enc.cuh @@ -57,10 +57,6 @@ constexpr int buffer_size = 2 * block_size; static_assert(block_size % 128 == 0); static_assert(values_per_mini_block % 32 == 0); -using block_reduce = cub::BlockReduce; -using warp_reduce = cub::WarpReduce; -using index_scan = cub::BlockScan; - constexpr int rolling_idx(int index) { return rolling_index(index); } // Version of bit packer that can handle up to 64 bits values. @@ -128,9 +124,15 @@ inline __device__ void bitpack_mini_block( // Object used to turn a stream of integers into a DELTA_BINARY_PACKED stream. This takes as input // 128 values with validity at a time, saving them until there are enough values for a block // to be written. -// T is the input data type (either zigzag128_t or uleb128_t). +// T is the input data type (either int32_t or int64_t). template class delta_binary_packer { + public: + using U = std::make_unsigned_t; + using block_reduce = cub::BlockReduce; + using warp_reduce = cub::WarpReduce; + using index_scan = cub::BlockScan; + private: uint8_t* _dst; // sink to dump encoded values to T* _buffer; // buffer to store values to be encoded @@ -140,9 +142,9 @@ class delta_binary_packer { uint8_t _mb_bits[delta::num_mini_blocks]; // bitwidth for each mini-block // pointers to shared scratch memory for the warp and block scans/reduces - delta::index_scan::TempStorage* _scan_tmp; - delta::warp_reduce::TempStorage* _warp_tmp; - delta::block_reduce::TempStorage* _block_tmp; + index_scan::TempStorage* _scan_tmp; + typename warp_reduce::TempStorage* _warp_tmp; + typename block_reduce::TempStorage* _block_tmp; void* _bitpack_tmp; // pointer to shared scratch memory used in bitpacking @@ -164,9 +166,9 @@ class delta_binary_packer { } // Signed subtraction with defined wrapping behavior. - inline __device__ zigzag128_t subtract(zigzag128_t a, zigzag128_t b) + inline __device__ T subtract(T a, T b) { - return static_cast(static_cast(a) - static_cast(b)); + return static_cast(static_cast(a) - static_cast(b)); } public: @@ -178,9 +180,9 @@ class delta_binary_packer { _dst = dest; _num_values = num_values; _buffer = buffer; - _scan_tmp = reinterpret_cast(temp_storage); - _warp_tmp = reinterpret_cast(temp_storage); - _block_tmp = reinterpret_cast(temp_storage); + _scan_tmp = reinterpret_cast(temp_storage); + _warp_tmp = reinterpret_cast(temp_storage); + _block_tmp = reinterpret_cast(temp_storage); _bitpack_tmp = _buffer + delta::buffer_size; _current_idx = 0; _values_in_buffer = 0; @@ -193,7 +195,7 @@ class delta_binary_packer { size_type const valid = is_valid; size_type pos; size_type num_valid; - delta::index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid); + index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid); if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; } __syncthreads(); @@ -216,7 +218,7 @@ class delta_binary_packer { inline __device__ uint8_t const* flush() { using cudf::detail::warp_size; - __shared__ zigzag128_t block_min; + __shared__ T block_min; int const t = threadIdx.x; int const warp_id = t / warp_size; @@ -225,27 +227,26 @@ class delta_binary_packer { if (_values_in_buffer <= 0) { return _dst; } // Calculate delta for this thread. - size_type const idx = _current_idx + t; - zigzag128_t const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)], - _buffer[delta::rolling_idx(idx - 1)]) - : std::numeric_limits::max(); + size_type const idx = _current_idx + t; + T const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)], + _buffer[delta::rolling_idx(idx - 1)]) + : std::numeric_limits::max(); // Find min delta for the block. - auto const min_delta = delta::block_reduce(*_block_tmp).Reduce(delta, cub::Min()); + auto const min_delta = block_reduce(*_block_tmp).Reduce(delta, cub::Min()); if (t == 0) { block_min = min_delta; } __syncthreads(); // Compute frame of reference for the block. - uleb128_t const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0; + U const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0; // Get max normalized delta for each warp, and use that to determine how many bits to use // for the bitpacking of this warp. - zigzag128_t const warp_max = - delta::warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max()); + U const warp_max = warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max()); __syncwarp(); - if (lane_id == 0) { _mb_bits[warp_id] = sizeof(zigzag128_t) * 8 - __clzll(warp_max); } + if (lane_id == 0) { _mb_bits[warp_id] = sizeof(long long) * 8 - __clzll(warp_max); } __syncthreads(); // write block header diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index d75608930d5..ba751548e3f 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -251,14 +251,15 @@ struct BitwiseOr { } }; -// I is the column type from the input table -template +// PT is the parquet physical type (INT32 or INT64). +// I is the column type from the input table. +template __device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, uint32_t valid_count, uint64_t* buffer, void* temp_space) { - using output_type = std::conditional_t, zigzag128_t, uleb128_t>; + using output_type = std::conditional_t; __shared__ delta_binary_packer packer; auto const t = threadIdx.x; @@ -1711,9 +1712,10 @@ __global__ void __launch_bounds__(block_size, 8) using block_reduce = cub::BlockReduce; __shared__ union { typename block_reduce::TempStorage reduce_storage; - typename delta::index_scan::TempStorage delta_index_tmp; - typename delta::block_reduce::TempStorage delta_reduce_tmp; - typename delta::warp_reduce::TempStorage delta_warp_red_tmp[delta::num_mini_blocks]; + typename delta_binary_packer::index_scan::TempStorage delta_index_tmp; + typename delta_binary_packer::block_reduce::TempStorage delta_reduce_tmp; + typename delta_binary_packer::warp_reduce::TempStorage + delta_warp_red_tmp[delta::num_mini_blocks]; } temp_storage; auto* const s = &state_g; @@ -1784,30 +1786,30 @@ __global__ void __launch_bounds__(block_size, 8) switch (dtype_len_in) { case 8: { // only DURATIONS map to 8 bytes, so safe to just use signed here? - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); break; } case 4: { if (type_id == type_id::UINT32) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } break; } case 2: { if (type_id == type_id::UINT16) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } break; } case 1: { if (type_id == type_id::UINT8) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } break; } @@ -1815,9 +1817,9 @@ __global__ void __launch_bounds__(block_size, 8) } } else { if (type_id == type_id::UINT64) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } } diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index b3a06dbd742..c5da03d2942 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1293,7 +1293,7 @@ def delta_num_rows(): return [1, 2, 23, 32, 33, 34, 64, 65, 66, 128, 129, 130, 20000, 50000] -@pytest.mark.parametrize("nrows", [1, 100000]) +@pytest.mark.parametrize("nrows", delta_num_rows()) @pytest.mark.parametrize("add_nulls", [True, False]) @pytest.mark.parametrize( "dtype", @@ -1346,18 +1346,8 @@ def test_delta_binary(nrows, add_nulls, dtype, tmpdir): use_dictionary=False, ) - # FIXME(ets): should probably not use more bits than the data type - try: - cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) - except OSError as e: - if dtype == "int32" and nrows == 100000: - pytest.mark.xfail( - reason="arrow does not support 33-bit delta encoding" - ) - else: - raise e - else: - assert_eq(cdf2, cdf) + cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) + assert_eq(cdf2, cdf) @pytest.mark.parametrize("nrows", delta_num_rows())