Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(expr): expression node level non-strict evaluation #12461

Merged
merged 22 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 40 additions & 26 deletions e2e_test/streaming/non_strict_mode.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Test compute errors are replaced with NULLs.
# See also <https://github.com/risingwavelabs/risingwave/issues/4625>
#
# UPDATE: after <https://github.com/risingwavelabs/risingwave/pull/12461>, the non-strict
# NULL padding is applied for each expression node instead of the root node only.

statement ok
create table t(x int);
Expand All @@ -8,46 +11,37 @@ statement ok
insert into t values (0),(1),(2),(NULL);

statement ok
create materialized view mv_proj as select 10/x as v from t;
create materialized view mv_proj as select x, 10/x as v from t;

statement ok
create materialized view mv_proj_is_null as select 10/x is null as v from t;
create materialized view mv_proj_is_null as select x, 10/x is null as v from t;

query I rowsort
select * from mv_proj;
query I
select * from mv_proj order by x;
----
10
5
NULL
NULL

# result for 0 is NULL, instead of true
query B rowsort
select * from mv_proj_is_null;
0 NULL
1 10
2 5
NULL NULL

# 10/0 fails, which is replaced with NULL, then NULL `is null`
query T
select * from mv_proj_is_null order by x;
----
NULL
f
f
t
0 t
1 f
2 f
NULL t

statement ok
create materialized view mv_filter as select * from t where 10/x > 0;

statement ok
create materialized view mv_filter_is_null as select * from t where 10/x > 0 is null;

query I rowsort
select * from mv_filter;
----
1
2

# result for 0 is not included
query I rowsort
select * from mv_filter_is_null;
----
NULL

statement ok
drop materialized view mv_proj;

Expand All @@ -58,7 +52,27 @@ statement ok
drop materialized view mv_filter;

statement ok
drop materialized view mv_filter_is_null;
drop table t;

statement ok
create table t(x varchar);

statement ok
insert into t values ('two'), ('4');

statement ok
create materialized view mv_coalesce as select coalesce(x::int, 0) as v from t;

# converting 'two' to int fails; the error is replaced with NULL, which is then coalesced to 0
# https://github.com/risingwavelabs/risingwave/issues/11586
query I rowsort
select * from mv_coalesce;
----
0
4

statement ok
drop materialized view mv_coalesce;

statement ok
drop table t;
1 change: 1 addition & 0 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = "0.1"
auto_impl = "1"
await-tree = { workspace = true }
cfg-or-panic = "0.2"
chrono = { version = "0.4", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl AggCall {
})
.collect();
let filter = match agg_call.filter {
Some(ref pb_filter) => Some(build_from_prost(pb_filter)?.into()),
Some(ref pb_filter) => Some(build_from_prost(pb_filter)?.into()), /* TODO: non-strict filter in streaming */
None => None,
};
let direct_args = agg_call
Expand Down
200 changes: 160 additions & 40 deletions src/expr/core/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,57 +27,160 @@ use super::expr_in::InExpression;
use super::expr_some_all::SomeAllExpression;
use super::expr_udf::UdfExpression;
use super::expr_vnode::VnodeExpression;
use super::wrapper::Checked;
use crate::expr::{
BoxedExpression, Expression, InputRefExpression, LiteralExpression, TryFromExprNodeBoxed,
};
use super::wrapper::{Checked, EvalErrorReport, NonStrict};
use crate::expr::{BoxedExpression, Expression, InputRefExpression, LiteralExpression};
use crate::sig::func::FUNC_SIG_MAP;
use crate::sig::FuncSigDebug;
use crate::{bail, ExprError, Result};

