Allow adding user defined metadata to ParquetSink (apache#10224)
* chore: make explicit what ParquetWriterOptions are created from a subset of TableParquetOptions

* refactor: restore the ability to add kv metadata into the generated file sink

* test: demonstrate API contract for metadata TableParquetOptions

* chore: update code docs

* fix: parse on proper delimiter, and improve tests

* fix: enable any character in the metadata string value, by making key parsing part of the format.metadata::<key> option
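
In SQL terms, the new option looks like the following (the key and value here are illustrative; the syntax matches the copy.slt tests added below):

```sql
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
    'format.metadata::owner' 'analytics-team'
)
```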
wiedld authored and appletreeisyellow committed Apr 26, 2024
1 parent 600b815 commit 72bc0d7
Showing 6 changed files with 238 additions and 37 deletions.
73 changes: 71 additions & 2 deletions datafusion/common/src/config.rs
@@ -1364,12 +1364,31 @@ impl TableOptions {

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagate to all columns.
pub global: ParquetOptions,
/// Column specific options. Default usage is parquet.XX::column.
pub column_specific_options: HashMap<String, ColumnOptions>,
/// Additional file-level metadata to include. Inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
///
/// Multiple entries are permitted
/// ```sql
/// OPTIONS (
/// 'format.metadata::key1' '',
/// 'format.metadata::key2' 'value',
/// 'format.metadata::key3' 'value has spaces',
/// 'format.metadata::key4' 'value has special chars :: :',
/// 'format.metadata::key_dupe' 'original will be overwritten',
/// 'format.metadata::key_dupe' 'final'
/// )
/// ```
pub key_value_metadata: HashMap<String, Option<String>>,
}

impl ConfigField for TableParquetOptions {
@@ -1380,8 +1399,24 @@ impl ConfigField for TableParquetOptions {
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
-        // Determine the key if it's a global or column-specific setting
-        if key.contains("::") {
+        // Determine if the key is a global, metadata, or column-specific setting
+        if key.starts_with("metadata::") {
+            let k =
+                match key.split("::").collect::<Vec<_>>()[..] {
+                    [_meta] | [_meta, ""] => return Err(DataFusionError::Configuration(
+                        "Invalid metadata key provided, missing key in metadata::<key>"
+                            .to_string(),
+                    )),
+                    [_meta, k] => k.into(),
+                    _ => {
+                        return Err(DataFusionError::Configuration(format!(
+                            "Invalid metadata key provided, found too many '::' in \"{key}\""
+                        )))
+                    }
+                };
+            self.key_value_metadata.insert(k, Some(value.into()));
+            Ok(())
+        } else if key.contains("::") {
self.column_specific_options.set(key, value)
} else {
self.global.set(key, value)
@@ -1773,4 +1808,38 @@ mod tests {
.iter()
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
.set("format.metadata::key3", "value with spaces ")
.unwrap();
table_config
.set("format.metadata::key4", "value with special chars :: :")
.unwrap();

let parsed_metadata = table_config.parquet.key_value_metadata.clone();
assert_eq!(parsed_metadata.get("should not exist1"), None);
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
assert_eq!(
parsed_metadata.get("key3"),
Some(&Some("value with spaces ".into()))
);
assert_eq!(
parsed_metadata.get("key4"),
Some(&Some("value with special chars :: :".into()))
);

// duplicate keys are overwritten
table_config.set("format.metadata::key_dupe", "A").unwrap();
table_config.set("format.metadata::key_dupe", "B").unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
}
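
As a standalone illustration of the key parsing implemented in `set` above, the following sketch applies the same slice-pattern match outside DataFusion (`parse_metadata_key` is a hypothetical helper; the `format.` prefix is assumed to be stripped already, as it is before `TableParquetOptions::set` is called):

```rust
/// Hypothetical helper mirroring the `metadata::<key>` match arms above.
fn parse_metadata_key(key: &str) -> Result<&str, String> {
    match key.split("::").collect::<Vec<_>>()[..] {
        // "metadata" or "metadata::" -> the key part is missing
        [_meta] | [_meta, ""] => {
            Err("Invalid metadata key provided, missing key in metadata::<key>".to_string())
        }
        // "metadata::key" -> everything after the delimiter is the key
        [_meta, k] => Ok(k),
        // "metadata::key::extra" -> too many delimiters
        _ => Err(format!(
            "Invalid metadata key provided, found too many '::' in \"{key}\""
        )),
    }
}

fn main() {
    assert_eq!(parse_metadata_key("metadata::key1"), Ok("key1"));
    assert!(parse_metadata_key("metadata::").is_err());
    assert!(parse_metadata_key("metadata::key::extra").is_err());
}
```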
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -124,6 +124,10 @@ mod tests {
123
);

// properties which remain as default on WriterProperties
assert_eq!(properties.key_value_metadata(), None);
assert_eq!(properties.sorting_columns(), None);

Ok(())
}

104 changes: 73 additions & 31 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,11 +17,17 @@

//! Options related to how parquet files should be written

-use crate::{config::TableParquetOptions, DataFusionError, Result};
+use crate::{
+    config::{ParquetOptions, TableParquetOptions},
+    DataFusionError, Result,
+};

use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::{
+        metadata::KeyValue,
+        properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    },
schema::types::ColumnPath,
};

@@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
-        let parquet_session_options = &parquet_options.global;
-        let mut builder = WriterProperties::builder()
-            .set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
-            .set_write_batch_size(parquet_session_options.write_batch_size)
-            .set_writer_version(parse_version_string(
-                &parquet_session_options.writer_version,
-            )?)
-            .set_dictionary_page_size_limit(
-                parquet_session_options.dictionary_page_size_limit,
-            )
-            .set_max_row_group_size(parquet_session_options.max_row_group_size)
-            .set_created_by(parquet_session_options.created_by.clone())
-            .set_column_index_truncate_length(
-                parquet_session_options.column_index_truncate_length,
-            )
-            .set_data_page_row_count_limit(
-                parquet_session_options.data_page_row_count_limit,
-            )
-            .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
+        let ParquetOptions {
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            dictionary_page_size_limit,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            bloom_filter_enabled,
+            encoding,
+            dictionary_enabled,
+            compression,
+            statistics_enabled,
+            max_statistics_size,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+            // below is not part of ParquetWriterOptions
+            enable_page_index: _,
+            pruning: _,
+            skip_metadata: _,
+            metadata_size_hint: _,
+            pushdown_filters: _,
+            reorder_filters: _,
+            allow_single_file_parallelism: _,
+            maximum_parallel_row_group_writers: _,
+            maximum_buffered_record_batches_per_stream: _,
+        } = &parquet_options.global;
+
+        let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
+            Some(
+                parquet_options
+                    .key_value_metadata
+                    .clone()
+                    .drain()
+                    .map(|(key, value)| KeyValue { key, value })
+                    .collect::<Vec<_>>(),
+            )
+        } else {
+            None
+        };
+
+        let mut builder = WriterProperties::builder()
+            .set_data_page_size_limit(*data_pagesize_limit)
+            .set_write_batch_size(*write_batch_size)
+            .set_writer_version(parse_version_string(writer_version.as_str())?)
+            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
+            .set_max_row_group_size(*max_row_group_size)
+            .set_created_by(created_by.clone())
+            .set_column_index_truncate_length(*column_index_truncate_length)
+            .set_data_page_row_count_limit(*data_page_row_count_limit)
+            .set_bloom_filter_enabled(*bloom_filter_enabled)
+            .set_key_value_metadata(key_value_metadata);

-        if let Some(encoding) = &parquet_session_options.encoding {
+        if let Some(encoding) = &encoding {
             builder = builder.set_encoding(parse_encoding_string(encoding)?);
         }

-        if let Some(enabled) = parquet_session_options.dictionary_enabled {
-            builder = builder.set_dictionary_enabled(enabled);
+        if let Some(enabled) = dictionary_enabled {
+            builder = builder.set_dictionary_enabled(*enabled);
         }

-        if let Some(compression) = &parquet_session_options.compression {
+        if let Some(compression) = &compression {
             builder = builder.set_compression(parse_compression_string(compression)?);
         }

-        if let Some(statistics) = &parquet_session_options.statistics_enabled {
+        if let Some(statistics) = &statistics_enabled {
             builder =
                 builder.set_statistics_enabled(parse_statistics_string(statistics)?);
         }

-        if let Some(size) = parquet_session_options.max_statistics_size {
-            builder = builder.set_max_statistics_size(size);
+        if let Some(size) = max_statistics_size {
+            builder = builder.set_max_statistics_size(*size);
         }

-        if let Some(fpp) = parquet_session_options.bloom_filter_fpp {
-            builder = builder.set_bloom_filter_fpp(fpp);
+        if let Some(fpp) = bloom_filter_fpp {
+            builder = builder.set_bloom_filter_fpp(*fpp);
         }

-        if let Some(ndv) = parquet_session_options.bloom_filter_ndv {
-            builder = builder.set_bloom_filter_ndv(ndv);
+        if let Some(ndv) = bloom_filter_ndv {
+            builder = builder.set_bloom_filter_ndv(*ndv);
         }

for (column, options) in &parquet_options.column_specific_options {
@@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
builder.set_column_max_statistics_size(path, max_statistics_size);
}
}

// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
Ok(ParquetWriterOptions {
writer_options: builder.build(),
})
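
The map-to-metadata conversion above can be exercised in isolation. A minimal sketch against the `parquet` crate (the map contents are invented; `into_iter` is used in place of the source's `clone().drain()`, which is equivalent for an owned map):

```rust
use std::collections::HashMap;

use parquet::file::{metadata::KeyValue, properties::WriterProperties};

fn main() {
    let kv: HashMap<String, Option<String>> = HashMap::from([
        ("my-data".to_string(), Some("stuff".to_string())),
        ("my-data-bool-key".to_string(), None),
    ]);

    // Non-empty map -> Some(Vec<KeyValue>); empty map -> None, so the writer
    // omits user metadata entirely rather than writing an empty list.
    let key_value_metadata = (!kv.is_empty()).then(|| {
        kv.into_iter()
            .map(|(key, value)| KeyValue { key, value })
            .collect::<Vec<_>>()
    });

    let props = WriterProperties::builder()
        .set_key_value_metadata(key_value_metadata)
        .build();
    assert_eq!(props.key_value_metadata().map(|kvs| kvs.len()), Some(2));
}
```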
29 changes: 26 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1136,7 +1136,7 @@ mod tests {
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
-    use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
+    use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;
use tokio::io::AsyncWrite;
@@ -1865,7 +1865,13 @@
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
-            TableParquetOptions::default(),
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([
+                    ("my-data".to_string(), Some("stuff".to_string())),
+                    ("my-data-bool-key".to_string(), None),
+                ]),
+                ..Default::default()
+            },
));

// create data
@@ -1899,7 +1905,10 @@
let (
path,
FileMetaData {
-                num_rows, schema, ..
+                num_rows,
+                schema,
+                key_value_metadata,
+                ..
},
) = written.take(1).next().unwrap();
let path_parts = path.parts().collect::<Vec<_>>();
@@ -1915,6 +1924,20 @@
"output file metadata should contain col b"
);

let mut key_value_metadata = key_value_metadata.unwrap();
key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
let expected_metadata = vec![
KeyValue {
key: "my-data".to_string(),
value: Some("stuff".to_string()),
},
KeyValue {
key: "my-data-bool-key".to_string(),
value: None,
},
];
assert_eq!(key_value_metadata, expected_metadata);

Ok(())
}
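
For completeness, metadata written through `ParquetSink` can be read back with the `parquet` crate's synchronous reader; a short sketch (the file path is hypothetical):

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open any file produced with `format.metadata::...` options set.
    let file = std::fs::File::open("part-0.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // `key_value_metadata()` returns the same KeyValue entries asserted above.
    let kv = reader.metadata().file_metadata().key_value_metadata();
    println!("{kv:?}");
    Ok(())
}
```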

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -991,6 +991,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
.unwrap()
.unwrap(),
column_specific_options,
key_value_metadata: Default::default(),
})
}
}
64 changes: 63 additions & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -283,11 +283,73 @@ OPTIONS (
'format.statistics_enabled::col2' none,
'format.max_statistics_size' 123,
'format.bloom_filter_fpp' 0.001,
-'format.bloom_filter_ndv' 100
+'format.bloom_filter_ndv' 100,
+'format.metadata::key' 'value'
)
----
2

# valid vs invalid metadata

# accepts map with a single entry
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key' 'value'
)

# accepts multiple entries (on different keys)
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key1' '',
'format.metadata::key2' 'value',
'format.metadata::key3' 'value with spaces',
'format.metadata::key4' 'value with special chars :: :'
)

# accepts multiple entries with the same key (will overwrite)
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key1' 'value',
'format.metadata::key1' 'value'
)

# errors if key is missing
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, missing key in metadata::<key>
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::' 'value'
)

# errors if key contains internal '::'
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, found too many '::' in "metadata::key::extra"
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key::extra' 'value'
)

# errors for invalid property (not stating `format.metadata`)
statement error DataFusion error: Invalid or Unsupported Configuration: Config value "wrong-metadata" not found on ColumnOptions
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.wrong-metadata::key' 'value'
)


# validate multiple parquet file output with all options set
statement ok
CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/';
