Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): add batch_expr_strict_mode to ignore expression error in batch query #19562

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions e2e_test/batch/basic/strict_mode.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v int);

statement ok
insert into t values(-1), (0), (1);

statement ok
SET batch_expr_strict_mode = false;

query I
SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v;
----
-1
NULL
1

# This plan consists of a BatchExchange.
query I
SELECT 1/v FROM t order by v;
----
-1
NULL
1

statement ok
SET batch_expr_strict_mode = DEFAULT;

statement error Division by zero
SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v;

statement error Division by zero
SELECT 1/v FROM t order by v;

statement ok
drop table t;
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ user background_ddl
user batch_enable_distributed_dml
user batch_enable_lookup_join
user batch_enable_sort_agg
user batch_expr_strict_mode
user batch_parallelism
user bypass_cluster_limits
user bytea_output
Expand Down
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ message Cardinality {
// Provide statement-local context, e.g. session info like time zone, for execution.
message ExprContext {
string time_zone = 1;
bool strict_mode = 2;
}

message AdditionalColumnKey {}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::array::ArrayImpl::Bool;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that build_from_prost will not be directly used anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still used quite a bit. Only Filter and Project have been changed to build_batch_expr_from_prost. Examples of the rest that still uses build_from_prost include

  • HopWindowExecutor: because the expression is derived from +/- interval, so it must be non-fallable
  • SortAggExecutor: same as above, but it's even simpler - there are only ValueRefs
  • UpdateExecutor: because I can't image why users need to ignore expressions here, so I just keep it unchanged.

use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -90,7 +90,7 @@ impl BoxedExecutorBuilder for FilterExecutor {
)?;

let expr_node = filter_node.get_search_condition()?;
let expr = build_from_prost(expr_node)?;
let expr = build_batch_expr_from_prost(expr_node)?;
Ok(Box::new(Self::new(
expr,
input,
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -89,7 +89,7 @@ impl BoxedExecutorBuilder for ProjectExecutor {
let project_exprs: Vec<_> = project_node
.get_select_list()
.iter()
.map(build_from_prost)
.map(build_batch_expr_from_prost)
.try_collect()?;

let fields = project_exprs
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl BatchManager {
TracingContext::none(),
ExprContext {
time_zone: "UTC".to_string(),
strict_mode: false,
},
)
.await
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ pub struct SessionConfig {
#[parameter(default = false, rename = "batch_enable_distributed_dml")]
batch_enable_distributed_dml: bool,

/// Evaluate expression in strict mode for batch queries.
/// If set to false, an expression failure will not cause an error but leave a null value
/// on the result set.
#[parameter(default = true)]
batch_expr_strict_mode: bool,

/// The max gap allowed to transform small range scan into multi point lookup.
#[parameter(default = 8)]
max_split_range_gap: i32,
Expand Down
17 changes: 17 additions & 0 deletions src/expr/core/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::iter::Peekable;

use itertools::Itertools;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::expr::LogReport;
use risingwave_pb::expr::expr_node::{PbType, RexNode};
use risingwave_pb::expr::ExprNode;

Expand All @@ -29,6 +30,7 @@ use super::NonStrictExpression;
use crate::expr::{
BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
};
use crate::expr_context::strict_mode;
use crate::sig::FUNCTION_REGISTRY;
use crate::{bail, Result};

Expand All @@ -48,6 +50,21 @@ pub fn build_non_strict_from_prost(
.map(NonStrictExpression)
}

/// Build a strict or non-strict expression according to expr context.
///
/// When strict mode is off, the expression will not fail but leave a null value as result.
///
/// Unlike [`build_non_strict_from_prost`], the returning value here can be either non-strict or
/// strict. Thus, the caller is supposed to handle potential errors under strict mode.
pub fn build_batch_expr_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
if strict_mode()? {
build_from_prost(prost)
} else {
// TODO(eric): report errors to users via psql notice
Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed())
}
}

/// Build an expression from protobuf with possibly some wrappers attached to each node.
struct ExprBuilder<R> {
/// The error reporting for non-strict mode.
Expand Down
23 changes: 21 additions & 2 deletions src/expr/core/src/expr_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ define_context! {
pub TIME_ZONE: String,
pub FRAGMENT_ID: u32,
pub VNODE_COUNT: usize,
pub STRICT_MODE: bool,
}

pub fn capture_expr_context() -> ExprResult<ExprContext> {
let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
Ok(ExprContext { time_zone })
let strict_mode = STRICT_MODE::try_with(|v| *v)?;
Ok(ExprContext {
time_zone,
strict_mode,
})
}

/// Get the vnode count from the context.
Expand All @@ -36,9 +41,23 @@ pub fn vnode_count() -> ExprResult<usize> {
VNODE_COUNT::try_with(|&x| x)
}

/// Get the strict mode from expr context
///
/// The return value depends on session variable. Default is true for batch query.
///
/// Conceptually, streaming always use non-strict mode. Our implementation doesn't read this value,
/// although it's set to false as a placeholder.
pub fn strict_mode() -> ExprResult<bool> {
STRICT_MODE::try_with(|&v| v)
}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
where
Fut: Future,
{
TIME_ZONE::scope(expr_context.time_zone.to_owned(), future).await
TIME_ZONE::scope(
expr_context.time_zone.to_owned(),
STRICT_MODE::scope(expr_context.strict_mode, future),
)
.await
}
1 change: 1 addition & 0 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ impl StageRunner {
async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> {
let expr_context = ExprContext {
time_zone: self.ctx.session().config().timezone().to_owned(),
strict_mode: self.ctx.session().config().batch_expr_strict_mode(),
};
// If root, we execute it locally.
if !self.is_root_stage() {
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl LocalQueryExecution {
self.batch_query_epoch,
self.shutdown_rx().clone(),
);

let executor = executor.build().await?;

#[for_await]
Expand Down Expand Up @@ -146,6 +147,7 @@ impl LocalQueryExecution {
let db_name = self.session.database().to_string();
let search_path = self.session.config().search_path();
let time_zone = self.session.config().timezone();
let strict_mode = self.session.config().batch_expr_strict_mode();
let timeout = self.timeout;
let meta_client = self.front_env.meta_client_ref();

Expand All @@ -166,7 +168,7 @@ impl LocalQueryExecution {
}
};

use risingwave_expr::expr_context::TIME_ZONE;
use risingwave_expr::expr_context::*;

use crate::expr::function_impl::context::{
AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER,
Expand All @@ -179,6 +181,7 @@ impl LocalQueryExecution {
let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed();
let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed();

if let Some(timeout) = timeout {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ mod tests {
mview_definition: "".to_string(),
expr_context: Some(PbExprContext {
time_zone: String::from("America/New_York"),
strict_mode: false,
}),
}
})
Expand Down Expand Up @@ -1798,6 +1799,7 @@ mod tests {
.map(VnodeBitmap::from),
expr_context: ExprContext::from(&PbExprContext {
time_zone: String::from("America/New_York"),
strict_mode: false,
}),
}
})
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl StreamContext {
PbExprContext {
// `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
strict_mode: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ async fn test_graph_builder() -> MetaResult<()> {
let graph = make_stream_graph();
let expr_context = ExprContext {
time_zone: graph.ctx.as_ref().unwrap().timezone.clone(),
strict_mode: false,
};
let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?;
let internal_tables = fragment_graph.incomplete_internal_tables();
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::task::barrier_test_utils::LocalBarrierTestEnv;
async fn test_merger_sum_aggr() {
let expr_context = ExprContext {
time_zone: String::from("UTC"),
strict_mode: false,
};

let barrier_test_env = LocalBarrierTestEnv::for_test().await;
Expand Down
Loading