/// Build an expression from protobuf.
fn build_from_prost_inner(prost: &ExprNode) -> Result<BoxedExpression> {
use PbType as E;

let func_call = match prost.get_rex_node()? {
RexNode::InputRef(_) => return InputRefExpression::try_from_boxed(prost),
RexNode::Constant(_) => return LiteralExpression::try_from_boxed(prost),
RexNode::Udf(_) => return UdfExpression::try_from_boxed(prost),
RexNode::FuncCall(func_call) => func_call,
RexNode::Now(_) => unreachable!("now should not be built at backend"),
};

let func_type = prost.function_type();

match func_type {
// Dedicated types
E::All | E::Some => SomeAllExpression::try_from_boxed(prost),
E::In => InExpression::try_from_boxed(prost),
E::Case => CaseExpression::try_from_boxed(prost),
E::Coalesce => CoalesceExpression::try_from_boxed(prost),
E::Field => FieldExpression::try_from_boxed(prost),
E::Vnode => VnodeExpression::try_from_boxed(prost),

_ => {
let ret_type = DataType::from(prost.get_return_type().unwrap());
let children = func_call
.get_children()
.iter()
.map(build_from_prost)
.try_collect()?;

build_func(func_type, ret_type, children)
pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
    // Strict mode: the builder carries no error report.
    let builder = ExprBuilder::new_strict();
    builder.build(prost)
}

/// Construct an expression from its protobuf representation in non-strict mode.
///
/// Each expression node of the result is wrapped with a [`NonStrict`] node that reports
/// evaluation errors to the given `error_report`.
pub fn build_non_strict_from_prost(
    prost: &ExprNode,
    error_report: impl EvalErrorReport + 'static,
) -> Result<BoxedExpression> {
    let builder = ExprBuilder::new_non_strict(error_report);
    builder.build(prost)
}

/// Build an expression from protobuf with possibly some wrappers attached to each node.
///
/// The builder is either in strict mode (`error_report` is `None`) or in non-strict mode
/// (`error_report` is `Some`); the mode decides which wrappers are attached to every node.
struct ExprBuilder<R> {
/// The error reporting for non-strict mode.
///
/// If set, each expression node will be wrapped with a [`NonStrict`] node that reports
/// errors to this error reporting.
///
/// If unset, the builder is in strict mode and no [`NonStrict`] node is attached.
error_report: Option<R>,
}

// Strict mode never stores an error report, so the report type parameter is fixed to the
// never type `!` here.
impl ExprBuilder<!> {
/// Create a new builder in strict mode.
///
/// The returned builder has no error report, so expressions built with it are not wrapped
/// with `NonStrict`.
fn new_strict() -> Self {
Self { error_report: None }
}
}

impl<R> ExprBuilder<R>
where
    R: EvalErrorReport + 'static,
{
    /// Construct a builder in non-strict mode that reports errors to `error_report`.
    fn new_non_strict(error_report: R) -> Self {
        let error_report = Some(error_report);
        Self { error_report }
    }

    /// Attach the per-node wrappers to `expr`.
    ///
    /// The expression is always wrapped with [`Checked`]. If an error report is configured,
    /// it is additionally wrapped with [`NonStrict`], given a clone of that report.
    // The `let`-then-return shape is kept on purpose; the `expect` below relies on it.
    #[expect(clippy::let_and_return)]
    fn wrap(&self, expr: impl Expression + 'static) -> BoxedExpression {
        let checked = Checked(expr);

        let may_non_strict = match &self.error_report {
            Some(report) => NonStrict::new(checked, report.clone()).boxed(),
            None => checked.boxed(),
        };

        may_non_strict
    }

    /// Build the node described by `prost` with `build_inner`, then attach the wrappers to it.
    fn build(&self, prost: &ExprNode) -> Result<BoxedExpression> {
        self.build_inner(prost).map(|expr| self.wrap(expr))
    }

    /// Build the bare (unwrapped) expression for `prost`, dispatching on its node kind.
    fn build_inner(&self, prost: &ExprNode) -> Result<BoxedExpression> {
        use PbType as E;

        // Children are built through `build`, so each of them gets the wrappers as well.
        let build_child = |child: &'_ ExprNode| self.build(child);

        let rex_node = prost.get_rex_node()?;

        match rex_node {
            RexNode::InputRef(_) => InputRefExpression::build_boxed(prost, build_child),
            RexNode::Constant(_) => LiteralExpression::build_boxed(prost, build_child),
            RexNode::Udf(_) => UdfExpression::build_boxed(prost, build_child),
            RexNode::Now(_) => unreachable!("now should not be built at backend"),

            RexNode::FuncCall(_) => {
                let func_type = prost.function_type();

                match func_type {
                    // Function types with a dedicated expression implementation.
                    E::All | E::Some => SomeAllExpression::build_boxed(prost, build_child),
                    E::In => InExpression::build_boxed(prost, build_child),
                    E::Case => CaseExpression::build_boxed(prost, build_child),
                    E::Coalesce => CoalesceExpression::build_boxed(prost, build_child),
                    E::Field => FieldExpression::build_boxed(prost, build_child),
                    E::Vnode => VnodeExpression::build_boxed(prost, build_child),

                    // Everything else is looked up in the function signature map.
                    _ => FuncCallBuilder::build_boxed(prost, build_child),
                }
            }
        }
    }
}

/// Build an expression from protobuf with wrappers.
pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
let expr = build_from_prost_inner(prost)?;
/// Manually build the expression `Self` from protobuf.
///
/// Every type implementing this trait also gets [`BuildBoxed`] through a blanket
/// implementation.
pub(crate) trait Build: Expression + Sized {
/// Build the expression `Self` from protobuf.
///
/// To build children, call `build_child` on each child instead of [`build_from_prost`].
/// The caller supplies `build_child`, so it decides how child nodes are built.
fn build(
prost: &ExprNode,
build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
) -> Result<Self>;

/// Build the expression `Self` from protobuf for test, where each child is built with
/// [`build_from_prost`].
fn build_for_test(prost: &ExprNode) -> Result<Self> {
Self::build(prost, build_from_prost)
}
}

/// Manually build a boxed expression from protobuf.
///
/// Unlike [`Build`], the result is a [`BoxedExpression`] rather than `Self`, so an implementor
/// does not have to be an expression type itself.
pub(crate) trait BuildBoxed: 'static {
/// Build a boxed expression from protobuf.
///
/// `build_child` is the callback to use for building each child node of `prost`.
fn build_boxed(
prost: &ExprNode,
build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
) -> Result<BoxedExpression>;
}

/// Blanket implementation: any expression that implements [`Build`] can also be built as a
/// boxed expression.
impl<E: Build + 'static> BuildBoxed for E {
    fn build_boxed(
        prost: &ExprNode,
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
    ) -> Result<BoxedExpression> {
        // Build the concrete expression first, then box it.
        let expr = Self::build(prost, build_child)?;
        Ok(expr.boxed())
    }
}

