Skip to content

Commit

Permalink
refactor(flow): func spec api&use Error not EvalError in mfp (#3657)
Browse files Browse the repository at this point in the history
* refactor: func's specialization& use Error not EvalError

* docs: some pub item

* chore: typo

* docs: add comments for every pub item

* chore: per review

* chore: per reveiw&derive Copy

* chore: per review&test for binary fn spec

* docs: comment explain how binary func spec works

* chore: minor style change

* fix: Error not EvalError
  • Loading branch information
discord9 authored Apr 9, 2024
1 parent 2896e1f commit ea9367f
Show file tree
Hide file tree
Showing 21 changed files with 785 additions and 281 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde.workspace = true
servers.workspace = true
smallvec.workspace = true
snafu.workspace = true
strum.workspace = true
tokio.workspace = true
tonic.workspace = true

Expand Down
3 changes: 3 additions & 0 deletions src/flow/clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Whether to only check for missing documentation in items visible within the current crate. For example, pub(crate) items. (default: false)
# This is a config for clippy::missing_docs_in_private_items
missing-docs-in-crate-items = true
27 changes: 26 additions & 1 deletion src/flow/src/adapter/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Error definition for flow module

use std::any::Any;

use common_macro::stack_trace_debug;
Expand All @@ -25,6 +27,7 @@ use snafu::{Location, Snafu};

use crate::expr::EvalError;

/// This error is used to represent all possible errors that can occur in the flow module.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
Expand Down Expand Up @@ -54,8 +57,25 @@ pub enum Error {

#[snafu(display("No protobuf type for value: {value}"))]
NoProtoType { value: Value, location: Location },

#[snafu(display("Not implement in flow: {reason}"))]
NotImplemented { reason: String, location: Location },

#[snafu(display("Flow plan error: {reason}"))]
Plan { reason: String, location: Location },

#[snafu(display("Unsupported temporal filter: {reason}"))]
UnsupportedTemporalFilter { reason: String, location: Location },

#[snafu(display("Datatypes error: {source} with extra message: {extra}"))]
Datatypes {
source: datatypes::Error,
extra: String,
location: Location,
},
}

/// Result type for flow module
pub type Result<T> = std::result::Result<T, Error>;

impl ErrorExt for Error {
Expand All @@ -64,8 +84,13 @@ impl ErrorExt for Error {
Self::Eval { .. } | &Self::JoinTask { .. } => StatusCode::Internal,
&Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. } => StatusCode::TableNotFound,
&Self::InvalidQuery { .. } => StatusCode::PlanQuery,
&Self::InvalidQuery { .. } | &Self::Plan { .. } | &Self::Datatypes { .. } => {
StatusCode::PlanQuery
}
Self::NoProtoType { .. } => StatusCode::Unexpected,
&Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
let arrange_handler_inner = ArrangeHandler::from(arrange);

// This closure capture following variables:
let mfp_plan = MfpPlan::create_from(mfp).context(EvalSnafu)?;
let mfp_plan = MfpPlan::create_from(mfp)?;
let now = self.compute_state.current_time_ref();

let err_collector = self.err_collector.clone();
Expand Down
1 change: 1 addition & 0 deletions src/flow/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod id;
mod linear;
mod relation;
mod scalar;
mod signature;

pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
Expand Down
5 changes: 2 additions & 3 deletions src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Error handling for expression evaluation.

use std::any::Any;

use common_macro::stack_trace_debug;
Expand Down Expand Up @@ -59,9 +61,6 @@ pub enum EvalError {
#[snafu(display("Optimize error: {reason}"))]
Optimize { reason: String, location: Location },

#[snafu(display("Unsupported temporal filter: {reason}"))]
UnsupportedTemporalFilter { reason: String, location: Location },

#[snafu(display("Overflowed during evaluation"))]
Overflow { location: Location },
}
Loading

0 comments on commit ea9367f

Please sign in to comment.