Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix chunked reads of Parquet delta encoded pages #14921

Merged
merged 33 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
915db23
fix chunked reads of delta encoded pages
etseidl Jan 29, 2024
1f9e7db
fix comment
etseidl Jan 29, 2024
524149d
only assign str_bytes on thread 0
etseidl Jan 29, 2024
f32673f
Merge branch 'branch-24.04' into delta_chunked
etseidl Jan 29, 2024
91df98a
Merge branch 'rapidsai:branch-24.04' into delta_chunked
etseidl Jan 29, 2024
5be31b4
get accurate str_len for delta_length_byte_array
etseidl Jan 29, 2024
72be052
formatting
etseidl Jan 29, 2024
ba597ac
accurate size for delta_byte_array
etseidl Jan 29, 2024
8c9432e
revert delta byte array changes...needs entire block, not a single warp
etseidl Jan 30, 2024
d5c96c0
re-enable accurate delta_byte_array sizes
etseidl Jan 30, 2024
86a1210
Merge branch 'branch-24.04' into delta_chunked
etseidl Jan 30, 2024
cf6664b
fix typo
etseidl Jan 30, 2024
806d246
add fixme
etseidl Jan 30, 2024
74d7e51
refactor so gpuDecodeTotalPageStringSize is called by all threads
etseidl Jan 30, 2024
91e741d
Merge branch 'branch-24.04' into delta_chunked
etseidl Jan 30, 2024
2cbc917
suggestion from review
etseidl Jan 31, 2024
31c598b
use V2 page header info and remove TODO
etseidl Jan 31, 2024
7299aca
Merge branch 'delta_chunked' of github.com:etseidl/cudf into delta_ch…
etseidl Jan 31, 2024
c55e639
fix field name
etseidl Jan 31, 2024
3f698c5
Merge branch 'rapidsai:branch-24.04' into delta_chunked
etseidl Feb 5, 2024
cc16ca4
Merge remote-tracking branch 'origin/branch-24.04' into delta_chunked
etseidl Feb 6, 2024
18b99c6
remove outdated TODO
etseidl Feb 6, 2024
fdd77c7
Merge branch 'rapidsai:branch-24.04' into delta_chunked
etseidl Feb 14, 2024
2ca52fe
Merge branch 'branch-24.04' into delta_chunked
etseidl Feb 17, 2024
53434e2
Merge branch 'branch-24.04' into delta_chunked
etseidl Feb 20, 2024
01da47e
Merge branch 'branch-24.04' into delta_chunked
hyperbolic2346 Feb 20, 2024
9c21cb4
Merge branch 'branch-24.04' into delta_chunked
hyperbolic2346 Feb 21, 2024
e0ce950
Merge branch 'branch-24.04' into delta_chunked
hyperbolic2346 Feb 22, 2024
dcfa4ce
Merge branch 'rapidsai:branch-24.04' into delta_chunked
etseidl Feb 26, 2024
cf9619b
Merge branch 'branch-24.04' into delta_chunked
vuule Feb 27, 2024
1bd0b42
Merge branch 'branch-24.04' into delta_chunked
vuule Feb 28, 2024
34e3f45
Merge branch 'branch-24.04' into delta_chunked
etseidl Feb 29, 2024
09c26fe
Merge branch 'branch-24.04' into delta_chunked
vuule Feb 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 122 additions & 11 deletions cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "delta_binary.cuh"
#include "page_decode.cuh"

#include <io/utilities/column_buffer.hpp>
Expand All @@ -40,26 +41,136 @@ constexpr int rolling_buf_size = LEVEL_DECODE_BUF_SIZE;
using unused_state_buf = page_state_buffers_s<0, 0, 0>;

/**
* @brief Calculate string bytes for DELTA_LENGTH_BYTE_ARRAY encoded pages
*
* Result is valid only on thread 0.
*
* @param s The local page info
* @param t Thread index
*/
__device__ size_type gpuDeltaLengthPageStringSize(page_state_s* s, int t)
{
if (t == 0) {
// find the beginning of char data
delta_binary_decoder string_lengths;
auto const* string_start = string_lengths.find_end_of_block(s->data_start, s->data_end);
// distance is size of string data
return static_cast<size_type>(std::distance(string_start, s->data_end));
}
return 0;
}

