Fix 'ColumnPath not found' error reading Parquet files with nested REPEATED fields #5102

Merged Nov 28, 2023 (3 commits)
Changes from 1 commit
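
For context, the bug surfaces when iterating rows of a file whose schema nests REPEATED groups that carry no LIST/MAP annotation, like the level1/level2 shape exercised by the new test below. A minimal repro sketch, assuming a pre-written file named nested_repeated.parquet (the file name and the use of expect for brevity are mine, not part of the PR):

use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() {
    // Any Parquet file whose schema nests REPEATED groups without a LIST/MAP
    // annotation, e.g. the level1/level2 schema used by the test in this PR.
    let file = File::open("nested_repeated.parquet").expect("test file exists");
    let reader = SerializedFileReader::new(file).expect("valid Parquet file");

    // Before this fix, iterating rows of such a file failed with an error
    // reporting that a ColumnPath could not be found.
    for row in reader.get_row_iter(None).expect("row iterator") {
        println!("{}", row.expect("row decodes"));
    }
}

With the change in this PR, the loop prints one row per record instead of erroring out.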
107 changes: 106 additions & 1 deletion parquet/src/record/reader.rs
@@ -263,7 +263,7 @@ impl TreeBuilder {
.with_fields(field.get_fields().to_vec())
.build()?;

path.pop();
let field_name = path.pop().unwrap();

let reader = self.reader_tree(
Arc::new(required_field),
@@ -274,6 +274,8 @@ impl TreeBuilder {
row_group_reader,
)?;

path.push(field_name);

Reader::RepeatedReader(
field,
curr_def_level - 1,
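
A note on this hunk, as I read the change: the path segment popped before recursing into the synthetic REQUIRED child is now pushed back once the recursive call returns, so the column path is left unchanged and later lookups (for example for the sibling field value1 in the new test) resolve correctly. A minimal sketch of that save/restore pattern, purely illustrative and not the crate's actual function:

// Illustrative only: the invariant is that `path` looks the same after the call
// as it did before it.
fn with_popped_segment(path: &mut Vec<String>, recurse: impl FnOnce(&mut Vec<String>)) {
    let field_name = path.pop().expect("caller pushed a segment");
    recurse(path);         // build the child reader against the shortened path
    path.push(field_name); // restore it so subsequent fields see the full path
}
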
@@ -800,11 +802,14 @@ impl Iterator for ReaderIter
mod tests {
use super::*;

use crate::data_type::Int64Type;
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::SerializedFileWriter;
use crate::record::api::{Field, Row, RowAccessor};
use crate::schema::parser::parse_message_type;
use crate::util::test_common::file_util::{get_test_file, get_test_path};
use bytes::Bytes;
use std::convert::TryFrom;

// Convenient macros to assemble row, list, map, and group.
@@ -1569,6 +1574,106 @@ mod tests {
assert_eq!(rows, expected_rows);
}

#[test]
fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
// Create schema
let schema = Arc::new(
parse_message_type(
"
message schema {
REPEATED group level1 {
REPEATED group level2 {
REQUIRED group level3 {
REQUIRED INT64 value3;
}
}
REQUIRED INT64 value1;
}
}",
)
.unwrap(),
);

// Write Parquet file to buffer
let mut buffer: Vec<u8> = Vec::new();
let mut file_writer =
SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
let mut row_group_writer = file_writer.next_row_group().unwrap();

// Write column level1.level2.level3.value3
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int64Type>()
.write_batch(&[30, 31, 32], Some(&[2, 2, 2]), Some(&[0, 0, 0]))
.unwrap();
column_writer.close().unwrap();

// Write column level1.value1
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int64Type>()
.write_batch(&[10, 11, 12], Some(&[1, 1, 1]), Some(&[0, 0, 0]))
.unwrap();
column_writer.close().unwrap();

// Finalize Parquet file
row_group_writer.close().unwrap();
file_writer.close().unwrap();
assert_eq!(&buffer[0..4], b"PAR1");

// Read Parquet file from buffer
let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
let rows: Vec<_> = file_reader
.get_row_iter(None)
.unwrap()
.map(|row| row.unwrap())
.collect();

let expected_rows = vec![
row![(
"level1".to_string(),
list![group![
(
"level2".to_string(),
list![group![(
"level3".to_string(),
group![("value3".to_string(), Field::Long(30))]
)]]
),
("value1".to_string(), Field::Long(10))
]]
)],
row![(
"level1".to_string(),
list![group![
(
"level2".to_string(),
list![group![(
"level3".to_string(),
group![("value3".to_string(), Field::Long(31))]
)]]
),
("value1".to_string(), Field::Long(11))
]]
)],
row![(
"level1".to_string(),
list![group![
(
"level2".to_string(),
list![group![(
"level3".to_string(),
group![("value3".to_string(), Field::Long(32))]
)]]
),
("value1".to_string(), Field::Long(12))
]]
)],
];

assert_eq!(rows, expected_rows);
}
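
As background for the write_batch calls in the test above, the definition and repetition levels follow the standard Dremel rules; this annotation is mine, not text from the PR:

// Column level1.level2.level3.value3:
//   max definition level = 2  (level1 and level2 are REPEATED; REQUIRED fields add nothing)
//   max repetition level = 2  (the same two REPEATED ancestors)
//   def = [2, 2, 2], rep = [0, 0, 0]  => each value is fully present and starts a new
//                                        record, so the three values become three rows.
// Column level1.value1:
//   max definition level = 1, max repetition level = 1
//   def = [1, 1, 1], rep = [0, 0, 0]  => one value per row, aligned with the rows above.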

fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
let file = get_test_file(file_name);
let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);