let checked = Checked(expr);
/// Build a function call expression from protobuf with [`build_func`].
///
/// This is a unit type that only carries the [`BuildBoxed`] implementation.
struct FuncCallBuilder;

Ok(checked.boxed())
impl BuildBoxed for FuncCallBuilder {
    /// Build a general function call expression from protobuf.
    ///
    /// Each child of the function call is built with `build_child`, then the function type,
    /// return type and children are handed to [`build_func`].
    ///
    /// # Errors
    ///
    /// Returns an error if `prost` has no return type or no rex node, if building any child
    /// fails, or if [`build_func`] fails.
    ///
    /// # Panics
    ///
    /// Panics if the rex node of `prost` is not a function call.
    fn build_boxed(
        prost: &ExprNode,
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
    ) -> Result<BoxedExpression> {
        let func_type = prost.function_type();
        // A missing return type is reported as an error instead of a panic, in the same way
        // as a missing rex node below.
        let ret_type = DataType::from(prost.get_return_type()?);
        let func_call = prost
            .get_rex_node()?
            .as_func_call()
            .expect("not a func call");

        let children = func_call
            .get_children()
            .iter()
            .map(build_child)
            .try_collect()?;

        build_func(func_type, ret_type, children)
    }
}

/// Build an expression in `FuncCall` variant.
Expand Down Expand Up @@ -111,9 +214,26 @@ pub fn build_func(
}
))
})?;

(desc.build)(ret_type, children)
}

/// Build an expression in `FuncCall` variant whose root node is in non-strict mode.
///
/// Note: This is a workaround. Only the root node is wrapped in non-strict mode; the given
/// `children` are passed to [`build_func`] as they are.
/// Prefer [`build_non_strict_from_prost`] if possible.
pub fn build_func_non_strict(
    func: PbType,
    ret_type: DataType,
    children: Vec<BoxedExpression>,
    error_report: impl EvalErrorReport + 'static,
) -> Result<BoxedExpression> {
    build_func(func, ret_type, children)
        .map(|expr| ExprBuilder::new_non_strict(error_report).wrap(expr))
}

pub(super) fn get_children_and_return_type(prost: &ExprNode) -> Result<(&[ExprNode], DataType)> {
let ret_type = DataType::from(prost.get_return_type().unwrap());
if let RexNode::FuncCall(func_call) = prost.get_rex_node().unwrap() {
Expand Down
Loading
Loading