Write Bloom filters between row groups instead of the end #5860

Merged: 16 commits, Jun 21, 2024
Changes from 9 commits
10 changes: 10 additions & 0 deletions parquet/Cargo.toml
@@ -68,6 +68,9 @@ twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }

dsi-progress-logger = { version = "0.2.4", optional = true }
Contributor:

Could you please remove these new dependencies (even though I do realize they are optional and won't be activated very often)?

I think they will add some ongoing maintenance cost (keeping the dependencies updated), which I would prefer to avoid if possible.

Contributor Author (@progval), Jun 13, 2024:

How do you feel about depending only on sysinfo to display the RAM usage? It has a small set of dependencies.

Contributor:

I think it would be ok

Contributor Author (@progval), Jun 13, 2024:

Done. It now looks like this:

$ cargo run --release --features="cli sysinfo" --example write_parquet -- /tmp/test.parquet
2024-06-13 21:45:40 Writing 1000 batches of 1000000 rows. RSS = 1MB
2024-06-13 21:45:50 Iteration 260/1000. RSS = 50MB
2024-06-13 21:46:00 Iteration 518/1000. RSS = 50MB
2024-06-13 21:46:10 Iteration 772/1000. RSS = 50MB
2024-06-13 21:46:19 Done. RSS = 17MB

$ cargo run --release --features="cli sysinfo" --example write_parquet -- /tmp/test.parquet --bloom-filter-position end
2024-06-13 21:46:29 Writing 1000 batches of 1000000 rows. RSS = 1MB
2024-06-13 21:46:39 Iteration 267/1000. RSS = 451MB
2024-06-13 21:46:49 Iteration 533/1000. RSS = 791MB
2024-06-13 21:46:59 Iteration 799/1000. RSS = 1151MB
2024-06-13 21:47:07 Done. RSS = 1055MB
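For reference, a minimal sketch of how the RSS figure above can be sampled with sysinfo. This assumes a recent sysinfo (roughly 0.30), where Process::memory() returns bytes; rss_mb is an illustrative helper name, not necessarily what the example ends up using.

use sysinfo::System;

// Illustrative helper: resident set size of the current process, in MB.
fn rss_mb() -> u64 {
    let mut sys = System::new();
    sys.refresh_all(); // populate process information before reading it
    let pid = sysinfo::get_current_pid().expect("unsupported platform");
    // Process::memory() returns bytes in recent sysinfo versions
    sys.process(pid).map(|p| p.memory() / 1_000_000).unwrap_or(0)
}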

simplelog = { version = "0.12.2", optional = true }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
criterion = { version = "0.5", default-features = false }
@@ -114,12 +117,19 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd", "zstd-sys"]
# Enable progress logging
log = ["dep:simplelog", "dep:dsi-progress-logger"]

[[example]]
name = "read_parquet"
required-features = ["arrow"]
path = "./examples/read_parquet.rs"

[[example]]
name = "write_parquet"
required-features = ["log"]
path = "./examples/write_parquet.rs"

[[example]]
name = "async_read_parquet"
required-features = ["arrow", "async"]
71 changes: 71 additions & 0 deletions parquet/examples/write_parquet.rs
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fs::File;
use std::sync::Arc;

use dsi_progress_logger::prelude::*;

use arrow::array::{StructArray, UInt64Builder};
use arrow::datatypes::DataType::UInt64;
use arrow::datatypes::{Field, Schema};
use parquet::arrow::ArrowWriter as ParquetWriter;
use parquet::basic::Encoding;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;

Contributor:

Perhaps we could add some comments here explaining what this example is trying to show.

Contributor Author:

Done, along with a Clap argument parser:

$ cargo run --release --features="cli sysinfo" --example write_parquet -- -h
Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage

Usage: write_parquet [OPTIONS] <PATH>

Arguments:
  <PATH>  Path to the file to write

Options:
      --iterations <ITERATIONS>                        Number of batches to write [default: 1000]
      --batch <BATCH>                                  Number of rows in each batch [default: 1000000]
      --bloom-filter-position <BLOOM_FILTER_POSITION>  Where to write Bloom Filters [default: after-row-group] [possible values: end, after-row-group]
  -h, --help                                           Print help
  -V, --version                                        Print version

fn main() -> Result<()> {
    let _ = simplelog::SimpleLogger::init(simplelog::LevelFilter::Info, Default::default());

    let properties = WriterProperties::builder()
        .set_column_bloom_filter_enabled("id".into(), true)
        .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
        .build();
    let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
    // Create the parquet file that will be written to.
    let path = "/tmp/test.parquet";
    let file = File::create(path).unwrap();
    let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;

    let num_iterations = 3000;
    let mut pl = progress_logger!(
        item_name = "iterations",
        display_memory = true,
        expected_updates = Some(num_iterations as usize)
    );
    pl.start("Writing batches");
    let mut array_builder = UInt64Builder::new();
    for i in 0..num_iterations {
        pl.update();
        for j in 0..1_000_000 {
            array_builder.append_value(i + j);
        }
        writer.write(
            &StructArray::new(
                schema.fields().clone(),
                vec![Arc::new(array_builder.finish())],
                None,
            )
            .into(),
        )?;
    }
    writer.flush()?;
    writer.close()?;
    pl.done();

    Ok(())
}
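At this commit the example always writes with the new default, BloomFilterPosition::AfterRowGroup. For comparison, a minimal sketch of writer properties that opt back into the old end-of-file placement, using the set_bloom_filter_position option this PR adds (the helper name is illustrative):

use parquet::file::properties::{BloomFilterPosition, WriterProperties};

// Illustrative: keep the pre-PR behavior of writing all Bloom filters at the
// end of the file, trading writer memory for reader-side data locality.
fn end_of_file_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_column_bloom_filter_enabled("id".into(), true)
        .set_bloom_filter_position(BloomFilterPosition::End)
        .build()
}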
28 changes: 25 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -185,7 +185,7 @@ impl<W: Write + Send> ArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
-pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}

@@ -1039,7 +1039,9 @@ mod tests {
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_pages_locations;
-use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
+use crate::file::properties::{
+    BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
+};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
@@ -1687,6 +1689,7 @@
values: ArrayRef,
schema: SchemaRef,
bloom_filter: bool,
bloom_filter_position: BloomFilterPosition,
}

impl RoundTripOptions {
@@ -1697,6 +1700,7 @@
values,
schema: Arc::new(schema),
bloom_filter: false,
bloom_filter_position: BloomFilterPosition::AfterRowGroup,
}
}
}
@@ -1716,6 +1720,7 @@
values,
schema,
bloom_filter,
bloom_filter_position,
} = options;

let encodings = match values.data_type() {
@@ -1756,6 +1761,7 @@
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
.set_bloom_filter_position(bloom_filter_position)
.build();

files.push(roundtrip_opts(&expected_batch, props))
@@ -2103,6 +2109,22 @@
        values_required::<BinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn i32_column_bloom_filter_at_end() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_position = BloomFilterPosition::End;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn i32_column_bloom_filter() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
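check_bloom_filter is a pre-existing helper in this test module. Below is a hedged sketch of the reader-side flow it wraps, using the crate's set_read_bloom_filter reader property and the get_column_bloom_filter accessor (the standalone helper and its exact shape are illustrative):

use std::fs::File;

use parquet::file::properties::ReaderProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;

// Illustrative: true if any row group's Bloom filter for column 0 possibly
// contains `value`. Bloom filters may return false positives, never false
// negatives.
fn bloom_filter_might_contain(file: File, value: i32) -> bool {
    let options = ReadOptionsBuilder::new()
        .with_reader_properties(
            // Bloom filters are not read by default; opt in explicitly
            ReaderProperties::builder()
                .set_read_bloom_filter(true)
                .build(),
        )
        .build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();
    (0..reader.metadata().num_row_groups()).any(|i| {
        let row_group = reader.get_row_group(i).unwrap();
        row_group
            .get_column_bloom_filter(0)
            .map(|sbbf| sbbf.check(&value))
            .unwrap_or(false)
    })
}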
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -54,7 +54,7 @@ use crate::{
arrow::arrow_writer::ArrowWriterOptions,
arrow::ArrowWriter,
errors::{ParquetError, Result},
-file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
+file::{metadata::RowGroupMetaData, properties::WriterProperties},
format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
-pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.sync_writer.flushed_row_groups()
}

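Both writers expose the same signature change: flushed_row_groups() now returns plain RowGroupMetaData values instead of Arc-wrapped RowGroupMetaDataPtr. A small sketch of consuming the slice under the new signature (the helper is illustrative; callers that still need owned values can clone, since RowGroupMetaData implements Clone):

use std::io::Write;

use parquet::arrow::ArrowWriter;

// Illustrative: total rows flushed so far, summed from row-group metadata.
fn flushed_rows<W: Write + Send>(writer: &ArrowWriter<W>) -> i64 {
    writer
        .flushed_row_groups()
        .iter()
        .map(|rg| rg.num_rows())
        .sum()
}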
5 changes: 5 additions & 0 deletions parquet/src/file/metadata.rs
@@ -324,6 +324,11 @@ impl RowGroupMetaData {
        &self.columns
    }

    /// Returns mutable slice of column chunk metadata.
    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
        &mut self.columns
    }

    /// Number of rows in this row group.
    pub fn num_rows(&self) -> i64 {
        self.num_rows
36 changes: 36 additions & 0 deletions parquet/src/file/properties.rs
@@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::bloom_filter_position`]
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -86,6 +88,24 @@ impl FromStr for WriterVersion {
}
}

/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
/// write Bloom filters
///
/// Basic constant, which is not part of the Thrift definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
    /// Write Bloom Filters of each row group right after the row group
    ///
    /// This saves memory by writing each filter as soon as it is computed, at
    /// the cost of data locality for readers
    AfterRowGroup,
    /// Write Bloom Filters at the end of the file
    ///
    /// This allows better data locality for readers, at the cost of memory usage
    /// for writers.
    End,
}

/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;

@@ -131,6 +151,7 @@ pub struct WriterProperties {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -217,6 +238,11 @@ impl WriterProperties {
        self.max_row_group_size
    }

    /// Returns where in the file the writer places Bloom filters.
    pub fn bloom_filter_position(&self) -> BloomFilterPosition {
        self.bloom_filter_position
    }

    /// Returns configured writer version.
    pub fn writer_version(&self) -> WriterVersion {
        self.writer_version
@@ -337,6 +363,7 @@ pub struct WriterPropertiesBuilder {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
key_value_metadata: Option<Vec<KeyValue>>,
@@ -356,6 +383,7 @@
data_page_row_count_limit: usize::MAX,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
@@ -375,6 +403,7 @@
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
key_value_metadata: self.key_value_metadata,
@@ -479,6 +508,12 @@
        self
    }

    /// Sets where in the final file Bloom Filters are written
    pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
        self.bloom_filter_position = value;
        self
    }

    /// Sets "created by" property.
    pub fn set_created_by(mut self, value: String) -> Self {
        self.created_by = value;
@@ -991,6 +1026,7 @@
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);