Skip to content

Commit

Permalink
Fix 'ColumnPath not found' error reading Parquet files with nested REPEATED fields (#5102)
Browse files Browse the repository at this point in the history

* Fix ColumnPath not found error in Parquet files with nested REPEATED fields

* Avoid pushing a value just to pop it right after

* Review feedback

---------

Co-authored-by: Matthieu Maitre <mmaitre@microsoft.com>
Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
3 people authored Nov 28, 2023
1 parent 58c80e6 commit a361ce1
Showing 1 changed file with 105 additions and 2 deletions.
107 changes: 105 additions & 2 deletions parquet/src/record/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ impl TreeBuilder {
row_group_reader,
)?;

Reader::RepeatedReader(
return Ok(Reader::RepeatedReader(
field,
curr_def_level - 1,
curr_rep_level - 1,
Box::new(reader),
)
));
}
// Group types (structs)
_ => {
Expand Down Expand Up @@ -811,11 +811,14 @@ impl Iterator for ReaderIter {
mod tests {
use super::*;

use crate::data_type::Int64Type;
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::SerializedFileWriter;
use crate::record::api::{Field, Row, RowAccessor};
use crate::schema::parser::parse_message_type;
use crate::util::test_common::file_util::{get_test_file, get_test_path};
use bytes::Bytes;
use std::convert::TryFrom;

// Convenient macros to assemble row, list, map, and group.
Expand Down Expand Up @@ -1580,6 +1583,106 @@ mod tests {
assert_eq!(rows, expected_rows);
}

// Regression test for #5102: the record TreeBuilder used to fail with a
// "ColumnPath not found" error when a schema contained nested REPEATED group
// fields that carry no LIST/MAP annotation. This round-trips such a schema
// (write then read) and checks the rows are assembled correctly.
#[test]
fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
// Create schema
// Two nested un-annotated REPEATED groups (level1, level2); the leaf
// columns are level1.level2.level3.value3 and level1.value1.
let schema = Arc::new(
parse_message_type(
"
message schema {
REPEATED group level1 {
REPEATED group level2 {
REQUIRED group level3 {
REQUIRED INT64 value3;
}
}
REQUIRED INT64 value1;
}
}",
)
.unwrap(),
);

// Write Parquet file to buffer
let mut buffer: Vec<u8> = Vec::new();
let mut file_writer =
SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
let mut row_group_writer = file_writer.next_row_group().unwrap();

// Write column level1.level2.level3.value3
// Arguments are (values, definition levels, repetition levels): def level 2
// means each value is defined through both repeated ancestors; rep level 0
// means each value starts a new top-level record (one value per record).
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int64Type>()
.write_batch(&[30, 31, 32], Some(&[2, 2, 2]), Some(&[0, 0, 0]))
.unwrap();
column_writer.close().unwrap();

// Write column level1.value1
// Def level 1 (defined within the single repeated ancestor level1),
// rep level 0: again one value per record, aligned with value3 above.
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int64Type>()
.write_batch(&[10, 11, 12], Some(&[1, 1, 1]), Some(&[0, 0, 0]))
.unwrap();
column_writer.close().unwrap();

// Finalize Parquet file
row_group_writer.close().unwrap();
file_writer.close().unwrap();
// Sanity check: a well-formed Parquet file begins with the magic bytes.
assert_eq!(&buffer[0..4], b"PAR1");

// Read Parquet file from buffer
// Before the fix, building the reader tree for this schema raised
// "ColumnPath not found" here; now row iteration must succeed.
let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
let rows: Vec<_> = file_reader
.get_row_iter(None)
.unwrap()
.map(|row| row.unwrap())
.collect();

// Three records expected, pairing value3 = 30/31/32 with value1 = 10/11/12.
// The un-annotated REPEATED groups are surfaced as single-element lists.
let expected_rows = vec![
row![(
"level1".to_string(),
list![group![
(
"level2".to_string(),
list![group![(
"level3".to_string(),
group![("value3".to_string(), Field::Long(30))]
)]]
),
("value1".to_string(), Field::Long(10))
]]
)],
row![(
"level1".to_string(),
list![group![
(
"level2".to_string(),
list![group![(
"level3".to_string(),
group![("value3".to_string(), Field::Long(31))]
)]]
),
("value1".to_string(), Field::Long(11))
]]
)],
row![(
"level1".to_string(),
list![group![
(
"level2".to_string(),
list![group![(
"level3".to_string(),
group![("value3".to_string(), Field::Long(32))]
)]]
),
("value1".to_string(), Field::Long(12))
]]
)],
];

assert_eq!(rows, expected_rows);
}

fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
let file = get_test_file(file_name);
let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
Expand Down

0 comments on commit a361ce1

Please sign in to comment.