From 759767b721e6e45f31e087f058da1d10de065aa4 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 15:03:20 +0200 Subject: [PATCH 01/13] Add example script to write Parquet files with a Bloom filter --- parquet/Cargo.toml | 10 +++++ parquet/examples/write_parquet.rs | 71 +++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 parquet/examples/write_parquet.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e6d612e0cc62..986e2ad74cb4 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -68,6 +68,9 @@ twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } +dsi-progress-logger = { version = "0.2.4", optional = true } +simplelog = { version = "0.12.2", optional = true } + [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } criterion = { version = "0.5", default-features = false } @@ -104,12 +107,19 @@ experimental = [] async = ["futures", "tokio"] # Enable object_store integration object_store = ["dep:object_store", "async"] +# Enable progress logging +log = ["dep:simplelog", "dep:dsi-progress-logger"] [[example]] name = "read_parquet" required-features = ["arrow"] path = "./examples/read_parquet.rs" +[[example]] +name = "write_parquet" +required-features = ["log"] +path = "./examples/write_parquet.rs" + [[example]] name = "async_read_parquet" required-features = ["arrow", "async"] diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs new file mode 100644 index 000000000000..06259ee5ad04 --- /dev/null +++ b/parquet/examples/write_parquet.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fs::File; +use std::sync::Arc; + +use dsi_progress_logger::prelude::*; + +use arrow::array::{StructArray, UInt64Builder}; +use arrow::datatypes::DataType::UInt64; +use arrow::datatypes::{Field, Schema}; +use parquet::arrow::ArrowWriter as ParquetWriter; +use parquet::basic::Encoding; +use parquet::errors::Result; +use parquet::file::properties::WriterProperties; + +fn main() -> Result<()> { + let _ = simplelog::SimpleLogger::init(simplelog::LevelFilter::Info, Default::default()); + + let properties = WriterProperties::builder() + .set_column_bloom_filter_enabled("id".into(), true) + .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) + .build(); + let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); + // Create parquet file that will be read. 
+ let path = "/tmp/test.parquet"; + let file = File::create(path).unwrap(); + let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; + + let num_iterations = 3000; + let mut pl = progress_logger!( + item_name = "iterations", + display_memory = true, + expected_updates = Some(num_iterations as usize) + ); + pl.start("Writing batches"); + let mut array_builder = UInt64Builder::new(); + for i in 0..num_iterations { + pl.update(); + for j in 0..1_000_000 { + array_builder.append_value(i + j); + } + writer.write( + &StructArray::new( + schema.fields().clone(), + vec![Arc::new(array_builder.finish())], + None, + ) + .into(), + )?; + } + writer.flush()?; + writer.close()?; + pl.done(); + + Ok(()) +} From a417f014ce6ff38081a584676e2f13c58cc110fe Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 15:10:48 +0200 Subject: [PATCH 02/13] Write Bloom filters between row groups instead of the end This allows Bloom filters to not be saved in memory, which can save significant space when writing long files --- parquet/src/arrow/arrow_writer/mod.rs | 3 ++- parquet/src/file/writer.rs | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 18c8617e07e6..e9eb86b7d76c 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -259,7 +259,8 @@ impl ArrowWriter { for chunk in in_progress.close()? { chunk.append_to_row_group(&mut row_group_writer)?; } - row_group_writer.close()?; + let row_group_metadata = row_group_writer.close()?; + self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?; Ok(()) } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7806384cdb52..cd20082f189a 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -274,13 +274,19 @@ impl SerializedFileWriter { } /// Serialize all the bloom filter to the file - fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { + pub fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group // iter each column // write bloom filter to the file - for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { + for row_group in row_groups.iter_mut() { + let row_group_idx: u16 = row_group + .ordinal + .expect("Missing row group ordinal") + .try_into() + .expect("Negative row group ordinal"); + let row_group_idx = row_group_idx as usize; for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() { - match &self.bloom_filters[row_group_idx][column_idx] { + match self.bloom_filters[row_group_idx][column_idx].take() { Some(bloom_filter) => { let start_offset = self.buf.bytes_written(); bloom_filter.write(&mut self.buf)?; @@ -338,7 +344,6 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); - self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; From 6effa7f90ea7eb71c841b81dd9f15f0cae853997 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 17:23:32 +0200 Subject: [PATCH 03/13] Add WriterProperties::bloom_filter_position --- parquet/src/arrow/arrow_writer/mod.rs | 9 +++++-- parquet/src/file/properties.rs | 36 +++++++++++++++++++++++++++ parquet/src/file/writer.rs | 1 + 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs index 067f9cbabdf5..6b9dc25a5e30 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -44,7 +44,7 @@ use crate::column::writer::{ use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; -use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; +use crate::file::properties::{BloomFilterPosition, WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; @@ -264,7 +264,11 @@ impl ArrowWriter { chunk.append_to_row_group(&mut row_group_writer)?; } let row_group_metadata = row_group_writer.close()?; - self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?; + match self.writer.properties().bloom_filter_position() { + BloomFilterPosition::AfterRowGroup => + self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?, + BloomFilterPosition::End => (), + } Ok(()) } @@ -1757,6 +1761,7 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) + .set_bloom_filter_position(BloomFilterPosition::End) .build(); files.push(roundtrip_opts(&expected_batch, props)) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 59e29440ae0c..8bcb760e8a64 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; +/// Default value for [`WriterProperties::bloom_filter_position`] +pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")); /// Default value for [`WriterProperties::column_index_truncate_length`] @@ -86,6 +88,24 @@ impl FromStr for WriterVersion { } } +/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should +/// write Bloom filters +/// +/// Basic constant, which is not part of the Thrift definition. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BloomFilterPosition { + /// Write Bloom Filters of each row group right after the row group + /// + /// This saves memory by writing it as soon as it is computed, at the cost + /// of data locality for readers + AfterRowGroup, + /// Write Bloom Filters at the end of the file + /// + /// This allows better data locality for readers, at the cost of memory usage + /// for writers. + End, +} + /// Reference counted writer properties. pub type WriterPropertiesPtr = Arc; @@ -131,6 +151,7 @@ pub struct WriterProperties { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, pub(crate) key_value_metadata: Option>, @@ -217,6 +238,11 @@ impl WriterProperties { self.max_row_group_size } + /// Returns maximum number of rows in a row group. 
+ pub fn bloom_filter_position(&self) -> BloomFilterPosition { + self.bloom_filter_position + } + /// Returns configured writer version. pub fn writer_version(&self) -> WriterVersion { self.writer_version @@ -337,6 +363,7 @@ pub struct WriterPropertiesBuilder { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, key_value_metadata: Option>, @@ -356,6 +383,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: usize::MAX, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, + bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, created_by: DEFAULT_CREATED_BY.to_string(), key_value_metadata: None, @@ -375,6 +403,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, max_row_group_size: self.max_row_group_size, + bloom_filter_position: self.bloom_filter_position, writer_version: self.writer_version, created_by: self.created_by, key_value_metadata: self.key_value_metadata, @@ -479,6 +508,12 @@ impl WriterPropertiesBuilder { self } + /// Sets where in the final file Bloom Filters are written + pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self { + self.bloom_filter_position = value; + self + } + /// Sets "created by" property. pub fn set_created_by(mut self, value: String) -> Self { self.created_by = value; @@ -991,6 +1026,7 @@ mod tests { ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); + assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); assert_eq!(props.created_by(), DEFAULT_CREATED_BY); assert_eq!(props.key_value_metadata(), None); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index cd20082f189a..7f7f642b64a3 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -344,6 +344,7 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); + self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; From f23759a8beb65003b75e5c49428e53477aa85e57 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 22:10:41 +0200 Subject: [PATCH 04/13] Mutate the right row group metadata When using BloomFilterPosition::AfterRowGroup this was only writing the Bloom Filter offset to a temporary clone of the metadata, causing the Bloom Filter to never be seen by readers --- parquet/src/arrow/arrow_writer/mod.rs | 19 ++-- parquet/src/file/metadata.rs | 5 ++ parquet/src/file/writer.rs | 124 +++++++++++++++----------- 3 files changed, 84 insertions(+), 64 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 6b9dc25a5e30..74186dcaca6b 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -43,8 +43,8 @@ use crate::column::writer::{ }; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; -use crate::file::properties::{BloomFilterPosition, WriterProperties, WriterPropertiesPtr}; +use 
crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData}; +use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; @@ -185,7 +185,7 @@ impl ArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.writer.flushed_row_groups() } @@ -263,12 +263,7 @@ impl ArrowWriter { for chunk in in_progress.close()? { chunk.append_to_row_group(&mut row_group_writer)?; } - let row_group_metadata = row_group_writer.close()?; - match self.writer.properties().bloom_filter_position() { - BloomFilterPosition::AfterRowGroup => - self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?, - BloomFilterPosition::End => (), - } + row_group_writer.close()?; Ok(()) } @@ -1044,7 +1039,9 @@ mod tests { use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; + use crate::file::properties::{ + BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, + }; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -1761,7 +1758,7 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) - .set_bloom_filter_position(BloomFilterPosition::End) + .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup) .build(); files.push(roundtrip_opts(&expected_batch, props)) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 853d5ffec8b0..673dc11fd1d6 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -324,6 +324,11 @@ impl RowGroupMetaData { &self.columns } + /// Returns mutable slice of column chunk metadata. + pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] { + &mut self.columns + } + /// Number of rows in this row group. 
pub fn num_rows(&self) -> i64 { self.num_rows diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7f7f642b64a3..77bcb7384536 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -34,8 +34,9 @@ use crate::column::{ }; use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; +use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr}; use crate::file::reader::ChunkReader; -use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC}; +use crate::file::{metadata::*, PARQUET_MAGIC}; use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr}; /// A wrapper around a [`Write`] that keeps track of the number @@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the row group metadata /// - the column index for each column chunk /// - the offset index for each column chunk -pub type OnCloseRowGroup<'a> = Box< +pub type OnCloseRowGroup<'a, W> = Box< dyn FnOnce( - RowGroupMetaDataPtr, + &'a mut TrackedWrite, + RowGroupMetaData, Vec>, Vec>, Vec>, @@ -143,7 +145,7 @@ pub struct SerializedFileWriter { schema: TypePtr, descr: SchemaDescPtr, props: WriterPropertiesPtr, - row_groups: Vec, + row_groups: Vec, bloom_filters: Vec>>, column_indexes: Vec>>, offset_indexes: Vec>>, @@ -197,18 +199,28 @@ impl SerializedFileWriter { self.row_group_index += 1; + let bloom_filter_position = self.properties().bloom_filter_position(); let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; let row_column_indexes = &mut self.column_indexes; let row_offset_indexes = &mut self.offset_indexes; - let on_close = - |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| { - row_groups.push(metadata); - row_bloom_filters.push(row_group_bloom_filter); - row_column_indexes.push(row_group_column_index); - row_offset_indexes.push(row_group_offset_index); - Ok(()) + let on_close = move |buf, + mut metadata, + row_group_bloom_filter, + row_group_column_index, + row_group_offset_index| { + row_bloom_filters.push(row_group_bloom_filter); + row_column_indexes.push(row_group_column_index); + row_offset_indexes.push(row_group_offset_index); + match bloom_filter_position { + BloomFilterPosition::AfterRowGroup => { + write_bloom_filters(buf, row_bloom_filters, &mut metadata)? 
+ } + BloomFilterPosition::End => (), }; + row_groups.push(metadata); + Ok(()) + }; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), @@ -221,7 +233,7 @@ impl SerializedFileWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { &self.row_groups } @@ -273,40 +285,6 @@ impl SerializedFileWriter { Ok(()) } - /// Serialize all the bloom filter to the file - pub fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { - // iter row group - // iter each column - // write bloom filter to the file - for row_group in row_groups.iter_mut() { - let row_group_idx: u16 = row_group - .ordinal - .expect("Missing row group ordinal") - .try_into() - .expect("Negative row group ordinal"); - let row_group_idx = row_group_idx as usize; - for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() { - match self.bloom_filters[row_group_idx][column_idx].take() { - Some(bloom_filter) => { - let start_offset = self.buf.bytes_written(); - bloom_filter.write(&mut self.buf)?; - let end_offset = self.buf.bytes_written(); - // set offset and index for bloom filter - let column_chunk_meta = column_chunk - .meta_data - .as_mut() - .expect("can't have bloom filter without column metadata"); - column_chunk_meta.bloom_filter_offset = Some(start_offset as i64); - column_chunk_meta.bloom_filter_length = - Some((end_offset - start_offset) as i32); - } - None => {} - } - } - } - Ok(()) - } - /// Serialize all the column index to the file fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group @@ -337,6 +315,10 @@ impl SerializedFileWriter { self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); + for row_group in &mut self.row_groups { + write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; + } + let mut row_groups = self .row_groups .as_slice() @@ -344,7 +326,6 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); - self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; @@ -449,6 +430,43 @@ impl SerializedFileWriter { } } +/// Serialize all the bloom filters of the given row group to the given buffer, +/// and returns the updated row group metadata. +fn write_bloom_filters( + buf: &mut TrackedWrite, + bloom_filters: &mut Vec>>, + row_group: &mut RowGroupMetaData, +) -> Result<()> { + // iter row group + // iter each column + // write bloom filter to the file + + let row_group_idx: u16 = row_group + .ordinal() + .expect("Missing row group ordinal") + .try_into() + .expect("Negative row group ordinal"); + let row_group_idx = row_group_idx as usize; + for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() { + match bloom_filters[row_group_idx][column_idx].take() { + Some(bloom_filter) => { + let start_offset = buf.bytes_written(); + bloom_filter.write(&mut *buf)?; + let end_offset = buf.bytes_written(); + // set offset and index for bloom filter + *column_chunk = column_chunk + .clone() + .into_builder() + .set_bloom_filter_offset(Some(start_offset as i64)) + .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) + .build()?; + } + None => {} + } + } + Ok(()) +} + /// Parquet row group writer API. 
/// Provides methods to access column writers in an iterator-like fashion, order is /// guaranteed to match the order of schema leaves (column descriptors). @@ -474,7 +492,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { offset_indexes: Vec>, row_group_index: i16, file_offset: i64, - on_close: Option>, + on_close: Option>, } impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { @@ -491,7 +509,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, row_group_index: i16, - on_close: Option>, + on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); let file_offset = buf.bytes_written() as i64; @@ -675,12 +693,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_file_offset(self.file_offset) .build()?; - let metadata = Arc::new(row_group_metadata); - self.row_group_metadata = Some(metadata.clone()); + self.row_group_metadata = Some(Arc::new(row_group_metadata.clone())); if let Some(on_close) = self.on_close.take() { on_close( - metadata, + self.buf, + row_group_metadata, self.bloom_filters, self.column_indexes, self.offset_indexes, @@ -1452,7 +1470,7 @@ mod tests { assert_eq!(flushed.len(), idx + 1); assert_eq!(Some(idx as i16), last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); - assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); + assert_eq!(flushed[idx], Arc::unwrap_or_clone(last_group)); } let file_metadata = file_writer.close().unwrap(); From 83b475e6ee8db1d9dc7a75a2e0ecb46f6352135e Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 22:18:08 +0200 Subject: [PATCH 05/13] Add a test for Bloom Filters written at the end --- parquet/examples/write_parquet.rs | 1 + parquet/src/arrow/arrow_writer/mod.rs | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs index 06259ee5ad04..22f9028ba532 100644 --- a/parquet/examples/write_parquet.rs +++ b/parquet/examples/write_parquet.rs @@ -34,6 +34,7 @@ fn main() -> Result<()> { let properties = WriterProperties::builder() .set_column_bloom_filter_enabled("id".into(), true) .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) + .set_bloom_filter_position(BloomFilterPosition::End) .build(); let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); // Create parquet file that will be read. 
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 74186dcaca6b..da24b5e24022 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1689,6 +1689,7 @@ mod tests { values: ArrayRef, schema: SchemaRef, bloom_filter: bool, + bloom_filter_position: BloomFilterPosition, } impl RoundTripOptions { @@ -1699,6 +1700,7 @@ mod tests { values, schema: Arc::new(schema), bloom_filter: false, + bloom_filter_position: BloomFilterPosition::AfterRowGroup, } } } @@ -1718,6 +1720,7 @@ mod tests { values, schema, bloom_filter, + bloom_filter_position, } = options; let encodings = match values.data_type() { @@ -1758,7 +1761,7 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) - .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup) + .set_bloom_filter_position(bloom_filter_position) .build(); files.push(roundtrip_opts(&expected_batch, props)) @@ -2106,6 +2109,22 @@ mod tests { values_required::(many_vecs_iter); } + #[test] + fn i32_column_bloom_filter_at_end() { + let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + options.bloom_filter_position = BloomFilterPosition::End; + + let files = one_column_roundtrip_with_options(options); + check_bloom_filter( + files, + "col".to_string(), + (0..SMALL_SIZE as i32).collect(), + (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), + ); + } + #[test] fn i32_column_bloom_filter() { let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); From 3f810b5ff6e27c1f83718d6dac8947c560d4ab15 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 22:22:45 +0200 Subject: [PATCH 06/13] Update async writer accordingly --- parquet/src/arrow/async_writer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 0bedf1fcb731..28efbdc7c66e 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -54,7 +54,7 @@ use crate::{ arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, errors::{ParquetError, Result}, - file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties}, + file::{metadata::RowGroupMetaData, properties::WriterProperties}, format::{FileMetaData, KeyValue}, }; use arrow_array::RecordBatch; @@ -172,7 +172,7 @@ impl AsyncArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.sync_writer.flushed_row_groups() } From ad0c40e26311af5a48c1d8c38cc6d61f3b544a8e Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 22:28:41 +0200 Subject: [PATCH 07/13] Undo accidental commit --- parquet/examples/write_parquet.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs index 22f9028ba532..06259ee5ad04 100644 --- a/parquet/examples/write_parquet.rs +++ b/parquet/examples/write_parquet.rs @@ -34,7 +34,6 @@ fn main() -> Result<()> { let properties = WriterProperties::builder() .set_column_bloom_filter_enabled("id".into(), true) .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) - .set_bloom_filter_position(BloomFilterPosition::End) .build(); let schema = Arc::new(Schema::new(vec![Field::new("id", 
UInt64, false)])); // Create parquet file that will be read. From 74c40ee8dae1db70e15eb818e3da53ac4a1b93a1 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 10 Jun 2024 22:39:49 +0200 Subject: [PATCH 08/13] Clippy --- parquet/src/file/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 77bcb7384536..d3f4b514ff49 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -434,7 +434,7 @@ impl SerializedFileWriter { /// and returns the updated row group metadata. fn write_bloom_filters( buf: &mut TrackedWrite, - bloom_filters: &mut Vec>>, + bloom_filters: &mut [Vec>], row_group: &mut RowGroupMetaData, ) -> Result<()> { // iter row group From f237e8c62640f94a76b87c9a77e231bfe3a7ac80 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Thu, 13 Jun 2024 21:07:32 +0200 Subject: [PATCH 09/13] Apply suggestions from code review Improve documentation Co-authored-by: Andrew Lamb --- parquet/src/file/properties.rs | 2 +- parquet/src/file/writer.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 8bcb760e8a64..66c75a339ed6 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -508,7 +508,7 @@ impl WriterPropertiesBuilder { self } - /// Sets where in the final file Bloom Filters are written + /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`) pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self { self.bloom_filter_position = value; self diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index d3f4b514ff49..d341c6cd9db7 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -212,6 +212,7 @@ impl SerializedFileWriter { row_bloom_filters.push(row_group_bloom_filter); row_column_indexes.push(row_group_column_index); row_offset_indexes.push(row_group_offset_index); + // write bloom filters out immediately after the row group if requested match bloom_filter_position { BloomFilterPosition::AfterRowGroup => { write_bloom_filters(buf, row_bloom_filters, &mut metadata)? 
@@ -315,6 +316,7 @@ impl SerializedFileWriter { self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); + // write out any remaining bloom filters after all row groups for row_group in &mut self.row_groups { write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; } From d2a7ab8d6e7ba1771311352ec5c58f4914b36551 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Thu, 13 Jun 2024 21:49:02 +0200 Subject: [PATCH 10/13] Rewrite example with constants as parameters and fewer dependencies --- parquet/Cargo.toml | 10 ++-- parquet/examples/write_parquet.rs | 91 +++++++++++++++++++++++++------ 2 files changed, 79 insertions(+), 22 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e8c8d5dda76d..eec7faf09d06 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,9 +67,7 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } - -dsi-progress-logger = { version = "0.2.4", optional = true } -simplelog = { version = "0.12.2", optional = true } +sysinfo = { version = "0.30.12", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -117,8 +115,8 @@ async = ["futures", "tokio"] object_store = ["dep:object_store", "async"] # Group Zstd dependencies zstd = ["dep:zstd", "zstd-sys"] -# Enable progress logging -log = ["dep:simplelog", "dep:dsi-progress-logger"] +# Display memory in example/write_parquet.rs +sysinfo = ["dep:sysinfo"] [[example]] name = "read_parquet" @@ -127,7 +125,7 @@ path = "./examples/read_parquet.rs" [[example]] name = "write_parquet" -required-features = ["log"] +required-features = ["cli", "sysinfo"] path = "./examples/write_parquet.rs" [[example]] diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs index 06259ee5ad04..bd7d585b2f7d 100644 --- a/parquet/examples/write_parquet.rs +++ b/parquet/examples/write_parquet.rs @@ -16,42 +16,100 @@ // under the License. use std::fs::File; +use std::path::PathBuf; use std::sync::Arc; - -use dsi_progress_logger::prelude::*; +use std::time::{Duration, Instant}; use arrow::array::{StructArray, UInt64Builder}; use arrow::datatypes::DataType::UInt64; use arrow::datatypes::{Field, Schema}; +use clap::{Parser, ValueEnum}; use parquet::arrow::ArrowWriter as ParquetWriter; use parquet::basic::Encoding; use parquet::errors::Result; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{BloomFilterPosition, WriterProperties}; +use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System, MemoryRefreshKind}; + +#[derive(ValueEnum, Clone)] +enum BloomFilterPositionArg { + End, + AfterRowGroup, +} + +#[derive(Parser)] +#[command(version)] +/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage. 
+struct Args { + #[arg(long, default_value_t = 1000)] + /// Number of batches to write + iterations: u64, + + #[arg(long, default_value_t = 1000000)] + /// Number of rows in each batch + batch: u64, + + #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)] + /// Where to write Bloom Filters + bloom_filter_position: BloomFilterPositionArg, + + /// Path to the file to write + path: PathBuf, +} + +fn now() -> String { + chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string() +} + +fn mem(system: &mut System) -> String { + let pid = Pid::from(std::process::id() as usize); + system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory()); + system + .process(pid) + .map(|proc| format!("{}MB", proc.memory() / 1_000_000)) + .unwrap_or("N/A".to_string()) +} fn main() -> Result<()> { - let _ = simplelog::SimpleLogger::init(simplelog::LevelFilter::Info, Default::default()); + let args = Args::parse(); + + let bloom_filter_position = match args.bloom_filter_position { + BloomFilterPositionArg::End => BloomFilterPosition::End, + BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup, + }; let properties = WriterProperties::builder() .set_column_bloom_filter_enabled("id".into(), true) .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) + .set_bloom_filter_position(bloom_filter_position) .build(); let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); // Create parquet file that will be read. - let path = "/tmp/test.parquet"; - let file = File::create(path).unwrap(); + let file = File::create(args.path).unwrap(); let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; - let num_iterations = 3000; - let mut pl = progress_logger!( - item_name = "iterations", - display_memory = true, - expected_updates = Some(num_iterations as usize) + let mut system = System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); + eprintln!( + "{} Writing {} batches of {} rows. RSS = {}", + now(), + args.iterations, + args.batch, + mem(&mut system) ); - pl.start("Writing batches"); + let mut array_builder = UInt64Builder::new(); - for i in 0..num_iterations { - pl.update(); - for j in 0..1_000_000 { + let mut last_log = Instant::now(); + for i in 0..args.iterations { + if Instant::now() - last_log > Duration::new(10, 0) { + last_log = Instant::now(); + eprintln!( + "{} Iteration {}/{}. RSS = {}", + now(), + i + 1, + args.iterations, + mem(&mut system) + ); + } + for j in 0..args.batch { array_builder.append_value(i + j); } writer.write( @@ -65,7 +123,8 @@ fn main() -> Result<()> { } writer.flush()?; writer.close()?; - pl.done(); + + eprintln!("{} Done. 
RSS = {}", now(), mem(&mut system)); Ok(()) } From 942c6ab06bb06c3c6ecfcc7b5effe4f82edd2323 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Thu, 13 Jun 2024 21:53:38 +0200 Subject: [PATCH 11/13] rustfmt --- parquet/examples/write_parquet.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs index bd7d585b2f7d..d2ef550df840 100644 --- a/parquet/examples/write_parquet.rs +++ b/parquet/examples/write_parquet.rs @@ -28,7 +28,7 @@ use parquet::arrow::ArrowWriter as ParquetWriter; use parquet::basic::Encoding; use parquet::errors::Result; use parquet::file::properties::{BloomFilterPosition, WriterProperties}; -use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System, MemoryRefreshKind}; +use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System}; #[derive(ValueEnum, Clone)] enum BloomFilterPositionArg { @@ -87,7 +87,8 @@ fn main() -> Result<()> { let file = File::create(args.path).unwrap(); let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; - let mut system = System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); + let mut system = + System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); eprintln!( "{} Writing {} batches of {} rows. RSS = {}", now(), From e4a588da7a772556719917238ff377e01a740282 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Thu, 13 Jun 2024 21:57:59 +0200 Subject: [PATCH 12/13] Clippy --- parquet/src/file/writer.rs | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index d341c6cd9db7..1d52a46e91a1 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -450,20 +450,17 @@ fn write_bloom_filters( .expect("Negative row group ordinal"); let row_group_idx = row_group_idx as usize; for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() { - match bloom_filters[row_group_idx][column_idx].take() { - Some(bloom_filter) => { - let start_offset = buf.bytes_written(); - bloom_filter.write(&mut *buf)?; - let end_offset = buf.bytes_written(); - // set offset and index for bloom filter - *column_chunk = column_chunk - .clone() - .into_builder() - .set_bloom_filter_offset(Some(start_offset as i64)) - .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) - .build()?; - } - None => {} + if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() { + let start_offset = buf.bytes_written(); + bloom_filter.write(&mut *buf)?; + let end_offset = buf.bytes_written(); + // set offset and index for bloom filter + *column_chunk = column_chunk + .clone() + .into_builder() + .set_bloom_filter_offset(Some(start_offset as i64)) + .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) + .build()?; } } Ok(()) From ed9e576fca0fe1c49d25f66d2f12ce2fab95fb69 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Thu, 13 Jun 2024 22:04:16 +0200 Subject: [PATCH 13/13] Fix MSRV --- parquet/src/file/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 1d52a46e91a1..eb633f31c477 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1469,7 +1469,7 @@ mod tests { assert_eq!(flushed.len(), idx + 1); assert_eq!(Some(idx as i16), last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), 
last_group.file_offset()); - assert_eq!(flushed[idx], Arc::unwrap_or_clone(last_group)); + assert_eq!(&flushed[idx], last_group.as_ref()); } let file_metadata = file_writer.close().unwrap();
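
Note (not part of the patch series): a minimal sketch of how the `BloomFilterPosition` knob introduced by patches 03 and 09/10 would be used from the Arrow writer. The output path, column name, and row count below are illustrative only; the setter names and types are the ones added in `parquet/src/file/properties.rs` above, and `AfterRowGroup` is the default per `DEFAULT_BLOOM_FILTER_POSITION`.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;
use parquet::file::properties::{BloomFilterPosition, WriterProperties};

fn main() -> Result<()> {
    // Enable a Bloom filter on column "id". With AfterRowGroup (the default
    // added by this series) each filter is serialized right after its row
    // group, so it does not have to stay in memory until the file is closed;
    // BloomFilterPosition::End restores the old behavior of writing all
    // filters just before the footer.
    let props = WriterProperties::builder()
        .set_column_bloom_filter_enabled("id".into(), true)
        .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup)
        .build();

    // Illustrative single-column batch.
    let ids: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1024));
    let batch = RecordBatch::try_from_iter([("id", ids)]).expect("valid batch");

    let file = File::create("/tmp/bloom_example.parquet").unwrap();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}

With a single small row group the two placements produce nearly identical files; the memory saving from AfterRowGroup shows up when many large row groups are written, which is what parquet/examples/write_parquet.rs in this series measures.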