Skip to content

Commit

Permalink
Use ParquetMetaDataReader
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Nov 23, 2024
1 parent 9f663ba commit 6f055f9
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 324 deletions.
77 changes: 33 additions & 44 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ mod filter;
mod selection;
pub mod statistics;

use crate::file::footer;
use crate::file::page_index::index_reader;
use crate::encryption::ciphers::FileDecryptionProperties;

/// Builder for constructing parquet readers into arrow.
Expand Down Expand Up @@ -373,35 +371,6 @@ pub struct ArrowReaderMetadata {
}

impl ArrowReaderMetadata {
/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`]
///
/// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used
pub fn load2<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
Self::load_with_decryption(reader, options, FileDecryptionProperties::builder().build())
}

pub fn load_with_decryption<T: ChunkReader>(reader: &T, options: ArrowReaderOptions,
file_decryption_properties: FileDecryptionProperties) -> Result<Self> {
let mut metadata = footer::parse_metadata_with_decryption(reader, file_decryption_properties)?;
if options.page_index {
let column_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_columns_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;
metadata.set_column_index(Some(column_index));

let offset_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;

metadata.set_offset_index(Some(offset_index))
}
Self::try_new(Arc::new(metadata), options)
}

/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
///
/// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
Expand All @@ -412,9 +381,14 @@ impl ArrowReaderMetadata {
/// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
/// `Self::metadata` is missing the page index, this function will attempt
/// to load the page index by making an object store request.
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
pub fn load<T: ChunkReader>(
reader: &T,
options: ArrowReaderOptions,
file_decryption_properties: Option<FileDecryptionProperties>,
) -> Result<Self> {
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(options.page_index)
.with_encryption_properties(file_decryption_properties)
.parse_and_finish(reader)?;
Self::try_new(Arc::new(metadata), options)
}
Expand Down Expand Up @@ -561,12 +535,16 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {

/// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
let metadata = ArrowReaderMetadata::load(&reader, options)?;
let metadata = ArrowReaderMetadata::load(&reader, options, None)?;
Ok(Self::new_with_metadata(reader, metadata))
}

pub fn try_new_with_decryption(reader: T, options: ArrowReaderOptions, file_decryption_properties: FileDecryptionProperties) -> Result<Self> {
let metadata = ArrowReaderMetadata::load_with_decryption(&reader, options, file_decryption_properties)?;
pub fn try_new_with_decryption(
reader: T,
options: ArrowReaderOptions,
file_decryption_properties: Option<FileDecryptionProperties>,
) -> Result<Self> {
let metadata = ArrowReaderMetadata::load(&reader, options, file_decryption_properties)?;
Ok(Self::new_with_metadata(reader, metadata))
}

Expand Down Expand Up @@ -826,11 +804,18 @@ impl ParquetRecordBatchReader {
.build()
}

pub fn try_new_with_decryption<T: ChunkReader + 'static>(reader: T, batch_size: usize,
file_decryption_properties: FileDecryptionProperties) -> Result<Self> {
ParquetRecordBatchReaderBuilder::try_new_with_decryption(reader, Default::default(), file_decryption_properties)?
.with_batch_size(batch_size)
.build()
pub fn try_new_with_decryption<T: ChunkReader + 'static>(
reader: T,
batch_size: usize,
file_decryption_properties: Option<FileDecryptionProperties>,
) -> Result<Self> {
ParquetRecordBatchReaderBuilder::try_new_with_decryption(
reader,
Default::default(),
file_decryption_properties,
)?
.with_batch_size(batch_size)
.build()
}

/// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
Expand Down Expand Up @@ -1719,10 +1704,14 @@ mod tests {
// todo
let key_code: &[u8] = "0123456789012345".as_bytes();
// todo
let decryption_properties = ciphers::FileDecryptionProperties::builder()
.with_footer_key(key_code.to_vec())
.build();
let record_reader = ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties).unwrap();
let decryption_properties = Some(
ciphers::FileDecryptionProperties::builder()
.with_footer_key(key_code.to_vec())
.build(),
);
let record_reader =
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties)
.unwrap();
// todo check contents
}

Expand Down
5 changes: 4 additions & 1 deletion parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;

Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
// TODO: add self.file_decryption_properties
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(
&buf, None,
)?))
}
.boxed()
}
Expand Down
1 change: 1 addition & 0 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord
Ok(aad)
}

#[derive(Clone)]
pub struct FileDecryptionProperties {
footer_key: Option<Vec<u8>>
}
Expand Down
Loading

0 comments on commit 6f055f9

Please sign in to comment.