From 00a22aa305b93eb2544fcfd67be86edec1ba794a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 21 Sep 2023 13:21:44 +0800 Subject: [PATCH] add more docs Signed-off-by: Bugen Zhao --- src/expr/Cargo.toml | 2 +- src/expr/src/expr/wrapper/non_strict.rs | 20 +++++++++++++++----- src/stream/src/executor/actor.rs | 3 ++- src/stream/src/task/stream_manager.rs | 9 +++++---- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 0a213a170c799..a11b80ad763f6 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -21,7 +21,7 @@ arrow-array = { workspace = true } arrow-schema = { workspace = true } async-trait = "0.1" auto_enums = "0.8" -auto_impl = "1.1.0" +auto_impl = "1" await-tree = { workspace = true } cfg-or-panic = "0.2" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } diff --git a/src/expr/src/expr/wrapper/non_strict.rs b/src/expr/src/expr/wrapper/non_strict.rs index 32f3b8556b9d7..19ead754a17f1 100644 --- a/src/expr/src/expr/wrapper/non_strict.rs +++ b/src/expr/src/expr/wrapper/non_strict.rs @@ -22,17 +22,27 @@ use crate::error::Result; use crate::expr::{Expression, ValueImpl}; use crate::ExprError; +/// Report an error during evaluation. #[auto_impl(Arc)] pub trait EvalErrorReport: Send + Sync { + /// Perform the error reporting. + /// + /// Called when an error occurs during row-level evaluation of a non-strict expression, + /// that is, wrapped by [`NonStrict`]. fn report(&self, error: ExprError); } +/// A dummy implementation that panics when called. impl EvalErrorReport for ! { fn report(&self, _error: ExprError) { unreachable!() } } +/// A wrapper of [`Expression`] that evaluates in a non-strict way. Basically... +/// - When an error occurs during chunk-level evaluation, recompute in row-based execution and pad +/// with NULL for each failed row. +/// - Report all error occurred during row-level evaluation to the [`EvalErrorReport`]. pub struct NonStrict { inner: E, report: R, @@ -59,13 +69,12 @@ where Self { inner, report } } + /// Evaluate expression in row-based execution with `eval_row_infallible`. async fn eval_chunk_infallible_by_row(&self, input: &DataChunk) -> ArrayRef { - // When eval failed, recompute in row-based execution - // and pad with NULL for each failed row. let mut array_builder = self.return_type().create_array_builder(input.capacity()); for row in input.rows_with_holes() { if let Some(row) = row { - let datum = self.eval_row_infallible(&row.into_owned_row()).await; + let datum = self.eval_row_infallible(&row.into_owned_row()).await; // TODO: use `Row` trait array_builder.append(&datum); } else { array_builder.append_null(); @@ -74,12 +83,13 @@ where array_builder.finish().into() } + /// Evaluate expression on a single row, report error and return NULL if failed. async fn eval_row_infallible(&self, input: &OwnedRow) -> Datum { match self.inner.eval_row(input).await { Ok(datum) => datum, Err(error) => { self.report.report(error); - None + None // NULL } } } @@ -115,6 +125,6 @@ where } fn eval_const(&self) -> Result { - self.inner.eval_const() // TODO? + self.inner.eval_const() // do not handle error } } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 17f874acb0fc6..7a3c951292086 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -78,7 +78,8 @@ impl ActorContext { } pub fn on_compute_error(&self, err: ExprError, identity: &str) { - tracing::error!("Compute error: {}, executor: {identity}", err); + tracing::error!(identity, %err, "failed to evaluate expression"); + let executor_name = identity.split(' ').next().unwrap_or("name_not_found"); let mut err_str = err.to_string(); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 812949205becc..f7f2ce2f56d14 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -105,6 +105,7 @@ pub struct LocalStreamManager { total_mem_val: Arc>, } +/// Report expression evaluation errors to the actor context. #[derive(Clone)] pub struct ActorEvalErrorReport { actor_context: ActorContextRef, @@ -121,7 +122,7 @@ pub struct ExecutorParams { pub env: StreamEnvironment, /// Indices of primary keys - // TODO: directly use this for `ExecutorInfo` + // TODO: directly use it for `ExecutorInfo` pub pk_indices: PkIndices, /// Executor id, unique across all actors. @@ -131,15 +132,15 @@ pub struct ExecutorParams { pub operator_id: u64, /// Information of the operator from plan node, like `StreamHashJoin { .. }`. - // TODO: use this for `identity` + // TODO: use it for `identity` pub op_info: String, /// The output schema of the executor. - // TODO: directly use this for `ExecutorInfo` + // TODO: directly use it for `ExecutorInfo` pub schema: Schema, /// The identity of the executor, like `HashJoin 1234ABCD`. - // TODO: directly use this for `ExecutorInfo` + // TODO: directly use it for `ExecutorInfo` pub identity: String, /// The input executor.