Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: tune return msg #2506

Merged
merged 10 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ pub enum Error {
#[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
QueryAccessDenied { catalog: String, schema: String },

#[snafu(display("msg: {}", msg))]
#[snafu(display(""))]
Datafusion {
msg: String,
#[snafu(source)]
error: DataFusionError,
location: Location,
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use query::QueryEngine;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use session::context::QueryContext;
use snafu::{ErrorCompat, ResultExt};
use snafu::ResultExt;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

use crate::cli::cmd::ReplCommand;
Expand Down Expand Up @@ -148,7 +148,7 @@ impl Repl {
.await
.map_err(|e| {
let status_code = e.status_code();
let root_cause = e.iter_chain().last().unwrap();
let root_cause = e.output_msg();
println!("Error: {}({status_code}), {root_cause}", status_code as u32)
})
.is_ok()
Expand Down
37 changes: 34 additions & 3 deletions src/common/error/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use crate::status_code::StatusCode;

/// Extension to [`Error`](std::error::Error) in std.
pub trait ErrorExt: std::error::Error + StackError {
pub trait ErrorExt: StackError {
/// Map this error to [StatusCode].
fn status_code(&self) -> StatusCode {
StatusCode::Unknown
Expand All @@ -34,12 +34,43 @@ pub trait ErrorExt: std::error::Error + StackError {
/// Returns the error as [Any](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

fn output_msg(&self) -> String
where
Self: Sized,
{
let error = self.last();
if let Some(external_error) = error.source() {
let external_root = external_error.sources().last().unwrap();

if error.to_string().is_empty() {
format!("{external_root}")
} else {
format!("{error}: {external_root}")
}
} else {
format!("{error}")
}
}
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
}

pub trait StackError {
pub trait StackError: std::error::Error {
fn debug_fmt(&self, layer: usize, buf: &mut Vec<String>);

fn next(&self) -> Option<&dyn StackError>;

fn last(&self) -> &dyn StackError
where
Self: Sized,
{
let Some(mut result) = self.next() else {
return self;
};
while let Some(err) = result.next() {
result = err;
}
result
}
}

impl<T: ?Sized + StackError> StackError for Arc<T> {
Expand All @@ -52,7 +83,7 @@ impl<T: ?Sized + StackError> StackError for Arc<T> {
}
}

impl<T: ?Sized + StackError> StackError for Box<T> {
impl<T: StackError> StackError for Box<T> {
fn debug_fmt(&self, layer: usize, buf: &mut Vec<String>) {
self.as_ref().debug_fmt(layer, buf)
}
Expand Down
1 change: 1 addition & 0 deletions src/common/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(error_iter)]

pub mod ext;
pub mod format;
Expand Down
12 changes: 2 additions & 10 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,13 @@ mod tests {
use std::sync::Arc;

use async_trait::async_trait;
use common_error::ext::PlainError;
use common_error::ext::{ErrorExt, PlainError};
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::ObjectStore;
use snafu::ErrorCompat;

use super::*;
use crate::local::test_util;
Expand Down Expand Up @@ -943,14 +942,7 @@ mod tests {

// Run the runner and execute the procedure.
runner.run().await;
let err = meta
.state()
.error()
.unwrap()
.iter_chain()
.last()
.unwrap()
.to_string();
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
}
}
2 changes: 1 addition & 1 deletion src/common/recordbatch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to poll stream"))]
#[snafu(display(""))]
PollStream {
#[snafu(source)]
error: datafusion::error::DataFusionError,
Expand Down
32 changes: 8 additions & 24 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,7 @@ impl QueryEngine for DatafusionQueryEngine {
Ok(DataFrame::DataFusion(
self.state
.read_table(table)
.context(error::DatafusionSnafu {
msg: "Fail to create dataframe for table",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?,
))
Expand All @@ -295,9 +293,7 @@ impl LogicalOptimizer for DatafusionQueryEngine {
.state
.session_state()
.optimize(df_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize logical plan",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Expand All @@ -321,9 +317,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
let physical_plan = state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu {
msg: "Fail to create physical plan",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Expand Down Expand Up @@ -394,9 +388,7 @@ impl QueryExecutor for DatafusionQueryEngine {
assert_eq!(1, plan.output_partitioning().partition_count());
let df_stream = plan
.execute(0, task_ctx)
.context(error::DatafusionSnafu {
msg: "Failed to execute DataFusion merge exec",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let stream = RecordBatchStreamAdapter::try_new(df_stream)
Expand Down Expand Up @@ -447,35 +439,27 @@ pub async fn execute_show_with_filter(
let context = SessionContext::new();
context
.register_batch(table_name, record_batch.into_df_record_batch())
.context(error::DatafusionSnafu {
msg: "Fail to register a record batch as a table",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut dataframe = context
.sql(&format!("SELECT * FROM {table_name}"))
.await
.context(error::DatafusionSnafu {
msg: "Fail to execute a sql",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
if let Some(filter) = filter {
let filter = convert_filter_to_df_filter(filter)?;
dataframe = dataframe
.filter(filter)
.context(error::DatafusionSnafu {
msg: "Fail to filter",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
}
let df_batches = dataframe
.collect()
.await
.context(error::DatafusionSnafu {
msg: "Fail to collect the record batches",
})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut batches = Vec::with_capacity(df_batches.len());
Expand Down
3 changes: 1 addition & 2 deletions src/query/src/datafusion/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ use snafu::{Location, Snafu};
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum InnerError {
#[snafu(display("msg: {}", msg))]
#[snafu(display(""))]
Datafusion {
msg: &'static str,
#[snafu(source)]
error: DataFusionError,
location: Location,
Expand Down
5 changes: 2 additions & 3 deletions src/query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub enum Error {
location: Location,
},

#[snafu(display("DataFusion error"))]
#[snafu(display(""))]
DataFusion {
#[snafu(source)]
error: DataFusionError,
Expand All @@ -140,9 +140,8 @@ pub enum Error {
source: sql::error::Error,
},

#[snafu(display("Cannot plan SQL: {}", sql))]
#[snafu(display(""))]
PlanSql {
sql: String,
#[snafu(source)]
error: DataFusionError,
location: Location,
Expand Down
11 changes: 3 additions & 8 deletions src/query/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,9 @@ impl DfLogicalPlanner {

let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);

let result = sql_to_rel.statement_to_plan(df_stmt).with_context(|_| {
let sql = if let Statement::Query(query) = stmt {
query.inner.to_string()
} else {
format!("{stmt:?}")
};
PlanSqlSnafu { sql }
})?;
let result = sql_to_rel
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?;
let plan = RangePlanRewriter::new(table_provider, context_provider)
.rewrite(result)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/script/src/python/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub enum Error {
error: ArrowError,
},

#[snafu(display("DataFusion error"))]
#[snafu(display(""))]
DataFusion {
location: SnafuLocation,
#[snafu(source)]
Expand Down
11 changes: 5 additions & 6 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_telemetry::logging;
use datatypes::prelude::ConcreteDataType;
use query::parser::PromQuery;
use serde_json::json;
use snafu::{ErrorCompat, Location, Snafu};
use snafu::{Location, Snafu};
use tonic::Code;

#[derive(Snafu)]
Expand Down Expand Up @@ -511,7 +511,6 @@ macro_rules! define_into_tonic_status {
impl From<$Error> for tonic::Status {
fn from(err: $Error) -> Self {
use common_error::{GREPTIME_ERROR_CODE, GREPTIME_ERROR_MSG};
use snafu::ErrorCompat;
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;

Expand All @@ -521,16 +520,16 @@ macro_rules! define_into_tonic_status {
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
headers.insert(GREPTIME_ERROR_CODE, HeaderValue::from(status_code as u32));
let root_error = err.iter_chain().last().unwrap();
let root_error = err.output_msg();

if let Ok(err_msg) = HeaderValue::from_bytes(root_error.to_string().as_bytes()) {
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.as_bytes()) {
let _ = headers.insert(GREPTIME_ERROR_MSG, err_msg);
}

let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(
$crate::error::status_to_tonic_code(status_code),
err.to_string(),
root_error,
metadata,
)
}
Expand All @@ -548,7 +547,7 @@ impl From<std::io::Error> for Error {

impl IntoResponse for Error {
fn into_response(self) -> Response {
let error_msg = self.iter_chain().last().unwrap().to_string();
let error_msg = self.output_msg();
let status = match self {
Error::InfluxdbLineProtocol { .. }
| Error::InfluxdbLinesWrite { .. }
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl PrometheusGatewayService {
Err(err) => {
return PrometheusJsonResponse::error(
err.status_code().to_string(),
err.to_string(),
err.output_msg(),
)
.0
}
Expand Down
7 changes: 2 additions & 5 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use futures::FutureExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{ensure, ErrorCompat, ResultExt};
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tower::timeout::TimeoutLayer;
Expand Down Expand Up @@ -314,10 +314,7 @@ impl JsonResponse {
}
},
Err(e) => {
return Self::with_error(
e.iter_chain().last().unwrap().to_string(),
e.status_code(),
);
return Self::with_error(e.output_msg(), e.status_code());
}
}
}
Expand Down
Loading
Loading