Skip to content

Commit

Permalink
add more docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 21, 2023
1 parent 5402efb commit a24495a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
20 changes: 15 additions & 5 deletions src/expr/src/expr/wrapper/non_strict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,27 @@ use crate::error::Result;
use crate::expr::{Expression, ValueImpl};
use crate::ExprError;

/// Report an error during evaluation.
#[auto_impl(Arc)]
pub trait EvalErrorReport: Send + Sync {
/// Perform the error reporting.
///
/// Called when an error occurs during row-level evaluation of a non-strict expression,
/// that is, wrapped by [`NonStrict`].
fn report(&self, error: ExprError);
}

/// A dummy implementation that panics when called.
impl EvalErrorReport for ! {
fn report(&self, _error: ExprError) {
unreachable!()
}
}

/// A wrapper of [`Expression`] that evaluates in a non-strict way. Basically...
/// - When an error occurs during chunk-level evaluation, recompute in row-based execution and pad
/// with NULL for each failed row.
/// - Report all error occurred during row-level evaluation to the [`EvalErrorReport`].
pub struct NonStrict<E, R> {
inner: E,
report: R,
Expand All @@ -59,13 +69,12 @@ where
Self { inner, report }
}

/// Evaluate expression in row-based execution with `eval_row_infallible`.
async fn eval_chunk_infallible_by_row(&self, input: &DataChunk) -> ArrayRef {
// When eval failed, recompute in row-based execution
// and pad with NULL for each failed row.
let mut array_builder = self.return_type().create_array_builder(input.capacity());
for row in input.rows_with_holes() {
if let Some(row) = row {
let datum = self.eval_row_infallible(&row.into_owned_row()).await;
let datum = self.eval_row_infallible(&row.into_owned_row()).await; // TODO: use `Row` trait
array_builder.append(&datum);
} else {
array_builder.append_null();
Expand All @@ -74,12 +83,13 @@ where
array_builder.finish().into()
}

/// Evaluate expression on a single row, report error and return NULL if failed.
async fn eval_row_infallible(&self, input: &OwnedRow) -> Datum {
match self.inner.eval_row(input).await {
Ok(datum) => datum,
Err(error) => {
self.report.report(error);
None
None // NULL
}
}
}
Expand Down Expand Up @@ -115,6 +125,6 @@ where
}

fn eval_const(&self) -> Result<Datum> {
self.inner.eval_const() // TODO?
self.inner.eval_const() // do not handle error
}
}
3 changes: 2 additions & 1 deletion src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ impl ActorContext {
}

pub fn on_compute_error(&self, err: ExprError, identity: &str) {
tracing::error!("Compute error: {}, executor: {identity}", err);
tracing::error!(identity, %err, "failed to evaluate expression");

let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
let mut err_str = err.to_string();

Expand Down
9 changes: 5 additions & 4 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub struct LocalStreamManager {
total_mem_val: Arc<TrAdder<i64>>,
}

/// Report expression evaluation errors to the actor context.
#[derive(Clone)]
pub struct ActorEvalErrorReport {
actor_context: ActorContextRef,
Expand All @@ -121,7 +122,7 @@ pub struct ExecutorParams {
pub env: StreamEnvironment,

/// Indices of primary keys
// TODO: directly use this for `ExecutorInfo`
// TODO: directly use it for `ExecutorInfo`
pub pk_indices: PkIndices,

/// Executor id, unique across all actors.
Expand All @@ -131,15 +132,15 @@ pub struct ExecutorParams {
pub operator_id: u64,

/// Information of the operator from plan node, like `StreamHashJoin { .. }`.
// TODO: use this for `identity`
// TODO: use it for `identity`
pub op_info: String,

/// The output schema of the executor.
// TODO: directly use this for `ExecutorInfo`
// TODO: directly use it for `ExecutorInfo`
pub schema: Schema,

/// The identity of the executor, like `HashJoin 1234ABCD`.
// TODO: directly use this for `ExecutorInfo`
// TODO: directly use it for `ExecutorInfo`
pub identity: String,

/// The input executor.
Expand Down

0 comments on commit a24495a

Please sign in to comment.