-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(expr): new interface for expression directly returning scalar #9049
Changes from 7 commits
76e8a24
ebd4670
2f6964a
cdeb820
c3900ab
649fe6e
1a26a7d
8a85fab
446b95a
5a25ac9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,9 +64,11 @@ pub(crate) mod data_types; | |
pub(crate) mod template; | ||
pub(crate) mod template_fast; | ||
pub mod test_utils; | ||
mod value; | ||
|
||
use std::sync::Arc; | ||
|
||
use futures_util::TryFutureExt; | ||
use risingwave_common::array::{ArrayRef, DataChunk}; | ||
use risingwave_common::row::{OwnedRow, Row}; | ||
use risingwave_common::types::{DataType, Datum}; | ||
|
@@ -76,9 +78,14 @@ pub use self::agg::AggKind; | |
pub use self::build::*; | ||
pub use self::expr_input_ref::InputRefExpression; | ||
pub use self::expr_literal::LiteralExpression; | ||
pub use self::value::{ValueImpl, ValueRef}; | ||
use super::{ExprError, Result}; | ||
|
||
/// Instance of an expression | ||
/// Interface of an expression. | ||
/// | ||
/// There're two functions to evaluate an expression: `eval` and `eval_new`, exactly one of them | ||
/// should be implemented. Prefer calling and implementing `eval_new` instead of `eval` if possible, | ||
/// to gain the performance benefit of scalar expression. | ||
#[async_trait::async_trait] | ||
pub trait Expression: std::fmt::Debug + Sync + Send { | ||
/// Get the return data type. | ||
|
@@ -94,14 +101,30 @@ pub trait Expression: std::fmt::Debug + Sync + Send { | |
Ok(res) | ||
} | ||
|
||
/// Evaluate the expression | ||
/// Evaluate the expression in vectorized execution. Returns an array. | ||
/// | ||
/// # Arguments | ||
/// The default implementation calls `eval_new` and always converts the result to an array. | ||
async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> { | ||
let value = self.eval_new(input).await?; | ||
Ok(match value { | ||
ValueImpl::Array(array) => array, | ||
ValueImpl::Scalar { value, capacity } => { | ||
let mut builder = self.return_type().create_array_builder(capacity); | ||
builder.append_datum_n(capacity, value); | ||
builder.finish().into() | ||
} | ||
}) | ||
} | ||
|
||
/// Evaluate the expression in vectorized execution. Returns a value that can be either an | ||
/// array, or a scalar if all values in the array are the same. | ||
/// | ||
/// * `input` - input data of the Project Executor | ||
async fn eval(&self, input: &DataChunk) -> Result<ArrayRef>; | ||
/// The default implementation calls `eval` and puts the result into the `Array` variant. | ||
async fn eval_new(&self, input: &DataChunk) -> Result<ValueImpl> { | ||
self.eval(input).map_ok(ValueImpl::Array).await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it stackoverflows if both not implemented? 😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. 🥵 Not sure if there's a way to avoid this. |
||
} | ||
|
||
/// Evaluate the expression in row-based execution. | ||
/// Evaluate the expression in row-based execution. Returns a nullable scalar. | ||
async fn eval_row(&self, input: &OwnedRow) -> Result<Datum>; | ||
|
||
/// Evaluate if the expression is constant. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,49 +19,73 @@ use std::future::Future; | |
use std::pin::Pin; | ||
use std::sync::Arc; | ||
|
||
use itertools::multizip; | ||
use itertools::{multizip, Itertools}; | ||
use paste::paste; | ||
use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk, Utf8Array}; | ||
use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk, Utf8Array}; | ||
use risingwave_common::row::OwnedRow; | ||
use risingwave_common::types::{option_as_scalar_ref, DataType, Datum, Scalar}; | ||
use risingwave_common::util::iter_util::ZipEqDebug; | ||
|
||
use crate::expr::{BoxedExpression, Expression}; | ||
use crate::expr::{BoxedExpression, Expression, ValueImpl, ValueRef}; | ||
|
||
macro_rules! gen_eval { | ||
{ ($macro:ident, $macro_row:ident), $ty_name:ident, $OA:ty, $($arg:ident,)* } => { | ||
fn eval<'a, 'b, 'async_trait>(&'a self, data_chunk: &'b DataChunk) | ||
-> Pin<Box<dyn Future<Output = $crate::Result<ArrayRef>> + Send + 'async_trait>> | ||
fn eval_new<'a, 'b, 'async_trait>(&'a self, data_chunk: &'b DataChunk) | ||
-> Pin<Box<dyn Future<Output = $crate::Result<ValueImpl>> + Send + 'async_trait>> | ||
where | ||
'a: 'async_trait, | ||
'b: 'async_trait, | ||
{ | ||
Box::pin(async move { paste! { | ||
$( | ||
let [<ret_ $arg:lower>] = self.[<expr_ $arg:lower>].eval_checked(data_chunk).await?; | ||
let [<arr_ $arg:lower>]: &$arg = [<ret_ $arg:lower>].as_ref().into(); | ||
let [<ret_ $arg:lower>] = self.[<expr_ $arg:lower>].eval_new(data_chunk).await?; | ||
let [<val_ $arg:lower>]: ValueRef<'_, $arg> = (&[<ret_ $arg:lower>]).into(); | ||
)* | ||
|
||
let bitmap = data_chunk.visibility(); | ||
let mut output_array = <$OA as Array>::Builder::with_meta(data_chunk.capacity(), (&self.return_type).into()); | ||
Ok(Arc::new(match bitmap { | ||
Some(bitmap) => { | ||
for (($([<v_ $arg:lower>], )*), visible) in multizip(($([<arr_ $arg:lower>].iter(), )*)).zip_eq_debug(bitmap.iter()) { | ||
if !visible { | ||
output_array.append_null(); | ||
continue; | ||
} | ||
$macro!(self, output_array, $([<v_ $arg:lower>],)*) | ||
Ok(match ($([<val_ $arg:lower>], )*) { | ||
// If all arguments are scalar, we can directly compute the result. | ||
($(ValueRef::Scalar { value: [<scalar_ref_ $arg:lower>], capacity: [<cap_ $arg:lower>] }, )*) => { | ||
let output_scalar = $macro_row!(self, $([<scalar_ref_ $arg:lower>],)*); | ||
let output_datum = output_scalar.map(|s| s.to_scalar_value()); | ||
let capacity = data_chunk.capacity(); | ||
|
||
if cfg!(debug_assertions) { | ||
let all_capacities = [capacity, $([<cap_ $arg:lower>], )*]; | ||
assert!(all_capacities.into_iter().all_equal(), "capacities mismatched: {:?}", all_capacities); | ||
} | ||
output_array.finish().into() | ||
|
||
ValueImpl::Scalar { value: output_datum, capacity } | ||
} | ||
None => { | ||
for ($([<v_ $arg:lower>], )*) in multizip(($([<arr_ $arg:lower>].iter(), )*)) { | ||
$macro!(self, output_array, $([<v_ $arg:lower>],)*) | ||
} | ||
output_array.finish().into() | ||
|
||
// Otherwise, fallback to array computation. | ||
// TODO: match all possible combinations to further get rid of the overhead of `Either` iterators. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For execution, yes. The difference is that we're not allocating a new array for
BugenZhao marked this conversation as resolved.
Show resolved
Hide resolved
|
||
($([<val_ $arg:lower>], )*) => { | ||
let bitmap = data_chunk.visibility(); | ||
let mut output_array = <$OA as Array>::Builder::with_meta(data_chunk.capacity(), (&self.return_type).into()); | ||
let array = match bitmap { | ||
Some(bitmap) => { | ||
// TODO: use `izip` here. | ||
for (($([<v_ $arg:lower>], )*), visible) in multizip(($([<val_ $arg:lower>].iter(), )*)).zip_eq_debug(bitmap.iter()) { | ||
if !visible { | ||
output_array.append_null(); | ||
continue; | ||
} | ||
$macro!(self, output_array, $([<v_ $arg:lower>],)*) | ||
} | ||
output_array.finish().into() | ||
} | ||
None => { | ||
// TODO: use `izip` here. | ||
for ($([<v_ $arg:lower>], )*) in multizip(($([<val_ $arg:lower>].iter(), )*)) { | ||
$macro!(self, output_array, $([<v_ $arg:lower>],)*) | ||
} | ||
output_array.finish().into() | ||
} | ||
}; | ||
|
||
ValueImpl::Array(Arc::new(array)) | ||
} | ||
})) | ||
}) | ||
}}) | ||
} | ||
|
||
|
@@ -141,8 +165,8 @@ macro_rules! gen_expr_normal { | |
F: Fn($($arg::RefItem<'_>, )*) -> $crate::Result<OA::OwnedItem> + Sync + Send, | ||
> Expression for $ty_name<$($arg, )* OA, F> | ||
where | ||
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)* | ||
for<'a> &'a OA: std::convert::From<&'a ArrayImpl>, | ||
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)* | ||
for<'a> ValueRef<'a, OA>: std::convert::From<&'a ValueImpl>, | ||
{ | ||
fn return_type(&self) -> DataType { | ||
self.return_type.clone() | ||
|
@@ -227,7 +251,10 @@ macro_rules! gen_expr_bytes { | |
F: Fn($($arg::RefItem<'_>, )* &mut dyn std::fmt::Write) -> $crate::Result<()> + Sync + Send, | ||
> Expression for $ty_name<$($arg, )* F> | ||
where | ||
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)* | ||
$( | ||
for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>, | ||
for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>, | ||
)* | ||
{ | ||
fn return_type(&self) -> DataType { | ||
self.return_type.clone() | ||
|
@@ -303,8 +330,8 @@ macro_rules! gen_expr_nullable { | |
F: Fn($(Option<$arg::RefItem<'_>>, )*) -> $crate::Result<Option<OA::OwnedItem>> + Sync + Send, | ||
> Expression for $ty_name<$($arg, )* OA, F> | ||
where | ||
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)* | ||
for<'a> &'a OA: std::convert::From<&'a ArrayImpl>, | ||
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)* | ||
for<'a> ValueRef<'a, OA>: std::convert::From<&'a ValueImpl>, | ||
{ | ||
fn return_type(&self) -> DataType { | ||
self.return_type.clone() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
// Copyright 2023 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use either::Either; | ||
use risingwave_common::array::*; | ||
use risingwave_common::for_all_variants; | ||
use risingwave_common::types::{Datum, Scalar}; | ||
|
||
/// The type-erased return value of an expression. | ||
/// | ||
/// It can be either an array, or a scalar if all values in the array are the same. | ||
#[derive(Debug, Clone)] | ||
pub enum ValueImpl { | ||
Array(ArrayRef), | ||
Scalar { value: Datum, capacity: usize }, | ||
} | ||
|
||
/// The generic reference type of [`ValueImpl`]. Used as the arguments of expressions. | ||
#[derive(Debug, Clone, Copy)] | ||
pub enum ValueRef<'a, A: Array> { | ||
Array(&'a A), | ||
Scalar { | ||
value: Option<<A as Array>::RefItem<'a>>, | ||
capacity: usize, | ||
}, | ||
} | ||
|
||
impl<'a, A: Array> ValueRef<'a, A> { | ||
/// Iterates over all scalars in this value. | ||
pub fn iter(self) -> impl Iterator<Item = Option<A::RefItem<'a>>> + 'a { | ||
match self { | ||
Self::Array(array) => Either::Left(array.iter()), | ||
Self::Scalar { value, capacity } => { | ||
Either::Right(std::iter::repeat(value).take(capacity)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
macro_rules! impl_convert { | ||
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => { | ||
$( | ||
paste::paste! { | ||
/// Converts a type-erased value to a reference of a specific array type. | ||
impl<'a> From<&'a ValueImpl> for ValueRef<'a, $array> { | ||
fn from(value: &'a ValueImpl) -> Self { | ||
match value { | ||
ValueImpl::Array(array) => { | ||
let array = array.[<as_ $suffix_name>](); | ||
ValueRef::Array(array) | ||
}, | ||
ValueImpl::Scalar { value, capacity } => { | ||
let value = value.as_ref().map(|v| v.[<as_ $suffix_name>]().as_scalar_ref()); | ||
ValueRef::Scalar { value, capacity: *capacity } | ||
}, | ||
} | ||
} | ||
} | ||
} | ||
)* | ||
}; | ||
} | ||
|
||
for_all_variants! { impl_convert } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if is there any general way to construct this test case. :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess not... Let's keep it manual construction :(