Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge string-view2 branch: reading from parquet up to 2x faster for some ClickBench queries (not on by default) #11667

Merged
merged 20 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
987e33b
Pin to pre-release version of arrow 52.2.0
alamb Jul 16, 2024
2c808fb
Update for deprecated method
alamb Jul 16, 2024
8d8732c
Add a config to force using string view in benchmark (#11514)
XiangpengHao Jul 19, 2024
8e0ca1a
Add String view helper functions (#11517)
XiangpengHao Jul 19, 2024
db65772
Add ArrowBytesViewMap and ArrowBytesViewSet (#11515)
XiangpengHao Jul 19, 2024
efcf5c6
Enable `GroupValueBytesView` for aggregation with StringView types (#…
XiangpengHao Jul 20, 2024
34d42bc
Initial support for regex_replace on `StringViewArray` (#11556)
XiangpengHao Jul 22, 2024
bb780b3
Add support for Utf8View for date/temporal codepaths (#11518)
a10y Jul 22, 2024
2b58fd5
GC `StringViewArray` in `CoalesceBatchesStream` (#11587)
XiangpengHao Jul 25, 2024
2b2b8ab
Merge remote-tracking branch 'apache/main' into string-view2
alamb Jul 26, 2024
ea11a9d
Merge remote-tracking branch 'apache/main' into string-view2
alamb Jul 26, 2024
f13bb82
[Bug] fix bug in return type inference of `utf8_to_int_type` (#11662)
XiangpengHao Jul 26, 2024
fb79638
Merge remote-tracking branch 'apache/main' into string-view2
alamb Jul 26, 2024
281fbed
Fix clippy
alamb Jul 26, 2024
5690712
Increase ByteViewMap block size to 2MB (#11674)
XiangpengHao Jul 27, 2024
322c3d2
Change `--string-view` to only apply to parquet formats (#11663)
XiangpengHao Jul 27, 2024
ab8005d
Implement native support StringView for character length (#11676)
XiangpengHao Jul 27, 2024
561aee8
Merge remote-tracking branch 'apache/main' into string-view2
alamb Jul 29, 2024
2e9c8a0
Remove uneeded patches
alamb Jul 29, 2024
f1f22fa
cargo fmt
alamb Jul 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,13 @@ impl RunOpt {
None => queries.min_query_id()..=queries.max_query_id(),
};

let config = self.common.config();
let mut config = self.common.config();
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;

Expand Down
7 changes: 7 additions & 0 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -339,6 +344,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -372,6 +378,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub struct CommonOpt {
/// Activate debug mode to see more details
#[structopt(short, long)]
pub debug: bool,

/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
pub string_view: bool,
}

impl CommonOpt {
Expand Down
5 changes: 3 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions datafusion/common/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use arrow::{
},
datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType},
};
use arrow_array::{BinaryViewArray, StringViewArray};

// Downcast ArrayRef to Date32Array
pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> {
Expand Down Expand Up @@ -87,6 +88,11 @@ pub fn as_string_array(array: &dyn Array) -> Result<&StringArray> {
Ok(downcast_value!(array, StringArray))
}

// Downcast ArrayRef to StringViewArray
//
// The conversion itself is delegated to `downcast_value!`; the typed
// reference it yields is wrapped in `Ok`.
pub fn as_string_view_array(array: &dyn Array) -> Result<&StringViewArray> {
    let string_view = downcast_value!(array, StringViewArray);
    Ok(string_view)
}

// Downcast ArrayRef to UInt32Array
pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array> {
Ok(downcast_value!(array, UInt32Array))
Expand Down Expand Up @@ -221,6 +227,11 @@ pub fn as_binary_array(array: &dyn Array) -> Result<&BinaryArray> {
Ok(downcast_value!(array, BinaryArray))
}

// Downcast ArrayRef to BinaryViewArray
//
// The conversion itself is delegated to `downcast_value!`; the typed
// reference it yields is wrapped in `Ok`.
pub fn as_binary_view_array(array: &dyn Array) -> Result<&BinaryViewArray> {
    let binary_view = downcast_value!(array, BinaryViewArray);
    Ok(binary_view)
}

// Downcast ArrayRef to FixedSizeListArray
pub fn as_fixed_size_list_array(array: &dyn Array) -> Result<&FixedSizeListArray> {
Ok(downcast_value!(array, FixedSizeListArray))
Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl ParquetOptions {
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_string_view: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -440,6 +441,7 @@ mod tests {
maximum_buffered_record_batches_per_stream: defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_string_view: defaults.schema_force_string_view,
}
}

Expand Down Expand Up @@ -540,6 +542,8 @@ mod tests {
maximum_buffered_record_batches_per_stream: global_options_defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_string_view: global_options_defaults
.schema_force_string_view,
},
column_specific_options,
key_value_metadata,
Expand Down
125 changes: 110 additions & 15 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use arrow_buffer::IntervalMonthDayNano;

#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
as_large_list_array, as_list_array, as_map_array, as_primitive_array,
as_string_array, as_struct_array,
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down Expand Up @@ -415,8 +415,10 @@ pub fn create_hashes<'a>(
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
Expand Down Expand Up @@ -540,22 +542,57 @@ mod tests {
Ok(())
}

#[test]
fn create_hashes_binary() -> Result<()> {
let byte_array = Arc::new(BinaryArray::from_vec(vec![
&[4, 3, 2],
&[4, 3, 2],
&[1, 2, 3],
]));
// Generates a `#[test]` function named `$NAME` that builds the same byte
// values twice, once as `$ARRAY` and once as a reference `BinaryArray`,
// hashes both with `create_hashes` using a fixed-seed `RandomState`, and
// asserts that the two hash vectors are identical.
macro_rules! create_hash_binary {
($NAME:ident, $ARRAY:ty) => {
// The generated test is compiled out when hash collisions are forced.
#[cfg(not(feature = "force_hash_collisions"))]
#[test]
fn $NAME() {
// Fixture: one null, repeated values (indices 0/5 and 4/6) and a mix of
// short and longer byte strings.
let binary = [
Some(b"short".to_byte_slice()),
None,
Some(b"long but different 12 bytes string"),
Some(b"short2"),
Some(b"Longer than 12 bytes string"),
Some(b"short"),
Some(b"Longer than 12 bytes string"),
];

// Array under test and the `BinaryArray` reference built from the same values
let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());

// Fixed seeds so both hashing calls below use the same state
let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut binary_hashes = vec![0; binary.len()];
create_hashes(&[binary_array], &random_state, &mut binary_hashes)
.unwrap();

let mut ref_hashes = vec![0; binary.len()];
create_hashes(&[ref_array], &random_state, &mut ref_hashes).unwrap();

// Null values result in a zero hash,
for (val, hash) in binary.iter().zip(binary_hashes.iter()) {
match val {
Some(_) => assert_ne!(*hash, 0),
None => assert_eq!(*hash, 0),
}
}

// NOTE(review): the next four statements reference `byte_array`, which is
// not defined inside this macro, and use `?` in a test fn with no return
// type; they look like leftovers of the replaced `create_hashes_binary`
// test -- confirm and drop.
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; byte_array.len()];
let hashes = create_hashes(&[byte_array], &random_state, hashes_buff)?;
assert_eq!(hashes.len(), 3,);
// same logical values should hash to the same hash value
assert_eq!(binary_hashes, ref_hashes);

// NOTE(review): stray `Ok(())` -- presumably also a leftover of the replaced
// test; confirm and drop.
Ok(())
// Same values should map to same hash values
// NOTE(review): these two assertions compare the input values in `binary`,
// not the computed `binary_hashes`, so they do not check any hash.
assert_eq!(binary[0], binary[5]);
assert_eq!(binary[4], binary[6]);

// different binary should map to different hash values
// NOTE(review): likewise compares the inputs rather than `binary_hashes`.
assert_ne!(binary[0], binary[2]);
}
};
}

// Instantiate the generated test once per binary array type
create_hash_binary!(binary_array, BinaryArray);
create_hash_binary!(binary_view_array, BinaryViewArray);

#[test]
fn create_hashes_fixed_size_binary() -> Result<()> {
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
Expand All @@ -571,6 +608,64 @@ mod tests {
Ok(())
}

// Generates a `#[test]` function named `$NAME` that builds the same logical
// strings twice, once as `$ARRAY` and once as a `DictionaryArray<Int8Type>`,
// hashes both with `create_hashes` using a fixed-seed `RandomState`, and
// checks that:
//   * null entries hash to zero and non-null entries do not,
//   * both representations produce identical hash vectors,
//   * equal values share a hash and different values do not.
macro_rules! create_hash_string {
    ($NAME:ident, $ARRAY:ty) => {
        // The generated test is compiled out when hash collisions are forced.
        #[cfg(not(feature = "force_hash_collisions"))]
        #[test]
        fn $NAME() {
            // Fixture: one null, repeated values (indices 0/5 and 4/6) and a
            // mix of short and longer strings.
            let strings = [
                Some("short"),
                None,
                Some("long but different 12 bytes string"),
                Some("short2"),
                Some("Longer than 12 bytes string"),
                Some("short"),
                Some("Longer than 12 bytes string"),
            ];

            let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
            let dict_array = Arc::new(
                strings
                    .iter()
                    .cloned()
                    .collect::<DictionaryArray<Int8Type>>(),
            );

            // Fixed seeds so both hashing calls below use the same state
            let random_state = RandomState::with_seeds(0, 0, 0, 0);

            let mut string_hashes = vec![0; strings.len()];
            create_hashes(&[string_array], &random_state, &mut string_hashes)
                .unwrap();

            let mut dict_hashes = vec![0; strings.len()];
            create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();

            // Null values result in a zero hash,
            for (val, hash) in strings.iter().zip(string_hashes.iter()) {
                match val {
                    Some(_) => assert_ne!(*hash, 0),
                    None => assert_eq!(*hash, 0),
                }
            }

            // same logical values should hash to the same hash value
            assert_eq!(string_hashes, dict_hashes);

            // Same values should map to same hash values. Compare the
            // computed hashes: comparing the `strings` literals themselves
            // would hold regardless of what `create_hashes` produced.
            assert_eq!(string_hashes[0], string_hashes[5]);
            assert_eq!(string_hashes[4], string_hashes[6]);

            // different strings should map to different hash values
            assert_ne!(string_hashes[0], string_hashes[2]);
        }
    };
}

// Instantiate the generated test once per string array representation.
// `string_view_array` must be built from `StringViewArray` so that the
// `Utf8View` hashing path is the one being exercised.
create_hash_string!(string_array, StringArray);
create_hash_string!(large_string_array, LargeStringArray);
create_hash_string!(string_view_array, StringViewArray);
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
Expand Down Expand Up @@ -204,6 +205,28 @@ pub fn file_type_to_format(
}
}

/// Transform a schema to use view types for Utf8 and Binary
///
/// `Utf8` and `LargeUtf8` fields become `Utf8View`; `Binary` and
/// `LargeBinary` fields become `BinaryView`. Every other field is passed
/// through unchanged (the existing `Arc` is reused).
///
/// Only the data type of a converted field is replaced: its name,
/// nullability and field-level metadata are carried over from the original
/// field. Schema-level metadata is copied to the returned schema as well.
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| {
            let view_type = match field.data_type() {
                DataType::Utf8 | DataType::LargeUtf8 => DataType::Utf8View,
                DataType::Binary | DataType::LargeBinary => DataType::BinaryView,
                // Not a string/binary column: nothing to convert
                _ => return Arc::clone(field),
            };
            // Clone the field and swap only its type; rebuilding it with
            // `Field::new` would silently drop the field's metadata.
            Arc::new(field.as_ref().clone().with_data_type(view_type))
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileFormatFactory, FileScanConfig};
use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
Expand Down Expand Up @@ -316,6 +316,17 @@ impl FileFormat for ParquetFormat {
Schema::try_merge(schemas)
}?;

let schema = if state
.config_options()
.execution
.parquet
.schema_force_string_view
{
transform_schema_to_view(&schema)
} else {
schema
};

Ok(Arc::new(schema))
}

Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,9 @@ impl ListingOptions {
.try_collect()
.await?;

self.format.infer_schema(state, &store, &files).await
let schema = self.format.infer_schema(state, &store, &files).await?;

Ok(schema)
}

/// Infers the partition columns stored in `LOCATION` and compares
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,10 @@ impl ExecutionPlan for ParquetExec {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
schema_force_string_view: self
.table_parquet_options
.global
.schema_force_string_view,
};

let stream =
Expand Down
Loading