Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the row index stream order in ORC reader #13242

Merged
merged 8 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 16 additions & 26 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,18 @@ constexpr type_id to_type_id(const orc::SchemaType& schema,
return type_id::EMPTY;
}

constexpr std::pair<gpu::StreamIndexType, uint32_t> get_index_type_and_pos(
const orc::StreamKind kind, uint32_t skip_count, bool non_child)
gpu::StreamIndexType get_stream_index_type(orc::StreamKind kind)
{
switch (kind) {
case orc::DATA:
skip_count += 1;
skip_count |= (skip_count & 0xff) << 8;
return std::pair(gpu::CI_DATA, skip_count);
case orc::DATA: return gpu::CI_DATA;
case orc::LENGTH:
case orc::SECONDARY:
skip_count += 1;
skip_count |= (skip_count & 0xff) << 16;
return std::pair(gpu::CI_DATA2, skip_count);
case orc::DICTIONARY_DATA: return std::pair(gpu::CI_DICTIONARY, skip_count);
case orc::PRESENT:
skip_count += (non_child ? 1 : 0);
return std::pair(gpu::CI_PRESENT, skip_count);
case orc::ROW_INDEX: return std::pair(gpu::CI_INDEX, skip_count);
case orc::SECONDARY: return gpu::CI_DATA2;
case orc::DICTIONARY_DATA: return gpu::CI_DICTIONARY;
case orc::PRESENT: return gpu::CI_PRESENT;
case orc::ROW_INDEX: return gpu::CI_INDEX;
default:
// Skip this stream as it's not strictly required
return std::pair(gpu::CI_NUM_STREAMS, 0);
return gpu::CI_NUM_STREAMS;
}
}

Expand Down Expand Up @@ -213,16 +204,15 @@ size_t gather_stream_info(const size_t stripe_index,
}
if (col != -1) {
if (src_offset >= stripeinfo->indexLength || use_index) {
// NOTE: skip_count field is temporarily used to track index ordering
auto& chunk = chunks[stripe_index][col];
const auto idx =
get_index_type_and_pos(stream.kind, chunk.skip_count, col == orc2gdf[column_id]);
if (idx.first < gpu::CI_NUM_STREAMS) {
chunk.strm_id[idx.first] = stream_info.size();
chunk.strm_len[idx.first] = stream.length;
chunk.skip_count = idx.second;

if (idx.first == gpu::CI_DICTIONARY) {
auto& chunk = chunks[stripe_index][col];
auto const index_type = get_stream_index_type(stream.kind);
if (index_type < gpu::CI_NUM_STREAMS) {
chunk.strm_id[index_type] = stream_info.size();
chunk.strm_len[index_type] = stream.length;
// NOTE: skip_count field is temporarily used to track the presence of index streams
chunk.skip_count |= 1 << index_type;
vyasr marked this conversation as resolved.
Show resolved Hide resolved

if (index_type == gpu::CI_DICTIONARY) {
chunk.dictionary_start = *num_dictionary_entries;
chunk.dict_len = stripefooter->columns[column_id].dictionarySize;
*num_dictionary_entries += stripefooter->columns[column_id].dictionarySize;
Expand Down
39 changes: 33 additions & 6 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <cub/cub.cuh>
#include <rmm/cuda_stream_view.hpp>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -226,6 +228,30 @@ enum row_entry_state_e {
STORE_INDEX2,
};

/**
* @brief Calculates the order of index streams based on the index types present in the column.
*
* @param index_types_bitmap The bitmap of index types showing which index streams are present
*
* @return The order of index streams
*/
static auto __device__ index_order_from_index_types(uint32_t index_types_bitmap)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically all free functions are static.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm misunderstanding the comment, but static is used here to give the function internal linkage - it is not called outside of this translation unit. Maybe this is not relevant for __device__ functions, but we use this extensively in cuIO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps @ttnghia is confusing the concept of static member variables with usage of static as a storage class specifier? They are two completely different concepts that unfortunately reuse the same keyword. Otherwise I am not sure what the comment is referring to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but static is used here to give the function internal linkage

This makes sense. Otherwise, I don't see why it needs to be static. Sorry for the confusion.

Typically we can just put the function into an anonymous namespace for that purpose.

{
constexpr std::array full_order = {CI_PRESENT, CI_DATA, CI_DATA2};

std::array<uint32_t, full_order.size()> partial_order;
thrust::copy_if(thrust::seq,
full_order.cbegin(),
full_order.cend(),
partial_order.begin(),
[index_types_bitmap] __device__(auto index_type) {
// Check if the index type is present
return index_types_bitmap & (1 << index_type);
});

return partial_order;
}

/**
* @brief Decode a single row group index entry
*
Expand All @@ -239,11 +265,14 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s,
uint8_t const* const end)
{
constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8;
auto const stream_order = index_order_from_index_types(s->chunk.skip_count);

const uint8_t* cur = start;
row_entry_state_e state = NOT_FOUND;
uint32_t length = 0, strm_idx_id = s->chunk.skip_count >> 8, idx_id = 1, ci_id = CI_PRESENT,
pos_end = 0;
uint32_t length = 0;
uint32_t idx_id = 0;
uint32_t pos_end = 0;
uint32_t ci_id = CI_NUM_STREAMS;
vyasr marked this conversation as resolved.
Show resolved Hide resolved
while (cur < end) {
uint32_t v = 0;
for (uint32_t l = 0; l <= 28; l += 7) {
Expand Down Expand Up @@ -283,10 +312,8 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s,
}
break;
case STORE_INDEX0:
ci_id = (idx_id == (strm_idx_id & 0xff)) ? CI_DATA
: (idx_id == ((strm_idx_id >> 8) & 0xff)) ? CI_DATA2
: CI_PRESENT;
idx_id++;
// Start of a new entry; determine the stream index types
ci_id = stream_order[idx_id++];
if (s->is_compressed) {
if (ci_id < CI_PRESENT) s->row_index_entry[0][ci_id] = v;
if (cur >= start + pos_end) return length;
Expand Down
12 changes: 5 additions & 7 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2019-2023, NVIDIA CORPORATION.

import datetime
import decimal
Expand Down Expand Up @@ -1896,12 +1896,10 @@ def test_reader_empty_stripe(datadir, fname):
assert_eq(expected, got)


@pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/11890", raises=RuntimeError
)
def test_reader_unsupported_offsets():
# needs enough data for more than one row group
expected = cudf.DataFrame({"str": ["*"] * 10001}, dtype="string")
# needs enough data for multiple row groups
@pytest.mark.parametrize("data", [["*"] * 10001, ["**", None] * 5001])
def test_reader_row_index_order(data):
expected = cudf.DataFrame({"str": data}, dtype="string")

buffer = BytesIO()
expected.to_pandas().to_orc(buffer)
Expand Down