Commit 7faac72
first commit
ggershinsky authored and rok committed Oct 28, 2024
1 parent c039762 commit 7faac72
Showing 7 changed files with 636 additions and 3 deletions.
2 changes: 2 additions & 0 deletions parquet/Cargo.toml
@@ -69,6 +69,7 @@ paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }
ring = { version = "0.17", default-features = false, features = ["std"] }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -120,6 +121,7 @@ zstd = ["dep:zstd", "zstd-sys"]
sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]
#encryption = ["aes-gcm", "base64"]

[[example]]
name = "read_parquet"
65 changes: 64 additions & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -42,6 +42,10 @@ mod filter;
mod selection;
pub mod statistics;

use crate::file::footer;
use crate::file::page_index::index_reader;
use crate::encryption::ciphers::FileDecryptionProperties;

/// Builder for constructing parquet readers into arrow.
///
/// Most users should use one of the following specializations:
@@ -317,7 +321,7 @@ impl ArrowReaderOptions {
///
/// // Create the reader and read the data using the supplied schema.
/// let mut reader = builder.build().unwrap();
/// let _batch = reader.next().unwrap().unwrap();
/// ```
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
@@ -369,6 +373,35 @@ pub struct ArrowReaderMetadata {
}

impl ArrowReaderMetadata {
/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], using
/// default (empty) [`FileDecryptionProperties`]
///
/// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used
pub fn load2<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
Self::load_with_decryption(reader, options, FileDecryptionProperties::builder().build())
}

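/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], decrypting
/// the footer metadata with the supplied [`FileDecryptionProperties`]
///
/// A minimal usage sketch; the file name and key below are placeholders, and the
/// `parquet::encryption::ciphers` path assumes this module is publicly exported:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// # use parquet::encryption::ciphers::FileDecryptionProperties;
/// // Placeholder file and hard-coded example key (16 bytes, i.e. AES-128).
/// let file = File::open("encrypted.parquet").unwrap();
/// let decryption_properties = FileDecryptionProperties::builder()
///     .with_footer_key(b"0123456789012345".to_vec())
///     .build();
/// let options = ArrowReaderOptions::new().with_page_index(true);
/// let metadata =
///     ArrowReaderMetadata::load_with_decryption(&file, options, decryption_properties).unwrap();
/// let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
///     .build()
///     .unwrap();
/// ```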
pub fn load_with_decryption<T: ChunkReader>(
    reader: &T,
    options: ArrowReaderOptions,
    file_decryption_properties: FileDecryptionProperties,
) -> Result<Self> {
let mut metadata = footer::parse_metadata_with_decryption(reader, file_decryption_properties)?;
if options.page_index {
let column_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_columns_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;
metadata.set_column_index(Some(column_index));

let offset_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;

metadata.set_offset_index(Some(offset_index))
}
Self::try_new(Arc::new(metadata), options)
}

/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
///
/// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
@@ -532,6 +565,11 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
Ok(Self::new_with_metadata(reader, metadata))
}

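/// Create a [`ParquetRecordBatchReaderBuilder`] for an encrypted file, using the
/// supplied [`FileDecryptionProperties`]
///
/// A minimal usage sketch; the file name and key below are placeholders, and the
/// `parquet::encryption::ciphers` path assumes this module is publicly exported:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
/// # use parquet::encryption::ciphers::FileDecryptionProperties;
/// // Placeholder file and hard-coded example key.
/// let file = File::open("encrypted.parquet").unwrap();
/// let decryption_properties = FileDecryptionProperties::builder()
///     .with_footer_key(b"0123456789012345".to_vec())
///     .build();
/// let reader = ParquetRecordBatchReaderBuilder::try_new_with_decryption(
///     file,
///     ArrowReaderOptions::default(),
///     decryption_properties,
/// )
/// .unwrap()
/// .with_batch_size(1024)
/// .build()
/// .unwrap();
/// ```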
pub fn try_new_with_decryption(
    reader: T,
    options: ArrowReaderOptions,
    file_decryption_properties: FileDecryptionProperties,
) -> Result<Self> {
let metadata = ArrowReaderMetadata::load_with_decryption(&reader, options, file_decryption_properties)?;
Ok(Self::new_with_metadata(reader, metadata))
}

/// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`]
///
/// This interface allows:
@@ -788,6 +826,13 @@ impl ParquetRecordBatchReader {
.build()
}

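/// Create a new [`ParquetRecordBatchReader`] from an encrypted file, using the
/// supplied [`FileDecryptionProperties`] and batch size
///
/// A minimal usage sketch; the file name and key below are placeholders, and the
/// `parquet::encryption::ciphers` path assumes this module is publicly exported:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// # use parquet::encryption::ciphers::FileDecryptionProperties;
/// // Placeholder file and hard-coded example key.
/// let file = File::open("encrypted.parquet").unwrap();
/// let decryption_properties = FileDecryptionProperties::builder()
///     .with_footer_key(b"0123456789012345".to_vec())
///     .build();
/// let reader =
///     ParquetRecordBatchReader::try_new_with_decryption(file, 1024, decryption_properties)
///         .unwrap();
/// for batch in reader {
///     println!("read {} rows", batch.unwrap().num_rows());
/// }
/// ```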
pub fn try_new_with_decryption<T: ChunkReader + 'static>(
    reader: T,
    batch_size: usize,
    file_decryption_properties: FileDecryptionProperties,
) -> Result<Self> {
ParquetRecordBatchReaderBuilder::try_new_with_decryption(reader, Default::default(), file_decryption_properties)?
.with_batch_size(batch_size)
.build()
}

/// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
///
/// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
@@ -955,6 +1000,7 @@ mod tests {
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
FloatType, Int32Type, Int64Type, Int96Type,
};
use crate::encryption::ciphers;
use crate::errors::Result;
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::writer::SerializedFileWriter;
@@ -1663,6 +1709,23 @@
assert!(col.value(2).is_nan());
}

#[test]
fn test_uniform_encryption() {
let path = format!(
"{}/uniform_encryption.parquet.encrypted",
arrow::util::test_util::parquet_test_data(),
);
let file = File::open(path).unwrap();

// TODO: source the key properly; this is the hard-coded footer key for the test file
let key_code: &[u8] = "0123456789012345".as_bytes();
let decryption_properties = ciphers::FileDecryptionProperties::builder()
    .with_footer_key(key_code.to_vec())
    .build();
let _record_reader =
    ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties)
        .unwrap();
// TODO: verify the decrypted batch contents
}

#[test]
fn test_read_float32_float64_byte_stream_split() {
let path = format!(