diff --git a/e2e_test/batch/basic/strict_mode.slt.part b/e2e_test/batch/basic/strict_mode.slt.part new file mode 100644 index 0000000000000..0f5081058b54b --- /dev/null +++ b/e2e_test/batch/basic/strict_mode.slt.part @@ -0,0 +1,38 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t (v int); + +statement ok +insert into t values(-1), (0), (1); + +statement ok +SET batch_expr_strict_mode = false; + +query I +SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v; +---- +-1 +NULL +1 + +# This plan consists of a BatchExchange. +query I +SELECT 1/v FROM t order by v; +---- +-1 +NULL +1 + +statement ok +SET batch_expr_strict_mode = DEFAULT; + +statement error Division by zero +SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v; + +statement error Division by zero +SELECT 1/v FROM t order by v; + +statement ok +drop table t; diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 73e84f371c35e..32dbd17b7b48a 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -23,6 +23,7 @@ user background_ddl user batch_enable_distributed_dml user batch_enable_lookup_join user batch_enable_sort_agg +user batch_expr_strict_mode user batch_parallelism user bypass_cluster_limits user bytea_output diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 487ab54e2a666..4a93a84a9265b 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -236,6 +236,7 @@ message Cardinality { // Provide statement-local context, e.g. session info like time zone, for execution. message ExprContext { string time_zone = 1; + bool strict_mode = 2; } message AdditionalColumnKey {} diff --git a/src/batch/src/executor/filter.rs b/src/batch/src/executor/filter.rs index 96049b96a4179..0291582b95b59 100644 --- a/src/batch/src/executor/filter.rs +++ b/src/batch/src/executor/filter.rs @@ -17,7 +17,7 @@ use risingwave_common::array::ArrayImpl::Bool; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_expr::expr::{build_from_prost, BoxedExpression}; +use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; @@ -90,7 +90,7 @@ impl BoxedExecutorBuilder for FilterExecutor { )?; let expr_node = filter_node.get_search_condition()?; - let expr = build_from_prost(expr_node)?; + let expr = build_batch_expr_from_prost(expr_node)?; Ok(Box::new(Self::new( expr, input, diff --git a/src/batch/src/executor/project.rs b/src/batch/src/executor/project.rs index 7fbc5540b975b..4e23aaa587985 100644 --- a/src/batch/src/executor/project.rs +++ b/src/batch/src/executor/project.rs @@ -18,7 +18,7 @@ use futures::{Stream, StreamExt}; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; -use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression}; +use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression, Expression}; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; @@ -89,7 +89,7 @@ impl BoxedExecutorBuilder for ProjectExecutor { let project_exprs: Vec<_> = project_node .get_select_list() .iter() - .map(build_from_prost) + .map(build_batch_expr_from_prost) .try_collect()?; let fields = project_exprs diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index 4db15df2dbe85..c9fbde6369ef5 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -148,6 +148,7 @@ impl BatchManager { TracingContext::none(), ExprContext { time_zone: "UTC".to_string(), + strict_mode: false, }, ) .await diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index d452c51ad3539..7595ceeb21a70 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -109,6 +109,12 @@ pub struct SessionConfig { #[parameter(default = false, rename = "batch_enable_distributed_dml")] batch_enable_distributed_dml: bool, + /// Evaluate expression in strict mode for batch queries. + /// If set to false, an expression failure will not cause an error but leave a null value + /// on the result set. + #[parameter(default = true)] + batch_expr_strict_mode: bool, + /// The max gap allowed to transform small range scan into multi point lookup. #[parameter(default = 8)] max_split_range_gap: i32, diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 988adbb5d8342..3b87a78b70984 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -16,6 +16,7 @@ use std::iter::Peekable; use itertools::Itertools; use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_expr::expr::LogReport; use risingwave_pb::expr::expr_node::{PbType, RexNode}; use risingwave_pb::expr::ExprNode; @@ -29,6 +30,7 @@ use super::NonStrictExpression; use crate::expr::{ BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, }; +use crate::expr_context::strict_mode; use crate::sig::FUNCTION_REGISTRY; use crate::{bail, Result}; @@ -48,6 +50,21 @@ pub fn build_non_strict_from_prost( .map(NonStrictExpression) } +/// Build a strict or non-strict expression according to expr context. +/// +/// When strict mode is off, the expression will not fail but leave a null value as result. +/// +/// Unlike [`build_non_strict_from_prost`], the returning value here can be either non-strict or +/// strict. Thus, the caller is supposed to handle potential errors under strict mode. +pub fn build_batch_expr_from_prost(prost: &ExprNode) -> Result { + if strict_mode()? { + build_from_prost(prost) + } else { + // TODO(eric): report errors to users via psql notice + Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed()) + } +} + /// Build an expression from protobuf with possibly some wrappers attached to each node. struct ExprBuilder { /// The error reporting for non-strict mode. diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index e66f6322706f9..67d74f2a71a0d 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -22,11 +22,16 @@ define_context! { pub TIME_ZONE: String, pub FRAGMENT_ID: u32, pub VNODE_COUNT: usize, + pub STRICT_MODE: bool, } pub fn capture_expr_context() -> ExprResult { let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?; - Ok(ExprContext { time_zone }) + let strict_mode = STRICT_MODE::try_with(|v| *v)?; + Ok(ExprContext { + time_zone, + strict_mode, + }) } /// Get the vnode count from the context. @@ -36,9 +41,23 @@ pub fn vnode_count() -> ExprResult { VNODE_COUNT::try_with(|&x| x) } +/// Get the strict mode from expr context +/// +/// The return value depends on session variable. Default is true for batch query. +/// +/// Conceptually, streaming always use non-strict mode. Our implementation doesn't read this value, +/// although it's set to false as a placeholder. +pub fn strict_mode() -> ExprResult { + STRICT_MODE::try_with(|&v| v) +} + pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output where Fut: Future, { - TIME_ZONE::scope(expr_context.time_zone.to_owned(), future).await + TIME_ZONE::scope( + expr_context.time_zone.to_owned(), + STRICT_MODE::scope(expr_context.strict_mode, future), + ) + .await } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 83d85b378a114..087ba189770df 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -721,6 +721,7 @@ impl StageRunner { async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> { let expr_context = ExprContext { time_zone: self.ctx.session().config().timezone().to_owned(), + strict_mode: self.ctx.session().config().batch_expr_strict_mode(), }; // If root, we execute it locally. if !self.is_root_stage() { diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index b11d462b151c7..633dac2d2d562 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -118,6 +118,7 @@ impl LocalQueryExecution { self.batch_query_epoch, self.shutdown_rx().clone(), ); + let executor = executor.build().await?; #[for_await] @@ -146,6 +147,7 @@ impl LocalQueryExecution { let db_name = self.session.database().to_string(); let search_path = self.session.config().search_path(); let time_zone = self.session.config().timezone(); + let strict_mode = self.session.config().batch_expr_strict_mode(); let timeout = self.timeout; let meta_client = self.front_env.meta_client_ref(); @@ -166,7 +168,7 @@ impl LocalQueryExecution { } }; - use risingwave_expr::expr_context::TIME_ZONE; + use risingwave_expr::expr_context::*; use crate::expr::function_impl::context::{ AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER, @@ -179,6 +181,7 @@ impl LocalQueryExecution { let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed(); let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed(); let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed(); + let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed(); let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed(); if let Some(timeout) = timeout { diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 672fef180afde..a2d0094ef2d0a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1678,6 +1678,7 @@ mod tests { mview_definition: "".to_string(), expr_context: Some(PbExprContext { time_zone: String::from("America/New_York"), + strict_mode: false, }), } }) @@ -1798,6 +1799,7 @@ mod tests { .map(VnodeBitmap::from), expr_context: ExprContext::from(&PbExprContext { time_zone: String::from("America/New_York"), + strict_mode: false, }), } }) diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index e5490a86365b9..c1a4fe1279946 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -144,6 +144,7 @@ impl StreamContext { PbExprContext { // `self.timezone` must always be set; an invalid value is used here for debugging if it's not. time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()), + strict_mode: false, } } diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index db34e5fd312bd..c235d838ae27a 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -446,6 +446,7 @@ async fn test_graph_builder() -> MetaResult<()> { let graph = make_stream_graph(); let expr_context = ExprContext { time_zone: graph.ctx.as_ref().unwrap().timezone.clone(), + strict_mode: false, }; let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?; let internal_tables = fragment_graph.incomplete_internal_tables(); diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 41495198632f9..01d4ced06805c 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -46,6 +46,7 @@ use crate::task::barrier_test_utils::LocalBarrierTestEnv; async fn test_merger_sum_aggr() { let expr_context = ExprContext { time_zone: String::from("UTC"), + strict_mode: false, }; let barrier_test_env = LocalBarrierTestEnv::for_test().await;