Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit DELTA_BINARY_PACKED encoder to the same number of bits as the physical type being encoded #14392

Merged
Merged 15 commits on Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
49 changes: 25 additions & 24 deletions cpp/src/io/parquet/delta_enc.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ constexpr int buffer_size = 2 * block_size;
static_assert(block_size % 128 == 0);
static_assert(values_per_mini_block % 32 == 0);

using block_reduce = cub::BlockReduce<zigzag128_t, block_size>;
using warp_reduce = cub::WarpReduce<uleb128_t>;
using index_scan = cub::BlockScan<size_type, block_size>;

constexpr int rolling_idx(int index) { return rolling_index<buffer_size>(index); }

// Version of bit packer that can handle up to 64 bits values.
Expand Down Expand Up @@ -128,9 +124,15 @@ inline __device__ void bitpack_mini_block(
// Object used to turn a stream of integers into a DELTA_BINARY_PACKED stream. This takes as input
// 128 values with validity at a time, saving them until there are enough values for a block
// to be written.
// T is the input data type (either zigzag128_t or uleb128_t).
// T is the input data type (either int32_t or int64_t).
template <typename T>
class delta_binary_packer {
public:
using U = std::make_unsigned_t<T>;
etseidl marked this conversation as resolved.
Show resolved Hide resolved
using block_reduce = cub::BlockReduce<T, delta::block_size>;
using warp_reduce = cub::WarpReduce<U>;
using index_scan = cub::BlockScan<size_type, delta::block_size>;

private:
uint8_t* _dst; // sink to dump encoded values to
T* _buffer; // buffer to store values to be encoded
Expand All @@ -140,9 +142,9 @@ class delta_binary_packer {
uint8_t _mb_bits[delta::num_mini_blocks]; // bitwidth for each mini-block

// pointers to shared scratch memory for the warp and block scans/reduces
delta::index_scan::TempStorage* _scan_tmp;
delta::warp_reduce::TempStorage* _warp_tmp;
delta::block_reduce::TempStorage* _block_tmp;
index_scan::TempStorage* _scan_tmp;
typename warp_reduce::TempStorage* _warp_tmp;
typename block_reduce::TempStorage* _block_tmp;

void* _bitpack_tmp; // pointer to shared scratch memory used in bitpacking

Expand All @@ -164,9 +166,9 @@ class delta_binary_packer {
}

// Signed subtraction with defined wrapping behavior.
inline __device__ zigzag128_t subtract(zigzag128_t a, zigzag128_t b)
inline __device__ T subtract(T a, T b)
{
return static_cast<zigzag128_t>(static_cast<uleb128_t>(a) - static_cast<uleb128_t>(b));
return static_cast<T>(static_cast<U>(a) - static_cast<U>(b));
vuule marked this conversation as resolved.
Show resolved Hide resolved
}

public:
Expand All @@ -178,9 +180,9 @@ class delta_binary_packer {
_dst = dest;
_num_values = num_values;
_buffer = buffer;
_scan_tmp = reinterpret_cast<delta::index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<delta::warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<delta::block_reduce::TempStorage*>(temp_storage);
_scan_tmp = reinterpret_cast<index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<typename warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<typename block_reduce::TempStorage*>(temp_storage);
_bitpack_tmp = _buffer + delta::buffer_size;
_current_idx = 0;
_values_in_buffer = 0;
Expand All @@ -193,7 +195,7 @@ class delta_binary_packer {
size_type const valid = is_valid;
size_type pos;
size_type num_valid;
delta::index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);
index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);

if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; }
__syncthreads();
Expand All @@ -216,7 +218,7 @@ class delta_binary_packer {
inline __device__ uint8_t const* flush()
{
using cudf::detail::warp_size;
__shared__ zigzag128_t block_min;
__shared__ T block_min;

int const t = threadIdx.x;
int const warp_id = t / warp_size;
Expand All @@ -225,27 +227,26 @@ class delta_binary_packer {
if (_values_in_buffer <= 0) { return _dst; }

// Calculate delta for this thread.
size_type const idx = _current_idx + t;
zigzag128_t const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<zigzag128_t>::max();
size_type const idx = _current_idx + t;
T const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<T>::max();

// Find min delta for the block.
auto const min_delta = delta::block_reduce(*_block_tmp).Reduce(delta, cub::Min());
auto const min_delta = block_reduce(*_block_tmp).Reduce(delta, cub::Min());

if (t == 0) { block_min = min_delta; }
__syncthreads();

// Compute frame of reference for the block.
uleb128_t const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;
U const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;

// Get max normalized delta for each warp, and use that to determine how many bits to use
// for the bitpacking of this warp.
zigzag128_t const warp_max =
delta::warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
U const warp_max = warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
__syncwarp();

if (lane_id == 0) { _mb_bits[warp_id] = sizeof(zigzag128_t) * 8 - __clzll(warp_max); }
if (lane_id == 0) { _mb_bits[warp_id] = sizeof(long long) * 8 - __clzll(warp_max); }
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
__syncthreads();

// write block header
Expand Down
32 changes: 17 additions & 15 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,15 @@ struct BitwiseOr {
}
};

// I is the column type from the input table
template <typename I>
// PT is the parquet physical type (INT32 or INT64).
// I is the column type from the input table.
template <Type PT, typename I>
__device__ uint8_t const* delta_encode(page_enc_state_s<0>* s,
uint32_t valid_count,
uint64_t* buffer,
void* temp_space)
{
using output_type = std::conditional_t<std::is_signed_v<I>, zigzag128_t, uleb128_t>;
using output_type = std::conditional_t<PT == INT32, int32_t, int64_t>;
__shared__ delta_binary_packer<output_type> packer;

auto const t = threadIdx.x;
Expand Down Expand Up @@ -1711,9 +1712,10 @@ __global__ void __launch_bounds__(block_size, 8)
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
__shared__ union {
typename block_reduce::TempStorage reduce_storage;
typename delta::index_scan::TempStorage delta_index_tmp;
typename delta::block_reduce::TempStorage delta_reduce_tmp;
typename delta::warp_reduce::TempStorage delta_warp_red_tmp[delta::num_mini_blocks];
typename delta_binary_packer<uleb128_t>::index_scan::TempStorage delta_index_tmp;
typename delta_binary_packer<uleb128_t>::block_reduce::TempStorage delta_reduce_tmp;
typename delta_binary_packer<uleb128_t>::warp_reduce::TempStorage
delta_warp_red_tmp[delta::num_mini_blocks];
} temp_storage;

auto* const s = &state_g;
Expand Down Expand Up @@ -1784,40 +1786,40 @@ __global__ void __launch_bounds__(block_size, 8)
switch (dtype_len_in) {
case 8: {
// only DURATIONS map to 8 bytes, so safe to just use signed here?
delta_ptr = delta_encode<int64_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int64_t>(s, valid_count, delta_shared, &temp_storage);
break;
}
case 4: {
if (type_id == type_id::UINT32) {
delta_ptr = delta_encode<uint32_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, uint32_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int32_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int32_t>(s, valid_count, delta_shared, &temp_storage);
}
break;
}
case 2: {
if (type_id == type_id::UINT16) {
delta_ptr = delta_encode<uint16_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, uint16_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int16_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int16_t>(s, valid_count, delta_shared, &temp_storage);
}
break;
}
case 1: {
if (type_id == type_id::UINT8) {
delta_ptr = delta_encode<uint8_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, uint8_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int8_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int8_t>(s, valid_count, delta_shared, &temp_storage);
}
break;
}
default: CUDF_UNREACHABLE("invalid dtype_len_in when encoding DELTA_BINARY_PACKED");
}
} else {
if (type_id == type_id::UINT64) {
delta_ptr = delta_encode<uint64_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT64, uint64_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int64_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT64, int64_t>(s, valid_count, delta_shared, &temp_storage);
}
}

Expand Down
16 changes: 3 additions & 13 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ def delta_num_rows():
return [1, 2, 23, 32, 33, 34, 64, 65, 66, 128, 129, 130, 20000, 50000]


@pytest.mark.parametrize("nrows", [1, 100000])
@pytest.mark.parametrize("nrows", delta_num_rows())
@pytest.mark.parametrize("add_nulls", [True, False])
@pytest.mark.parametrize(
"dtype",
Expand Down Expand Up @@ -1346,18 +1346,8 @@ def test_delta_binary(nrows, add_nulls, dtype, tmpdir):
use_dictionary=False,
)

# FIXME(ets): should probably not use more bits than the data type
try:
cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
except OSError as e:
if dtype == "int32" and nrows == 100000:
pytest.mark.xfail(
reason="arrow does not support 33-bit delta encoding"
)
else:
raise e
else:
assert_eq(cdf2, cdf)
cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
assert_eq(cdf2, cdf)


@pytest.mark.parametrize("nrows", delta_num_rows())
Expand Down