Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(expr): new interface for expression directly returning scalar #9049

Merged
merged 10 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/expr/benches/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ fn bench_expr(c: &mut Criterion) {
.to_async(FuturesExecutor)
.iter(|| constant.eval(&input))
});
c.bench_function("extract(constant)", |bencher| {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if is there any general way to construct this test case. :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not... Let's keep it manual construction :(

let extract = build_from_pretty(format!(
"(extract:decimal HOUR:varchar ${}:timestamp)",
input_index_for_type(DataType::Timestamp)
));
bencher
.to_async(FuturesExecutor)
.iter(|| extract.eval(&input))
});

let sigs = func_sigs();
let sigs = sigs.sorted_by_cached_key(|sig| format!("{sig:?}"));
Expand Down
41 changes: 9 additions & 32 deletions src/expr/src/expr/expr_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
// limitations under the License.

use std::convert::TryFrom;
use std::sync::Arc;

use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::for_all_variants;
use risingwave_common::array::DataChunk;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{literal_type_match, DataType, Datum, Scalar, ScalarImpl};
use risingwave_common::types::{literal_type_match, DataType, Datum};
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::expr::expr_node::{RexNode, Type};
use risingwave_pb::expr::ExprNode;

use super::ValueImpl;
use crate::expr::Expression;
use crate::{bail, ensure, ExprError, Result};

Expand All @@ -39,33 +38,11 @@ impl Expression for LiteralExpression {
self.return_type.clone()
}

async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
let mut array_builder = self.return_type.create_array_builder(input.capacity());
let capacity = input.capacity();
let builder = &mut array_builder;
let literal = &self.literal;

macro_rules! array_impl_literal_append {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
match (builder, literal) {
$(
(ArrayBuilderImpl::$variant_name(inner), Some(ScalarImpl::$variant_name(v))) => {
inner.append_n(capacity, Some(v.as_scalar_ref()));
}
(ArrayBuilderImpl::$variant_name(inner), None) => {
inner.append_n(capacity, None);
}
)*
(_, _) => $crate::bail!(
"Do not support values in insert values executor".to_string()
),
}
};
}

for_all_variants! { array_impl_literal_append }

Ok(Arc::new(array_builder.finish()))
async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
Ok(ValueImpl::Scalar {
value: self.literal.clone(),
capacity: input.capacity(),
})
}

async fn eval_row(&self, _input: &OwnedRow) -> Result<Datum> {
Expand Down Expand Up @@ -126,7 +103,7 @@ mod tests {
use risingwave_common::array::{I32Array, StructValue};
use risingwave_common::array_nonnull;
use risingwave_common::types::test_utils::IntervalTestExt;
use risingwave_common::types::{Decimal, Interval, IntoOrdered};
use risingwave_common::types::{Decimal, Interval, IntoOrdered, Scalar, ScalarImpl};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_pb::data::data_type::{IntervalType, TypeName};
use risingwave_pb::data::{PbDataType, PbDatum};
Expand Down
35 changes: 29 additions & 6 deletions src/expr/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ pub(crate) mod data_types;
pub(crate) mod template;
pub(crate) mod template_fast;
pub mod test_utils;
mod value;

use std::sync::Arc;

use futures_util::TryFutureExt;
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
Expand All @@ -76,9 +78,14 @@ pub use self::agg::AggKind;
pub use self::build::*;
pub use self::expr_input_ref::InputRefExpression;
pub use self::expr_literal::LiteralExpression;
pub use self::value::{ValueImpl, ValueRef};
use super::{ExprError, Result};

/// Instance of an expression
/// Interface of an expression.
///
/// There're two functions to evaluate an expression: `eval` and `eval_v2`, exactly one of them
/// should be implemented. Prefer calling and implementing `eval_v2` instead of `eval` if possible,
/// to gain the performance benefit of scalar expression.
#[async_trait::async_trait]
pub trait Expression: std::fmt::Debug + Sync + Send {
/// Get the return data type.
Expand All @@ -94,14 +101,30 @@ pub trait Expression: std::fmt::Debug + Sync + Send {
Ok(res)
}

/// Evaluate the expression
/// Evaluate the expression in vectorized execution. Returns an array.
///
/// # Arguments
/// The default implementation calls `eval_v2` and always converts the result to an array.
async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
let value = self.eval_v2(input).await?;
Ok(match value {
ValueImpl::Array(array) => array,
ValueImpl::Scalar { value, capacity } => {
let mut builder = self.return_type().create_array_builder(capacity);
builder.append_datum_n(capacity, value);
builder.finish().into()
}
})
}

/// Evaluate the expression in vectorized execution. Returns a value that can be either an
/// array, or a scalar if all values in the array are the same.
///
/// * `input` - input data of the Project Executor
async fn eval(&self, input: &DataChunk) -> Result<ArrayRef>;
/// The default implementation calls `eval` and puts the result into the `Array` variant.
async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
self.eval(input).map_ok(ValueImpl::Array).await
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it stackoverflows if both not implemented? 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. 🥵 Not sure if there's a way to avoid this.

}

/// Evaluate the expression in row-based execution.
/// Evaluate the expression in row-based execution. Returns a nullable scalar.
async fn eval_row(&self, input: &OwnedRow) -> Result<Datum>;

/// Evaluate if the expression is constant.
Expand Down
81 changes: 52 additions & 29 deletions src/expr/src/expr/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,72 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use itertools::multizip;
use itertools::{multizip, Itertools};
use paste::paste;
use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk, Utf8Array};
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Utf8Array};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{option_as_scalar_ref, DataType, Datum, Scalar};
use risingwave_common::util::iter_util::ZipEqDebug;

