diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 5da2469a764f..a001fc7c5803 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -88,6 +88,10 @@ harness = false
 name = "filter_query_sql"
 harness = false
 
+[[bench]]
+name = "window_query_sql"
+harness = false
+
 [[bench]]
 name = "scalar"
 harness = false
diff --git a/datafusion/benches/aggregate_query_sql.rs b/datafusion/benches/aggregate_query_sql.rs
index 74798ae572cd..b8fe06fd9145 100644
--- a/datafusion/benches/aggregate_query_sql.rs
+++ b/datafusion/benches/aggregate_query_sql.rs
@@ -17,68 +17,21 @@
 
 #[macro_use]
 extern crate criterion;
-use criterion::Criterion;
-
-use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
-use std::sync::{Arc, Mutex};
-use tokio::runtime::Runtime;
-
 extern crate arrow;
 extern crate datafusion;
-use arrow::{
-    array::Float32Array,
-    array::Float64Array,
-    array::StringArray,
-    array::UInt64Array,
-    datatypes::{DataType, Field, Schema},
-    record_batch::RecordBatch,
-};
-
-use datafusion::datasource::MemTable;
+mod data_utils;
+use crate::criterion::Criterion;
+use data_utils::create_table_provider;
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionContext;
-
-pub fn seedable_rng() -> StdRng {
-    StdRng::seed_from_u64(42)
-}
+use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
 
 fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
     let rt = Runtime::new().unwrap();
-
-    // execute the query
     let df = ctx.lock().unwrap().sql(sql).unwrap();
-    rt.block_on(df.collect()).unwrap();
-}
-
-fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
-    // use random numbers to avoid spurious compiler optimizations wrt to branching
-    let mut rng = seedable_rng();
-
-    (0..size)
-        .map(|_| {
-            if rng.gen::<f64>() > null_density {
-                None
-            } else {
-                Some(rng.gen::<f64>())
-            }
-        })
-        .collect()
-}
-
-fn create_integer_data(size: usize, value_density: f64) -> Vec<Option<u64>> {
-    // use random numbers to avoid spurious compiler optimizations wrt to branching
-    let mut rng = seedable_rng();
-
-    (0..size)
-        .map(|_| {
-            if rng.gen::<f64>() > value_density {
-                None
-            } else {
-                Some(rng.gen::<u64>())
-            }
-        })
-        .collect()
+    criterion::black_box(rt.block_on(df.collect()).unwrap());
 }
 
 fn create_context(
@@ -86,72 +39,9 @@
     array_len: usize,
     batch_size: usize,
 ) -> Result<Arc<Mutex<ExecutionContext>>> {
-    // define a schema.
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("utf8", DataType::Utf8, false),
-        Field::new("f32", DataType::Float32, false),
-        Field::new("f64", DataType::Float64, false),
-        // This field will contain integers randomly selected from a large
-        // range of values, i.e. [0, u64::MAX], such that there are none (or
-        // very few) repeated values.
-        Field::new("u64_wide", DataType::UInt64, false),
-        // This field will contain integers randomly selected from a narrow
-        // range of values such that there are a few distinct values, but they
-        // are repeated often.
-        Field::new("u64_narrow", DataType::UInt64, false),
-    ]));
-
-    let mut rng = seedable_rng();
-
-    // define data.
-    let partitions = (0..partitions_len)
-        .map(|_| {
-            (0..array_len / batch_size / partitions_len)
-                .map(|i| {
-                    // the 4 here is the number of different keys.
-                    // a higher number increase sparseness
-                    let vs = vec![0, 1, 2, 3];
-                    let keys: Vec<String> = (0..batch_size)
-                        .map(
-                            // use random numbers to avoid spurious compiler optimizations wrt to branching
-                            |_| format!("hi{:?}", vs.choose(&mut rng)),
-                        )
-                        .collect();
-                    let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
-
-                    let values = create_data(batch_size, 0.5);
-
-                    // Integer values between [0, u64::MAX].
-                    let integer_values_wide = create_integer_data(batch_size, 9.0);
-
-                    // Integer values between [0, 9].
-                    let integer_values_narrow_choices = (0..10).collect::<Vec<u64>>();
-                    let integer_values_narrow = (0..batch_size)
-                        .map(|_| *integer_values_narrow_choices.choose(&mut rng).unwrap())
-                        .collect::<Vec<u64>>();
-
-                    RecordBatch::try_new(
-                        schema.clone(),
-                        vec![
-                            Arc::new(StringArray::from(keys)),
-                            Arc::new(Float32Array::from(vec![i as f32; batch_size])),
-                            Arc::new(Float64Array::from(values)),
-                            Arc::new(UInt64Array::from(integer_values_wide)),
-                            Arc::new(UInt64Array::from(integer_values_narrow)),
-                        ],
-                    )
-                    .unwrap()
-                })
-                .collect::<Vec<_>>()
-        })
-        .collect::<Vec<_>>();
     let mut ctx = ExecutionContext::new();
-
-    // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
-    let provider = MemTable::try_new(schema, partitions)?;
-    ctx.register_table("t", Arc::new(provider))?;
-
+    let provider = create_table_provider(partitions_len, array_len, batch_size)?;
+    ctx.register_table("t", provider)?;
     Ok(Arc::new(Mutex::new(ctx)))
 }
diff --git a/datafusion/benches/data_utils/mod.rs b/datafusion/benches/data_utils/mod.rs
new file mode 100644
index 000000000000..4fd8f57fa190
--- /dev/null
+++ b/datafusion/benches/data_utils/mod.rs
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides an in-memory table for more realistic benchmarking.
+
+use arrow::{
+    array::Float32Array,
+    array::Float64Array,
+    array::StringArray,
+    array::UInt64Array,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use datafusion::datasource::MemTable;
+use datafusion::error::Result;
+use rand::rngs::StdRng;
+use rand::seq::SliceRandom;
+use rand::{Rng, SeedableRng};
+use std::sync::Arc;
+
+/// Create an in-memory table for the given partition count, total row count, and
+/// batch size: the resulting table holds `array_len` rows in total, split across
+/// `partitions_len` partitions of `batch_size`-row batches.
+pub(crate) fn create_table_provider(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<Arc<MemTable>> {
+    let schema = Arc::new(create_schema());
+    let partitions =
+        create_record_batches(schema.clone(), array_len, partitions_len, batch_size);
+    // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
+    MemTable::try_new(schema, partitions).map(Arc::new)
+}
+
+/// Create a seedable [`StdRng`](rand::StdRng).
+fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn create_schema() -> Schema {
+    Schema::new(vec![
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
+        // This field will contain integers randomly selected from a large
+        // range of values, i.e. [0, u64::MAX], such that there are none (or
+        // very few) repeated values.
+        Field::new("u64_wide", DataType::UInt64, false),
+        // This field will contain integers randomly selected from a narrow
+        // range of values such that there are a few distinct values, but they
+        // are repeated often.
+        Field::new("u64_narrow", DataType::UInt64, false),
+    ])
+}
+
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations with respect to branching
+    let mut rng = seedable_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_integer_data(size: usize, value_density: f64) -> Vec<Option<u64>> {
+    // use random numbers to avoid spurious compiler optimizations with respect to branching
+    let mut rng = seedable_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > value_density {
+                None
+            } else {
+                Some(rng.gen::<u64>())
+            }
+        })
+        .collect()
+}
+
+fn create_record_batch(
+    schema: SchemaRef,
+    rng: &mut StdRng,
+    batch_size: usize,
+    i: usize,
+) -> RecordBatch {
+    // the 4 here is the number of different keys;
+    // a higher number increases sparseness
+    let vs = vec![0, 1, 2, 3];
+    let keys: Vec<String> = (0..batch_size)
+        .map(
+            // use random numbers to avoid spurious compiler optimizations with respect to branching
+            |_| format!("hi{:?}", vs.choose(rng)),
+        )
+        .collect();
+    let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
+
+    let values = create_data(batch_size, 0.5);
+
+    // Integer values in [0, u64::MAX].
+    let integer_values_wide = create_integer_data(batch_size, 9.0);
+
+    // Integer values in [0, 9].
+    let integer_values_narrow = (0..batch_size)
+        .map(|_| rng.gen_range(0_u64..10))
+        .collect::<Vec<u64>>();
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(StringArray::from(keys)),
+            Arc::new(Float32Array::from(vec![i as f32; batch_size])),
+            Arc::new(Float64Array::from(values)),
+            Arc::new(UInt64Array::from(integer_values_wide)),
+            Arc::new(UInt64Array::from(integer_values_narrow)),
+        ],
+    )
+    .unwrap()
+}
+
+fn create_record_batches(
+    schema: SchemaRef,
+    array_len: usize,
+    partitions_len: usize,
+    batch_size: usize,
+) -> Vec<Vec<RecordBatch>> {
+    let mut rng = seedable_rng();
+    (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| create_record_batch(schema.clone(), &mut rng, batch_size, i))
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>()
+}
diff --git a/datafusion/benches/window_query_sql.rs b/datafusion/benches/window_query_sql.rs
new file mode 100644
index 000000000000..7c323be2b5ed
--- /dev/null
+++ b/datafusion/benches/window_query_sql.rs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate arrow;
+extern crate datafusion;
+
+mod data_utils;
+use crate::criterion::Criterion;
+use data_utils::create_table_provider;
+use datafusion::error::Result;
+use datafusion::execution::context::ExecutionContext;
+use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
+
+fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
+    let rt = Runtime::new().unwrap();
+    let df = ctx.lock().unwrap().sql(sql).unwrap();
+    criterion::black_box(rt.block_on(df.collect()).unwrap());
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<Arc<Mutex<ExecutionContext>>> {
+    let mut ctx = ExecutionContext::new();
+    let provider = create_table_provider(partitions_len, array_len, batch_size)?;
+    ctx.register_table("t", provider)?;
+    Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let partitions_len = 8;
+    let array_len = 1024 * 1024;
+    let batch_size = 8 * 1024;
+    let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+
+    c.bench_function("window empty over, aggregate functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 MAX(f64) OVER (), \
+                 MIN(f32) OVER (), \
+                 SUM(u64_narrow) OVER () \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function("window empty over, built-in functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 FIRST_VALUE(f64) OVER (), \
+                 LAST_VALUE(f32) OVER (), \
+                 NTH_VALUE(u64_narrow, 50) OVER () \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function("window order by, aggregate functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 MAX(f64) OVER (ORDER BY u64_narrow), \
+                 MIN(f32) OVER (ORDER BY u64_narrow DESC), \
+                 SUM(u64_narrow) OVER (ORDER BY u64_narrow ASC) \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function("window order by, built-in functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 FIRST_VALUE(f64) OVER (ORDER BY u64_narrow), \
+                 LAST_VALUE(f32) OVER (ORDER BY u64_narrow DESC), \
+                 NTH_VALUE(u64_narrow, 50) OVER (ORDER BY u64_narrow ASC) \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function("window partition by, u64_wide, aggregate functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 MAX(f64) OVER (PARTITION BY u64_wide), \
+                 MIN(f32) OVER (PARTITION BY u64_wide), \
+                 SUM(u64_narrow) OVER (PARTITION BY u64_wide) \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function(
+        "window partition by, u64_narrow, aggregate functions",
+        |b| {
+            b.iter(|| {
+                query(
+                    ctx.clone(),
+                    "SELECT \
+                     MAX(f64) OVER (PARTITION BY u64_narrow), \
+                     MIN(f32) OVER (PARTITION BY u64_narrow), \
+                     SUM(u64_narrow) OVER (PARTITION BY u64_narrow) \
+                     FROM t",
+                )
+            })
+        },
+    );
+
+    c.bench_function("window partition by, u64_wide, built-in functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 FIRST_VALUE(f64) OVER (PARTITION BY u64_wide), \
+                 LAST_VALUE(f32) OVER (PARTITION BY u64_wide), \
+                 NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_wide) \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function("window partition by, u64_narrow, built-in functions", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT \
+                 FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow), \
+                 LAST_VALUE(f32) OVER (PARTITION BY u64_narrow), \
+                 NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_narrow) \
+                 FROM t",
+            )
+        })
+    });
+
+    c.bench_function(
+        "window partition and order by, u64_wide, aggregate functions",
+        |b| {
+            b.iter(|| {
+                query(
+                    ctx.clone(),
+                    "SELECT \
+                     MAX(f64) OVER (PARTITION BY u64_wide ORDER BY f64), \
+                     MIN(f32) OVER (PARTITION BY u64_wide ORDER BY f64), \
+                     SUM(u64_narrow) OVER (PARTITION BY u64_wide ORDER BY f64) \
+                     FROM t",
+                )
+            })
+        },
+    );
+
+    c.bench_function(
+        "window partition and order by, u64_narrow, aggregate functions",
+        |b| {
+            b.iter(|| {
+                query(
+                    ctx.clone(),
+                    "SELECT \
+                     MAX(f64) OVER (PARTITION BY u64_narrow ORDER BY f64), \
+                     MIN(f32) OVER (PARTITION BY u64_narrow ORDER BY f64), \
+                     SUM(u64_narrow) OVER (PARTITION BY u64_narrow ORDER BY f64) \
+                     FROM t",
+                )
+            })
+        },
+    );
+
+    c.bench_function(
+        "window partition and order by, u64_wide, built-in functions",
+        |b| {
+            b.iter(|| {
+                query(
+                    ctx.clone(),
+                    "SELECT \
+                     FIRST_VALUE(f64) OVER (PARTITION BY u64_wide ORDER BY f64), \
+                     LAST_VALUE(f32) OVER (PARTITION BY u64_wide ORDER BY f64), \
+                     NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_wide ORDER BY f64) \
+                     FROM t",
+                )
+            })
+        },
+    );
+
+    c.bench_function(
+        "window partition and order by, u64_narrow, built-in functions",
+        |b| {
+            b.iter(|| {
+                query(
+                    ctx.clone(),
+                    "SELECT \
+                     FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow ORDER BY f64), \
+                     LAST_VALUE(f32) OVER (PARTITION BY u64_narrow ORDER BY f64), \
+                     NTH_VALUE(u64_narrow, 50) OVER (PARTITION BY u64_narrow ORDER BY f64) \
+                     FROM t",
+                )
+            })
+        },
+    );
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
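
Note: the new target can be run on its own with `cargo bench --bench window_query_sql`, which the `[[bench]]` entry added to Cargo.toml above registers. As a quick illustration of the helper this patch factors out, below is a minimal, hypothetical standalone harness (not part of the patch). It assumes a file living next to the benchmarks in `datafusion/benches/` so that `mod data_utils;` resolves, and it only uses calls that already appear in the diff (`create_table_provider`, `register_table`, `sql`, `collect`), with the same table shape as `criterion_benchmark`:

    // Hypothetical file datafusion/benches/window_smoke.rs -- illustration only,
    // mirroring the query() helper without the criterion harness.
    mod data_utils;

    use data_utils::create_table_provider;
    use datafusion::error::Result;
    use datafusion::execution::context::ExecutionContext;
    use tokio::runtime::Runtime;

    fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // Same shape as the benchmarks: 8 partitions, ~1M rows, 8K-row batches.
        let provider = create_table_provider(8, 1024 * 1024, 8 * 1024)?;
        ctx.register_table("t", provider)?;

        // Run one of the benchmarked window queries once and report the output size.
        let df = ctx.sql("SELECT MAX(f64) OVER (PARTITION BY u64_narrow) FROM t")?;
        let batches = Runtime::new().unwrap().block_on(df.collect())?;
        println!("collected {} record batches", batches.len());
        Ok(())
    }

Registering such a file would need its own `[[bench]]` entry with `harness = false`; the sketch is only meant to show how `create_table_provider` decouples data generation from the criterion setup shared by both benchmarks.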