Write Bloom filters between row groups instead of the end #5860

Merged: 16 commits, Jun 21, 2024
8 changes: 8 additions & 0 deletions parquet/Cargo.toml
@@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.30.12", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -114,12 +115,19 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd", "zstd-sys"]
# Display memory usage in examples/write_parquet.rs
sysinfo = ["dep:sysinfo"]

[[example]]
name = "read_parquet"
required-features = ["arrow"]
path = "./examples/read_parquet.rs"

[[example]]
name = "write_parquet"
required-features = ["cli", "sysinfo"]
path = "./examples/write_parquet.rs"

[[example]]
name = "async_read_parquet"
required-features = ["arrow", "async"]
131 changes: 131 additions & 0 deletions parquet/examples/write_parquet.rs
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow::array::{StructArray, UInt64Builder};
use arrow::datatypes::DataType::UInt64;
use arrow::datatypes::{Field, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::ArrowWriter as ParquetWriter;
use parquet::basic::Encoding;
use parquet::errors::Result;
use parquet::file::properties::{BloomFilterPosition, WriterProperties};
use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};

#[derive(ValueEnum, Clone)]
enum BloomFilterPositionArg {
    End,
    AfterRowGroup,
}

#[derive(Parser)]
#[command(version)]
/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
struct Args {
    #[arg(long, default_value_t = 1000)]
    /// Number of batches to write
    iterations: u64,

    #[arg(long, default_value_t = 1000000)]
    /// Number of rows in each batch
    batch: u64,

    #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)]
    /// Where to write Bloom Filters
    bloom_filter_position: BloomFilterPositionArg,

    /// Path to the file to write
    path: PathBuf,
}

fn now() -> String {
    chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
}

/// Reports the resident set size (RSS) of this process, in MB.
fn mem(system: &mut System) -> String {
    let pid = Pid::from(std::process::id() as usize);
    system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory());
    system
        .process(pid)
        .map(|proc| format!("{}MB", proc.memory() / 1_000_000))
        .unwrap_or("N/A".to_string())
}

fn main() -> Result<()> {
Contributor:
Perhaps we could add some comments here explaining what this example is trying to show

Contributor Author:
Done, along with a Clap argument parser:

$ cargo run --release --features="cli sysinfo" --example write_parquet -- -h
Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage

Usage: write_parquet [OPTIONS] <PATH>

Arguments:
  <PATH>  Path to the file to write

Options:
      --iterations <ITERATIONS>                        Number of batches to write [default: 1000]
      --batch <BATCH>                                  Number of rows in each batch [default: 1000000]
      --bloom-filter-position <BLOOM_FILTER_POSITION>  Where to write Bloom Filters [default: after-row-group] [possible values: end, after-row-group]
  -h, --help                                           Print help
  -V, --version                                        Print version

    let args = Args::parse();

    let bloom_filter_position = match args.bloom_filter_position {
        BloomFilterPositionArg::End => BloomFilterPosition::End,
        BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
    };

    let properties = WriterProperties::builder()
        .set_column_bloom_filter_enabled("id".into(), true)
        .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
        .set_bloom_filter_position(bloom_filter_position)
        .build();
    let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
    // Create the parquet file that the example writes to.
    let file = File::create(args.path).unwrap();
    let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;

    let mut system =
        System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
    eprintln!(
        "{} Writing {} batches of {} rows. RSS = {}",
        now(),
        args.iterations,
        args.batch,
        mem(&mut system)
    );

    let mut array_builder = UInt64Builder::new();
    let mut last_log = Instant::now();
    for i in 0..args.iterations {
        // Log progress (and the current memory footprint) at most every 10 seconds.
        if Instant::now() - last_log > Duration::new(10, 0) {
            last_log = Instant::now();
            eprintln!(
                "{} Iteration {}/{}. RSS = {}",
                now(),
                i + 1,
                args.iterations,
                mem(&mut system)
            );
        }
        for j in 0..args.batch {
            array_builder.append_value(i + j);
        }
        writer.write(
            &StructArray::new(
                schema.fields().clone(),
                vec![Arc::new(array_builder.finish())],
                None,
            )
            .into(),
        )?;
    }
    writer.flush()?;
    writer.close()?;

    eprintln!("{} Done. RSS = {}", now(), mem(&mut system));

    Ok(())
}
28 changes: 25 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -199,7 +199,7 @@ impl<W: Write + Send> ArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}
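Since flushed_row_groups() now returns plain RowGroupMetaData instead of Arc-wrapped RowGroupMetaDataPtr, callers can inspect per-column Bloom filter placement as soon as a row group is flushed. A minimal sketch, using only accessors that already exist on ColumnChunkMetaData (the loop itself is illustrative, not part of this PR):

for row_group in writer.flushed_row_groups() {
    for column in row_group.columns() {
        // bloom_filter_offset() is Some(..) once the filter is in the file;
        // with BloomFilterPosition::AfterRowGroup that is immediately after
        // the row group itself has been written.
        if let Some(offset) = column.bloom_filter_offset() {
            eprintln!("{}: bloom filter at offset {}", column.column_path().string(), offset);
        }
    }
}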

@@ -1053,7 +1053,9 @@ mod tests {
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_pages_locations;
use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
@@ -1701,6 +1703,7 @@ values: ArrayRef,
values: ArrayRef,
schema: SchemaRef,
bloom_filter: bool,
bloom_filter_position: BloomFilterPosition,
}

impl RoundTripOptions {
@@ -1711,6 +1714,7 @@ values,
values,
schema: Arc::new(schema),
bloom_filter: false,
bloom_filter_position: BloomFilterPosition::AfterRowGroup,
}
}
}
@@ -1730,6 +1734,7 @@ values,
values,
schema,
bloom_filter,
bloom_filter_position,
} = options;

let encodings = match values.data_type() {
@@ -1770,6 +1775,7 @@ .set_dictionary_page_size_limit(dictionary_size.max(1))
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
.set_bloom_filter_position(bloom_filter_position)
.build();

files.push(roundtrip_opts(&expected_batch, props))
@@ -2127,6 +2133,22 @@ values_required::<BinaryViewArray, _>(many_vecs_iter);
values_required::<BinaryViewArray, _>(many_vecs_iter);
}

#[test]
fn i32_column_bloom_filter_at_end() {
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
let mut options = RoundTripOptions::new(array, false);
options.bloom_filter = true;
options.bloom_filter_position = BloomFilterPosition::End;

let files = one_column_roundtrip_with_options(options);
check_bloom_filter(
files,
"col".to_string(),
(0..SMALL_SIZE as i32).collect(),
(SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
);
}

#[test]
fn i32_column_bloom_filter() {
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -54,7 +54,7 @@ use crate::{
arrow::arrow_writer::ArrowWriterOptions,
arrow::ArrowWriter,
errors::{ParquetError, Result},
file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
file::{metadata::RowGroupMetaData, properties::WriterProperties},
format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.sync_writer.flushed_row_groups()
}

5 changes: 5 additions & 0 deletions parquet/src/file/metadata.rs
@@ -333,6 +333,11 @@ impl RowGroupMetaData {
&self.columns
}

/// Returns mutable slice of column chunk metadata.
pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
&mut self.columns
}
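The mutable accessor exists so the writer can fix up column chunk metadata after the fact, e.g. to record where each Bloom filter landed once it has been flushed. A hedged sketch of that shape (patch stands in for the writer's internal fix-up and is not an API of this crate):

use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};

// Illustrative only: apply an in-place fix-up to every column chunk of a
// freshly flushed row group.
fn patch_columns<F>(row_group: &mut RowGroupMetaData, mut patch: F)
where
    F: FnMut(&mut ColumnChunkMetaData),
{
    for column in row_group.columns_mut() {
        patch(column);
    }
}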

/// Number of rows in this row group.
pub fn num_rows(&self) -> i64 {
self.num_rows
36 changes: 36 additions & 0 deletions parquet/src/file/properties.rs
@@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::bloom_filter_position`]
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -86,6 +88,24 @@ impl FromStr for WriterVersion {
}
}

/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
/// write Bloom filters
///
/// A writer-side setting; it is not part of the Thrift file-format definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
/// Write Bloom Filters of each row group right after the row group
///
/// This saves writer memory by flushing each filter as soon as it is
/// computed, at the cost of data locality for readers
AfterRowGroup,
/// Write Bloom Filters at the end of the file
///
/// This allows better data locality for readers, at the cost of memory usage
/// for writers.
End,
}
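As a sketch of how the trade-off reads in code (the helper and its parameter are illustrative, not part of the crate):

use parquet::file::properties::BloomFilterPosition;

// Illustrative helper: pick a placement from a coarse workload description.
fn choose_position(writer_memory_is_tight: bool) -> BloomFilterPosition {
    if writer_memory_is_tight {
        // Each filter is flushed right after its row group, so writer memory
        // stays bounded regardless of how many row groups are written.
        BloomFilterPosition::AfterRowGroup
    } else {
        // All filters are buffered and written together at the end of the
        // file, giving readers that probe many filters better locality.
        BloomFilterPosition::End
    }
}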

/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;

@@ -130,6 +150,7 @@ pub struct WriterProperties {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -217,6 +238,11 @@ impl WriterProperties {
self.max_row_group_size
}

/// Returns where in the file the writer will place Bloom filters.
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
self.bloom_filter_position
}

/// Returns configured writer version.
pub fn writer_version(&self) -> WriterVersion {
self.writer_version
@@ -338,6 +364,7 @@ pub struct WriterPropertiesBuilder {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
key_value_metadata: Option<Vec<KeyValue>>,
@@ -357,6 +384,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: usize::MAX,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
@@ -376,6 +404,7 @@
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
key_value_metadata: self.key_value_metadata,
@@ -487,6 +516,12 @@ impl WriterPropertiesBuilder {
self
}

/// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`)
pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
self.bloom_filter_position = value;
self
}
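A brief usage sketch of the new setter next to the existing Bloom filter options (this mirrors what the write_parquet example above does; the column name "id" is just an example):

use parquet::file::properties::{BloomFilterPosition, WriterProperties};

let props = WriterProperties::builder()
    .set_column_bloom_filter_enabled("id".into(), true)
    .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup)
    .build();
assert_eq!(props.bloom_filter_position(), BloomFilterPosition::AfterRowGroup);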

/// Sets "created by" property (defaults to `parquet-rs version <VERSION>`).
pub fn set_created_by(mut self, value: String) -> Self {
self.created_by = value;
@@ -1052,6 +1087,7 @@ mod tests {
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);