use crate::expr::{BoxedExpression, Expression};
use crate::expr::{BoxedExpression, Expression, ValueImpl, ValueRef};

macro_rules! gen_eval {
{ ($macro:ident, $macro_row:ident), $ty_name:ident, $OA:ty, $($arg:ident,)* } => {
fn eval<'a, 'b, 'async_trait>(&'a self, data_chunk: &'b DataChunk)
-> Pin<Box<dyn Future<Output = $crate::Result<ArrayRef>> + Send + 'async_trait>>
fn eval_v2<'a, 'b, 'async_trait>(&'a self, data_chunk: &'b DataChunk)
-> Pin<Box<dyn Future<Output = $crate::Result<ValueImpl>> + Send + 'async_trait>>
where
'a: 'async_trait,
'b: 'async_trait,
{
Box::pin(async move { paste! {
$(
let [<ret_ $arg:lower>] = self.[<expr_ $arg:lower>].eval_checked(data_chunk).await?;
let [<arr_ $arg:lower>]: &$arg = [<ret_ $arg:lower>].as_ref().into();
let [<ret_ $arg:lower>] = self.[<expr_ $arg:lower>].eval_v2(data_chunk).await?;
let [<val_ $arg:lower>]: ValueRef<'_, $arg> = (&[<ret_ $arg:lower>]).into();
)*

let bitmap = data_chunk.visibility();
let mut output_array = <$OA as Array>::Builder::with_meta(data_chunk.capacity(), (&self.return_type).into());
Ok(Arc::new(match bitmap {
Some(bitmap) => {
for (($([<v_ $arg:lower>], )*), visible) in multizip(($([<arr_ $arg:lower>].iter(), )*)).zip_eq_debug(bitmap.iter()) {
if !visible {
output_array.append_null();
continue;
}
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
Ok(match ($([<val_ $arg:lower>], )*) {
// If all arguments are scalar, we can directly compute the result.
($(ValueRef::Scalar { value: [<scalar_ref_ $arg:lower>], capacity: [<cap_ $arg:lower>] }, )*) => {
let output_scalar = $macro_row!(self, $([<scalar_ref_ $arg:lower>],)*);
let output_datum = output_scalar.map(|s| s.to_scalar_value());
let capacity = data_chunk.capacity();

if cfg!(debug_assertions) {
let all_capacities = [capacity, $([<cap_ $arg:lower>], )*];
assert!(all_capacities.into_iter().all_equal(), "capacities mismatched: {:?}", all_capacities);
}
output_array.finish().into()

ValueImpl::Scalar { value: output_datum, capacity }
}
None => {
for ($([<v_ $arg:lower>], )*) in multizip(($([<arr_ $arg:lower>].iter(), )*)) {
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
}
output_array.finish().into()

// Otherwise, fallback to array computation.
($([<val_ $arg:lower>], )*) => {
let bitmap = data_chunk.visibility();
let mut output_array = <$OA as Array>::Builder::with_meta(data_chunk.capacity(), (&self.return_type).into());
let array = match bitmap {
Some(bitmap) => {
// TODO: use `izip` here.
for (($([<v_ $arg:lower>], )*), visible) in multizip(($([<val_ $arg:lower>].iter(), )*)).zip_eq_debug(bitmap.iter()) {
if !visible {
output_array.append_null();
continue;
}
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
}
output_array.finish().into()
}
None => {
// TODO: use `izip` here.
for ($([<v_ $arg:lower>], )*) in multizip(($([<val_ $arg:lower>].iter(), )*)) {
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
}
output_array.finish().into()
}
};

ValueImpl::Array(Arc::new(array))
}
}))
})
}})
}

Expand Down Expand Up @@ -141,8 +164,8 @@ macro_rules! gen_expr_normal {
F: Fn($($arg::RefItem<'_>, )*) -> $crate::Result<OA::OwnedItem> + Sync + Send,
> Expression for $ty_name<$($arg, )* OA, F>
where
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)*
for<'a> &'a OA: std::convert::From<&'a ArrayImpl>,
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)*
for<'a> ValueRef<'a, OA>: std::convert::From<&'a ValueImpl>,
{
fn return_type(&self) -> DataType {
self.return_type.clone()
Expand Down Expand Up @@ -227,7 +250,7 @@ macro_rules! gen_expr_bytes {
F: Fn($($arg::RefItem<'_>, )* &mut dyn std::fmt::Write) -> $crate::Result<()> + Sync + Send,
> Expression for $ty_name<$($arg, )* F>
where
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)*
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)*
{
fn return_type(&self) -> DataType {
self.return_type.clone()
Expand Down Expand Up @@ -303,8 +326,8 @@ macro_rules! gen_expr_nullable {
F: Fn($(Option<$arg::RefItem<'_>>, )*) -> $crate::Result<Option<OA::OwnedItem>> + Sync + Send,
> Expression for $ty_name<$($arg, )* OA, F>
where
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)*
for<'a> &'a OA: std::convert::From<&'a ArrayImpl>,
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)*
for<'a> ValueRef<'a, OA>: std::convert::From<&'a ValueImpl>,
{
fn return_type(&self) -> DataType {
self.return_type.clone()
Expand Down
75 changes: 75 additions & 0 deletions src/expr/src/expr/value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use either::Either;
use risingwave_common::array::*;
use risingwave_common::for_all_variants;
use risingwave_common::types::{Datum, Scalar};

/// The type-erased return value of an expression.
///
/// It can be either an array, or a scalar if all values in the array are the same.
#[derive(Debug, Clone)]
pub enum ValueImpl {
Array(ArrayRef),
Scalar { value: Datum, capacity: usize },
}

/// The generic reference type of [`ValueImpl`]. Used as the arguments of expressions.
#[derive(Debug, Clone, Copy)]
pub enum ValueRef<'a, A: Array> {
Array(&'a A),
Scalar {
value: Option<<A as Array>::RefItem<'a>>,
capacity: usize,
},
}

impl<'a, A: Array> ValueRef<'a, A> {
/// Iterates over all scalars in this value.
pub fn iter(self) -> impl Iterator<Item = Option<A::RefItem<'a>>> + 'a {
match self {
Self::Array(array) => Either::Left(array.iter()),
Self::Scalar { value, capacity } => {
Either::Right(std::iter::repeat(value).take(capacity))
}
}
}
}

macro_rules! impl_convert {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
$(
paste::paste! {
/// Converts a type-erased value to a reference of a specific array type.
impl<'a> From<&'a ValueImpl> for ValueRef<'a, $array> {
fn from(value: &'a ValueImpl) -> Self {
match value {
ValueImpl::Array(array) => {
let array = array.[<as_ $suffix_name>]();
ValueRef::Array(array)
},
ValueImpl::Scalar { value, capacity } => {
let value = value.as_ref().map(|v| v.[<as_ $suffix_name>]().as_scalar_ref());
ValueRef::Scalar { value, capacity: *capacity }
},
}
}
}
}
)*
};
}

for_all_variants! { impl_convert }