From c99ecec25f79522cfa947ca539898bc07d79360c Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 25 Nov 2024 17:37:54 +0800 Subject: [PATCH 1/6] add strict_mode in ExprContext --- proto/plan_common.proto | 1 + src/batch/src/executor/filter.rs | 12 ++++++++-- src/batch/src/executor/project.rs | 13 +++++++++-- src/expr/core/src/expr/build.rs | 5 ++++ src/expr/core/src/expr_context.rs | 23 +++++++++++++++++-- .../src/scheduler/distributed/stage.rs | 1 + src/frontend/src/scheduler/local.rs | 5 +++- src/meta/src/model/stream.rs | 1 + src/meta/src/stream/test_fragmenter.rs | 1 + src/stream/src/executor/integration_tests.rs | 1 + 10 files changed, 56 insertions(+), 7 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 487ab54e2a666..4a93a84a9265b 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -236,6 +236,7 @@ message Cardinality { // Provide statement-local context, e.g. session info like time zone, for execution. message ExprContext { string time_zone = 1; + bool strict_mode = 2; } message AdditionalColumnKey {} diff --git a/src/batch/src/executor/filter.rs b/src/batch/src/executor/filter.rs index 96049b96a4179..3ea6d80fa2eef 100644 --- a/src/batch/src/executor/filter.rs +++ b/src/batch/src/executor/filter.rs @@ -17,7 +17,10 @@ use risingwave_common::array::ArrayImpl::Bool; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_expr::expr::{build_from_prost, BoxedExpression}; +use risingwave_expr::expr::{ + build_from_prost, build_non_strict_from_prost_log_report, BoxedExpression, +}; +use risingwave_expr::expr_context::strict_mode; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; @@ -89,8 +92,13 @@ impl BoxedExecutorBuilder for FilterExecutor { NodeBody::Filter )?; + let build_expr_fn = if strict_mode()? { + build_from_prost + } else { + build_non_strict_from_prost_log_report + }; let expr_node = filter_node.get_search_condition()?; - let expr = build_from_prost(expr_node)?; + let expr = build_expr_fn(expr_node)?; Ok(Box::new(Self::new( expr, input, diff --git a/src/batch/src/executor/project.rs b/src/batch/src/executor/project.rs index 7fbc5540b975b..cab710fe0ea41 100644 --- a/src/batch/src/executor/project.rs +++ b/src/batch/src/executor/project.rs @@ -18,7 +18,10 @@ use futures::{Stream, StreamExt}; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; -use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression}; +use risingwave_expr::expr::{ + build_from_prost, build_non_strict_from_prost_log_report, BoxedExpression, Expression, +}; +use risingwave_expr::expr_context::strict_mode; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; @@ -86,10 +89,16 @@ impl BoxedExecutorBuilder for ProjectExecutor { NodeBody::Project )?; + let build_expr_fn = if strict_mode()? { + build_from_prost + } else { + build_non_strict_from_prost_log_report + }; + let project_exprs: Vec<_> = project_node .get_select_list() .iter() - .map(build_from_prost) + .map(build_expr_fn) .try_collect()?; let fields = project_exprs diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 988adbb5d8342..bbf37bb64c213 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -16,6 +16,7 @@ use std::iter::Peekable; use itertools::Itertools; use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_expr::expr::LogReport; use risingwave_pb::expr::expr_node::{PbType, RexNode}; use risingwave_pb::expr::ExprNode; @@ -48,6 +49,10 @@ pub fn build_non_strict_from_prost( .map(NonStrictExpression) } +pub fn build_non_strict_from_prost_log_report(prost: &ExprNode) -> Result { + Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed()) +} + /// Build an expression from protobuf with possibly some wrappers attached to each node. struct ExprBuilder { /// The error reporting for non-strict mode. diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index e66f6322706f9..67d74f2a71a0d 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -22,11 +22,16 @@ define_context! { pub TIME_ZONE: String, pub FRAGMENT_ID: u32, pub VNODE_COUNT: usize, + pub STRICT_MODE: bool, } pub fn capture_expr_context() -> ExprResult { let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?; - Ok(ExprContext { time_zone }) + let strict_mode = STRICT_MODE::try_with(|v| *v)?; + Ok(ExprContext { + time_zone, + strict_mode, + }) } /// Get the vnode count from the context. @@ -36,9 +41,23 @@ pub fn vnode_count() -> ExprResult { VNODE_COUNT::try_with(|&x| x) } +/// Get the strict mode from expr context +/// +/// The return value depends on session variable. Default is true for batch query. +/// +/// Conceptually, streaming always use non-strict mode. Our implementation doesn't read this value, +/// although it's set to false as a placeholder. +pub fn strict_mode() -> ExprResult { + STRICT_MODE::try_with(|&v| v) +} + pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output where Fut: Future, { - TIME_ZONE::scope(expr_context.time_zone.to_owned(), future).await + TIME_ZONE::scope( + expr_context.time_zone.to_owned(), + STRICT_MODE::scope(expr_context.strict_mode, future), + ) + .await } diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 83d85b378a114..c9557bd25920d 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -721,6 +721,7 @@ impl StageRunner { async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> { let expr_context = ExprContext { time_zone: self.ctx.session().config().timezone().to_owned(), + strict_mode: false, // todo }; // If root, we execute it locally. if !self.is_root_stage() { diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index b11d462b151c7..ded30b8e168f2 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -118,6 +118,7 @@ impl LocalQueryExecution { self.batch_query_epoch, self.shutdown_rx().clone(), ); + let executor = executor.build().await?; #[for_await] @@ -146,6 +147,7 @@ impl LocalQueryExecution { let db_name = self.session.database().to_string(); let search_path = self.session.config().search_path(); let time_zone = self.session.config().timezone(); + let strict_mode = false; // todo! let timeout = self.timeout; let meta_client = self.front_env.meta_client_ref(); @@ -166,7 +168,7 @@ impl LocalQueryExecution { } }; - use risingwave_expr::expr_context::TIME_ZONE; + use risingwave_expr::expr_context::*; use crate::expr::function_impl::context::{ AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER, @@ -179,6 +181,7 @@ impl LocalQueryExecution { let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed(); let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed(); let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed(); + let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed(); let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed(); if let Some(timeout) = timeout { diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index e5490a86365b9..c1a4fe1279946 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -144,6 +144,7 @@ impl StreamContext { PbExprContext { // `self.timezone` must always be set; an invalid value is used here for debugging if it's not. time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()), + strict_mode: false, } } diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index db34e5fd312bd..2e133c3351fa0 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -446,6 +446,7 @@ async fn test_graph_builder() -> MetaResult<()> { let graph = make_stream_graph(); let expr_context = ExprContext { time_zone: graph.ctx.as_ref().unwrap().timezone.clone(), + strict_mode: false, // TODO }; let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?; let internal_tables = fragment_graph.incomplete_internal_tables(); diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 41495198632f9..01d4ced06805c 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -46,6 +46,7 @@ use crate::task::barrier_test_utils::LocalBarrierTestEnv; async fn test_merger_sum_aggr() { let expr_context = ExprContext { time_zone: String::from("UTC"), + strict_mode: false, }; let barrier_test_env = LocalBarrierTestEnv::for_test().await; From 1e887f1cdc685218e7fa6da6e1dc5bb3aa1dec29 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 25 Nov 2024 17:45:37 +0800 Subject: [PATCH 2/6] add batch_expr_strict_mode in session vars --- src/batch/src/task/task_manager.rs | 1 + src/common/src/session_config/mod.rs | 6 ++++++ src/frontend/src/scheduler/distributed/stage.rs | 2 +- src/frontend/src/scheduler/local.rs | 2 +- src/meta/src/stream/test_fragmenter.rs | 2 +- 5 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index 4db15df2dbe85..c9fbde6369ef5 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -148,6 +148,7 @@ impl BatchManager { TracingContext::none(), ExprContext { time_zone: "UTC".to_string(), + strict_mode: false, }, ) .await diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index d452c51ad3539..7595ceeb21a70 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -109,6 +109,12 @@ pub struct SessionConfig { #[parameter(default = false, rename = "batch_enable_distributed_dml")] batch_enable_distributed_dml: bool, + /// Evaluate expression in strict mode for batch queries. + /// If set to false, an expression failure will not cause an error but leave a null value + /// on the result set. + #[parameter(default = true)] + batch_expr_strict_mode: bool, + /// The max gap allowed to transform small range scan into multi point lookup. #[parameter(default = 8)] max_split_range_gap: i32, diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index c9557bd25920d..087ba189770df 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -721,7 +721,7 @@ impl StageRunner { async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> { let expr_context = ExprContext { time_zone: self.ctx.session().config().timezone().to_owned(), - strict_mode: false, // todo + strict_mode: self.ctx.session().config().batch_expr_strict_mode(), }; // If root, we execute it locally. if !self.is_root_stage() { diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index ded30b8e168f2..633dac2d2d562 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -147,7 +147,7 @@ impl LocalQueryExecution { let db_name = self.session.database().to_string(); let search_path = self.session.config().search_path(); let time_zone = self.session.config().timezone(); - let strict_mode = false; // todo! + let strict_mode = self.session.config().batch_expr_strict_mode(); let timeout = self.timeout; let meta_client = self.front_env.meta_client_ref(); diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 2e133c3351fa0..c235d838ae27a 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -446,7 +446,7 @@ async fn test_graph_builder() -> MetaResult<()> { let graph = make_stream_graph(); let expr_context = ExprContext { time_zone: graph.ctx.as_ref().unwrap().timezone.clone(), - strict_mode: false, // TODO + strict_mode: false, }; let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?; let internal_tables = fragment_graph.incomplete_internal_tables(); From 018c31113f9129c3eb0895f1cd7be106470d7b80 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 25 Nov 2024 18:02:57 +0800 Subject: [PATCH 3/6] refine code --- src/batch/src/executor/filter.rs | 12 ++---------- src/batch/src/executor/project.rs | 13 ++----------- src/expr/core/src/expr/build.rs | 16 ++++++++++++++-- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/batch/src/executor/filter.rs b/src/batch/src/executor/filter.rs index 3ea6d80fa2eef..0291582b95b59 100644 --- a/src/batch/src/executor/filter.rs +++ b/src/batch/src/executor/filter.rs @@ -17,10 +17,7 @@ use risingwave_common::array::ArrayImpl::Bool; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_expr::expr::{ - build_from_prost, build_non_strict_from_prost_log_report, BoxedExpression, -}; -use risingwave_expr::expr_context::strict_mode; +use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; @@ -92,13 +89,8 @@ impl BoxedExecutorBuilder for FilterExecutor { NodeBody::Filter )?; - let build_expr_fn = if strict_mode()? { - build_from_prost - } else { - build_non_strict_from_prost_log_report - }; let expr_node = filter_node.get_search_condition()?; - let expr = build_expr_fn(expr_node)?; + let expr = build_batch_expr_from_prost(expr_node)?; Ok(Box::new(Self::new( expr, input, diff --git a/src/batch/src/executor/project.rs b/src/batch/src/executor/project.rs index cab710fe0ea41..4e23aaa587985 100644 --- a/src/batch/src/executor/project.rs +++ b/src/batch/src/executor/project.rs @@ -18,10 +18,7 @@ use futures::{Stream, StreamExt}; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; -use risingwave_expr::expr::{ - build_from_prost, build_non_strict_from_prost_log_report, BoxedExpression, Expression, -}; -use risingwave_expr::expr_context::strict_mode; +use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression, Expression}; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; @@ -89,16 +86,10 @@ impl BoxedExecutorBuilder for ProjectExecutor { NodeBody::Project )?; - let build_expr_fn = if strict_mode()? { - build_from_prost - } else { - build_non_strict_from_prost_log_report - }; - let project_exprs: Vec<_> = project_node .get_select_list() .iter() - .map(build_expr_fn) + .map(build_batch_expr_from_prost) .try_collect()?; let fields = project_exprs diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index bbf37bb64c213..3b87a78b70984 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -30,6 +30,7 @@ use super::NonStrictExpression; use crate::expr::{ BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, }; +use crate::expr_context::strict_mode; use crate::sig::FUNCTION_REGISTRY; use crate::{bail, Result}; @@ -49,8 +50,19 @@ pub fn build_non_strict_from_prost( .map(NonStrictExpression) } -pub fn build_non_strict_from_prost_log_report(prost: &ExprNode) -> Result { - Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed()) +/// Build a strict or non-strict expression according to expr context. +/// +/// When strict mode is off, the expression will not fail but leave a null value as result. +/// +/// Unlike [`build_non_strict_from_prost`], the returning value here can be either non-strict or +/// strict. Thus, the caller is supposed to handle potential errors under strict mode. +pub fn build_batch_expr_from_prost(prost: &ExprNode) -> Result { + if strict_mode()? { + build_from_prost(prost) + } else { + // TODO(eric): report errors to users via psql notice + Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed()) + } } /// Build an expression from protobuf with possibly some wrappers attached to each node. From 63cc69f8848fe08982884cafb1c2105c87e724ab Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 25 Nov 2024 18:18:58 +0800 Subject: [PATCH 4/6] add e2e test --- e2e_test/batch/basic/strict_mode.slt.part | 38 +++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 e2e_test/batch/basic/strict_mode.slt.part diff --git a/e2e_test/batch/basic/strict_mode.slt.part b/e2e_test/batch/basic/strict_mode.slt.part new file mode 100644 index 0000000000000..0f5081058b54b --- /dev/null +++ b/e2e_test/batch/basic/strict_mode.slt.part @@ -0,0 +1,38 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t (v int); + +statement ok +insert into t values(-1), (0), (1); + +statement ok +SET batch_expr_strict_mode = false; + +query I +SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v; +---- +-1 +NULL +1 + +# This plan consists of a BatchExchange. +query I +SELECT 1/v FROM t order by v; +---- +-1 +NULL +1 + +statement ok +SET batch_expr_strict_mode = DEFAULT; + +statement error Division by zero +SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v; + +statement error Division by zero +SELECT 1/v FROM t order by v; + +statement ok +drop table t; From 553b184d6c0f3d7b379411bbdec5693707abba12 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 26 Nov 2024 15:55:16 +0800 Subject: [PATCH 5/6] fix --- src/meta/src/controller/fragment.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 672fef180afde..a2d0094ef2d0a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1678,6 +1678,7 @@ mod tests { mview_definition: "".to_string(), expr_context: Some(PbExprContext { time_zone: String::from("America/New_York"), + strict_mode: false, }), } }) @@ -1798,6 +1799,7 @@ mod tests { .map(VnodeBitmap::from), expr_context: ExprContext::from(&PbExprContext { time_zone: String::from("America/New_York"), + strict_mode: false, }), } }) From 4c13c650a58cea834e45cc0d2e5b0bb12886dcfe Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 26 Nov 2024 16:17:04 +0800 Subject: [PATCH 6/6] fix slt --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 73e84f371c35e..32dbd17b7b48a 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -23,6 +23,7 @@ user background_ddl user batch_enable_distributed_dml user batch_enable_lookup_join user batch_enable_sort_agg +user batch_expr_strict_mode user batch_parallelism user bypass_cluster_limits user bytea_output