Shuffling read into a sub function in parquet read (#12809)
This change is the first step toward the pipelined parquet reader: it moves chunk creation and the file reads out of load_and_decompress_data into a separate function, create_and_read_column_chunks. Behavior is unchanged for now, but the split will allow smaller groups of row groups to be read at a time for pipelining (see the sketch below).
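
As a rough, self-contained sketch of the pattern this introduces (the function names mirror the diff below, but the row_group_info fields, the std::async stand-in for the real source reads, and main() are illustrative only, not the cudf implementation):

```cpp
#include <future>
#include <utility>
#include <vector>

// Stand-in for cudf's row_group_info; only what the sketch needs.
struct row_group_info {
  int index;
  bool compressed;
};

// Counterpart of the new create_and_read_column_chunks(): set up the chunk
// descriptors, start the (asynchronous) file reads, and return a flag saying
// whether any chunk is compressed plus the futures for the in-flight reads.
std::pair<bool, std::vector<std::future<void>>> create_and_read_column_chunks(
  std::vector<row_group_info> const& row_groups_info)
{
  std::vector<std::future<void>> read_tasks;
  bool has_compressed_data = false;
  for (auto const& rg : row_groups_info) {
    has_compressed_data = has_compressed_data || rg.compressed;
    // Stand-in for reading this row group's column chunks into memory.
    read_tasks.push_back(std::async(std::launch::async, [] {}));
  }
  return {has_compressed_data, std::move(read_tasks)};
}

// Counterpart of load_and_decompress_data(): wait for the reads started by
// the helper, then carry on with page-header decode and decompression.
void load_and_decompress_data(std::vector<row_group_info> const& row_groups_info)
{
  auto [has_compressed_data, read_tasks] = create_and_read_column_chunks(row_groups_info);
  for (auto& task : read_tasks) {
    task.wait();
  }
  if (has_compressed_data) {
    // decompress pages here
  }
}

int main()
{
  load_and_decompress_data({{0, true}, {1, false}});
  return 0;
}
```

Because the helper returns the read futures instead of waiting on them itself, a later change can call it on smaller slices of the row-group list and overlap reads with decode.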

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)

URL: #12809
hyperbolic2346 authored Feb 23, 2023
1 parent 5719463 commit 430d91e
Showing 2 changed files with 39 additions and 19 deletions.
15 changes: 13 additions & 2 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -130,10 +130,21 @@ class reader::impl {
                     bool uses_custom_row_bounds,
                     host_span<std::vector<size_type> const> row_group_indices);
 
+  /**
+   * @brief Create chunk information and start file reads
+   *
+   * @param row_groups_info vector of information about row groups to read
+   * @param num_rows Maximum number of rows to read
+   * @return pair of boolean indicating if compressed chunks were found and a vector of futures for
+   * read completion
+   */
+  std::pair<bool, std::vector<std::future<void>>> create_and_read_column_chunks(
+    cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows);
+
   /**
    * @brief Load and decompress the input file(s) into memory.
    */
-  void load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
+  void load_and_decompress_data(cudf::host_span<row_group_info const> const row_groups_info,
                                 size_type num_rows);
 
   /**
43 changes: 26 additions & 17 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -651,16 +651,11 @@ void reader::impl::allocate_nesting_info()
   page_nesting_decode_info.host_to_device(_stream);
 }
 
-void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
-                                            size_type num_rows)
+std::pair<bool, std::vector<std::future<void>>> reader::impl::create_and_read_column_chunks(
+  cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
 {
-  // This function should never be called if `num_rows == 0`.
-  CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero.");
-
-  auto& raw_page_data    = _file_itm_data.raw_page_data;
-  auto& decomp_page_data = _file_itm_data.decomp_page_data;
-  auto& chunks           = _file_itm_data.chunks;
-  auto& pages_info       = _file_itm_data.pages_info;
+  auto& raw_page_data = _file_itm_data.raw_page_data;
+  auto& chunks        = _file_itm_data.chunks;
 
   // Descriptors for all the chunks that make up the selected columns
   const auto num_input_columns = _input_columns.size();
@@ -732,7 +727,7 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& r
         total_decompressed_size += col_meta.total_uncompressed_size;
       }
     }
-    remaining_rows -= row_group.num_rows;
+    remaining_rows -= row_group_rows;
   }
 
   // Read compressed chunk data to device memory
@@ -745,27 +740,41 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& r
                                                          chunk_source_map,
                                                          _stream));
 
+  CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read.");
+
+  return {total_decompressed_size > 0, std::move(read_rowgroup_tasks)};
+}
+
+void reader::impl::load_and_decompress_data(
+  cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
+{
+  // This function should never be called if `num_rows == 0`.
+  CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero.");
+
+  auto& raw_page_data    = _file_itm_data.raw_page_data;
+  auto& decomp_page_data = _file_itm_data.decomp_page_data;
+  auto& chunks           = _file_itm_data.chunks;
+  auto& pages_info       = _file_itm_data.pages_info;
+
+  auto const [has_compressed_data, read_rowgroup_tasks] =
+    create_and_read_column_chunks(row_groups_info, num_rows);
+
   for (auto& task : read_rowgroup_tasks) {
     task.wait();
   }
 
-  CUDF_EXPECTS(remaining_rows <= 0, "All rows data must be read.");
-
   // Process dataset chunk pages into output columns
   auto const total_pages = count_page_headers(chunks, _stream);
   pages_info = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);
 
   if (total_pages > 0) {
     // decoding of column/page information
     decode_page_headers(chunks, pages_info, _stream);
-    if (total_decompressed_size > 0) {
+    if (has_compressed_data) {
       decomp_page_data = decompress_page_data(chunks, pages_info, _stream);
       // Free compressed data
       for (size_t c = 0; c < chunks.size(); c++) {
-        if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) {
-          raw_page_data[c].reset();
-          // TODO: Check if this is called
-        }
+        if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); }
       }
     }
 