/**
* @brief Calculate string bytes for DELTA_BYTE_ARRAY encoded pages
*
* This expects all threads in the thread block (preprocess_block_size).
*
* @param s The local page info
* @param t Thread index
*/
__device__ size_type gpuDeltaPageStringSize(page_state_s* s, int t)
{
using cudf::detail::warp_size;
using WarpReduce = cub::WarpReduce<uleb128_t>;
__shared__ typename WarpReduce::TempStorage temp_storage[2];

__shared__ __align__(16) delta_binary_decoder prefixes;
__shared__ __align__(16) delta_binary_decoder suffixes;

int const lane_id = t % warp_size;
int const warp_id = t / warp_size;

if (t == 0) {
auto const* suffix_start = prefixes.find_end_of_block(s->data_start, s->data_end);
suffixes.init_binary_block(suffix_start, s->data_end);
}
__syncthreads();

// two warps will traverse the prefixes and suffixes and sum them up
auto const db = t < warp_size ? &prefixes : t < 2 * warp_size ? &suffixes : nullptr;

size_t total_bytes = 0;
if (db != nullptr) {
// initialize with first value (which is stored in last_value)
if (lane_id == 0) { total_bytes = db->last_value; }

uleb128_t lane_sum = 0;
while (db->current_value_idx < db->num_encoded_values(true)) {
// calculate values for current mini-block
db->calc_mini_block_values(lane_id);

// get per lane sum for mini-block
for (uint32_t i = 0; i < db->values_per_mb; i += 32) {
etseidl marked this conversation as resolved.
Show resolved Hide resolved
uint32_t const idx = db->current_value_idx + i + lane_id;
if (idx < db->value_count) {
lane_sum += db->value[rolling_index<delta_rolling_buf_size>(idx)];
}
}

if (lane_id == 0) { db->setup_next_mini_block(true); }
__syncwarp();
}

// get sum for warp.
// note: warp_sum will only be valid on lane 0.
auto const warp_sum = WarpReduce(temp_storage[warp_id]).Sum(lane_sum);

if (lane_id == 0) { total_bytes += warp_sum; }
}
__syncthreads();

// now sum up total_bytes from the two warps. result is only valid on thread 0.
auto const final_bytes =
cudf::detail::single_lane_block_sum_reduce<preprocess_block_size, 0>(total_bytes);

return static_cast<size_type>(final_bytes);
}

/**
* @brief Calculate the number of string bytes in the page.
*
* This function expects the dictionary position to be at 0 and will traverse
* the entire thing.
* the entire thing (for plain and dictionary encoding).
*
* Operates on a single warp only. Expects t < 32
* This expects all threads in the thread block (preprocess_block_size). Result is only
* valid on thread 0.
*
* @param s The local page info
* @param t Thread index
*/
__device__ size_type gpuDecodeTotalPageStringSize(page_state_s* s, int t)
{
using cudf::detail::warp_size;
size_type target_pos = s->num_input_values;
size_type str_len = 0;
if (s->dict_base) {
auto const [new_target_pos, len] =
gpuDecodeDictionaryIndices<true, unused_state_buf>(s, nullptr, target_pos, t);
target_pos = new_target_pos;
str_len = len;
} else if ((s->col.data_type & 7) == BYTE_ARRAY) {
str_len = gpuInitStringDescriptors<true, unused_state_buf>(s, nullptr, target_pos, t);
switch (s->page.encoding) {
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// TODO: make this block-based instead of just 1 warp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO for this PR or do we need a tracking issue for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an old TODO I moved from outside the function. @nvdbaranec do you think this TODO is still relevant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to delete. At this point any optimization effort needs to be starting at the top and looking at the whole picture anyway.

if (t < warp_size && s->dict_base) {
auto const [new_target_pos, len] =
gpuDecodeDictionaryIndices<true, unused_state_buf>(s, nullptr, target_pos, t);
target_pos = new_target_pos;
str_len = len;
}
break;

case Encoding::PLAIN:
// TODO: since this is really just an estimate, we could just return
// s->dict_size (overestimate) or
// s->dict_size - sizeof(int) * s->page.num_input_values (underestimate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I mistaken that this was discussed and decided to find the real size?

Copy link
Contributor Author

@etseidl etseidl Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind the discussion was more about the delta encodings...this is more a note that there could be a faster way to do this (esp since the plain string size calc is agonizingly slow). Unfortunately, at this point we don't really yet know how many values are present, so we can't get an exact number of string bytes. But if an overestimate is ok, we could just use dict_size and save a lot of time in this step. Not sure if that's something to address with this PR.

Edit: actually, for V2 headers, we do know how many values there are. I'm going to change this and get rid of the TODO.

if (t < warp_size && (s->col.data_type & 7) == BYTE_ARRAY) {
str_len = gpuInitStringDescriptors<true, unused_state_buf>(s, nullptr, target_pos, t);
}
break;

case Encoding::DELTA_LENGTH_BYTE_ARRAY: str_len = gpuDeltaLengthPageStringSize(s, t); break;

case Encoding::DELTA_BYTE_ARRAY: str_len = gpuDeltaPageStringSize(s, t); break;

default:
// not a valid string encoding, so just return 0
break;
}
if (!t) { s->dict_pos = target_pos; }
return str_len;
Expand Down Expand Up @@ -348,9 +459,9 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
}

// retrieve total string size.
// TODO: make this block-based instead of just 1 warp
if (compute_string_sizes) {
if (t < 32) { s->page.str_bytes = gpuDecodeTotalPageStringSize(s, t); }
auto const str_bytes = gpuDecodeTotalPageStringSize(s, t);
if (t == 0) { s->page.str_bytes = str_bytes; }
}

// update output results:
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_bits = 0;
s->dict_base = nullptr;
s->dict_size = 0;
s->dict_val = 0;
// NOTE: if additional encodings are supported in the future, modifications must
// be made to is_supported_encoding() in reader_impl_preprocess.cu
switch (s->page.encoding) {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ __device__ thrust::pair<size_t, size_t> totalDeltaByteArraySize(uint8_t const* d
// get sum for warp.
// note: warp_sum will only be valid on lane 0.
auto const warp_sum = WarpReduce(temp_storage[warp_id]).Sum(lane_sum);
__syncwarp();
auto const warp_max = WarpReduce(temp_storage[warp_id]).Reduce(lane_max, cub::Max());

if (lane_id == 0) {
Expand Down
Loading
Loading