Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: query handler #951

Closed
wants to merge 11 commits into from
10 changes: 10 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
[target.aarch64-unknown-linux-gnu]
linker = "aarch64-linux-gnu-gcc"

[build]
rustflags = [
# lints
# TODO: use lint configuration in cargo https://github.com/rust-lang/cargo/issues/5034
"-Wclippy::print_stdout",
"-Wclippy::print_stderr",
# false positive: https://github.com/rust-lang/rust/issues/51443#issuecomment-1374847313
"-Awhere_clauses_object_safety",
]
2 changes: 1 addition & 1 deletion .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ jobs:
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Run cargo clippy
run: cargo clippy --workspace --all-targets -- -D warnings -D clippy::print_stdout -D clippy::print_stderr
run: cargo clippy --workspace --all-targets -- -D warnings

coverage:
if: github.event.pull_request.draft == false
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ check: ## Cargo check all the targets.

.PHONY: clippy
clippy: ## Check clippy rules.
cargo clippy --workspace --all-targets -- -D warnings -D clippy::print_stdout -D clippy::print_stderr
cargo clippy --workspace --all-targets -- -D warnings

.PHONY: fmt-check
fmt-check: ## Check code format.
Expand Down
7 changes: 7 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ pub enum Error {
source: catalog::error::Error,
},

#[snafu(display("Failed to find catalog, source: {}", source))]
FindCatalog {
#[snafu(backtrace)]
source: servers::error::Error,
},

#[snafu(display("Failed to find table {} from catalog, source: {}", table_name, source))]
FindTable {
table_name: String,
Expand Down Expand Up @@ -341,6 +347,7 @@ impl ErrorExt for Error {
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
Error::DropTable { source, .. } => source.status_code(),
Error::FindCatalog { source } => source.status_code(),

Error::Insert { source, .. } => source.status_code(),

Expand Down
5 changes: 3 additions & 2 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, InsertRequest};
use async_trait::async_trait;
use common_query::Output;
use query::parser::QueryLanguageParser;
use query::parser::{QueryLanguage, QueryLanguageParser};
use query::plan::LogicalPlan;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -52,7 +52,8 @@ impl Instance {
async fn handle_query(&self, query: Query, ctx: QueryContextRef) -> Result<Output> {
Ok(match query {
Query::Sql(sql) => {
let stmt = QueryLanguageParser::parse_sql(&sql).context(ExecuteSqlSnafu)?;
let stmt =
QueryLanguageParser::parse(QueryLanguage::Sql(sql)).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, ctx).await?
}
Query::LogicalPlan(plan) => self.execute_logical(plan).await?,
Expand Down
74 changes: 34 additions & 40 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use datatypes::schema::Schema;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::parser::{QueryLanguage, QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::sql::QueryHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::{CreateDatabaseRequest, DropTableRequest};

use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
use crate::error::{
self, BumpTableIdSnafu, ExecuteSqlSnafu, FindCatalogSnafu, Result, TableIdProviderNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metric;
use crate::sql::SqlRequest;
Expand Down Expand Up @@ -149,7 +151,8 @@ impl Instance {
QueryStatement::Sql(Statement::Use(ref schema)) => {
let catalog = &query_ctx.current_catalog();
ensure!(
self.is_valid_schema(catalog, schema)?,
self.is_valid_schema(catalog, schema)
.context(FindCatalogSnafu)?,
error::DatabaseNotFoundSnafu { catalog, schema }
);

Expand All @@ -161,12 +164,14 @@ impl Instance {
}

pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = QueryLanguageParser::parse_sql(sql).context(ExecuteSqlSnafu)?;
let stmt = QueryLanguageParser::parse(QueryLanguage::Sql(sql.to_owned()))
.context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}

pub async fn execute_promql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = QueryLanguageParser::parse_promql(sql).context(ExecuteSqlSnafu)?;
let stmt = QueryLanguageParser::parse(QueryLanguage::Promql(sql.to_owned()))
.context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
}
Expand Down Expand Up @@ -203,53 +208,42 @@ pub fn table_idents_to_full_name(
}

#[async_trait]
impl SqlQueryHandler for Instance {
type Error = error::Error;

async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
impl QueryHandler for Instance {
async fn statement_query(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
// we assume sql string has only 1 statement in datanode
let result = self.execute_sql(query, query_ctx).await;
vec![result]
self.execute_stmt(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQueryStatementSnafu)
}

async fn do_promql_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let result = self.execute_promql(query, query_ctx).await;
vec![result]
fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.map(|s| s.is_some())
.map_err(BoxedError::new)
.context(server_error::CheckDatabaseValiditySnafu)
}

async fn do_statement_query(
fn describe(
&self,
stmt: Statement,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_stmt(QueryStatement::Sql(stmt), query_ctx)
.await
}

fn do_describe(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Option<Schema>> {
if let Statement::Query(_) = stmt {
) -> server_error::Result<Option<Schema>> {
if let QueryStatement::Sql(Statement::Query(_)) = stmt {
self.query_engine
.describe(QueryStatement::Sql(stmt), query_ctx)
.describe(stmt, query_ctx)
.map(Some)
.context(error::DescribeStatementSnafu)
.map_err(BoxedError::new)
.context(server_error::DescribeStatementSnafu)
} else {
Ok(None)
}
}

fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalog_manager
.schema(catalog, schema)
.map(|s| s.is_some())
.context(error::CatalogSnafu)
}
}

#[async_trait]
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use servers::error::Error::InternalIo;
use servers::grpc::GrpcServer;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::query_handler::sql::ServerQueryHandlerAdaptor;
use servers::server::Server;
use servers::tls::TlsOption;
use servers::Mode;
Expand Down Expand Up @@ -70,7 +70,7 @@ impl Services {
Some(MysqlServer::create_server(
mysql_io_runtime,
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
ServerQueryHandlerAdaptor::arc(instance.clone()),
None,
)),
Arc::new(MysqlSpawnConfig::new(
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ mod tests {
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::parser::{QueryLanguage, QueryLanguageParser, QueryStatement};
use query::QueryEngineFactory;
use sql::statements::statement::Statement;
use storage::config::EngineConfig as StorageEngineConfig;
Expand Down Expand Up @@ -241,7 +241,7 @@ mod tests {
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(table_engine, catalog_list.clone(), query_engine.clone());

let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() {
let stmt = match QueryLanguageParser::parse(QueryLanguage::Sql(sql.to_owned())).unwrap() {
QueryStatement::Sql(Statement::Insert(i)) => i,
_ => {
unreachable!()
Expand Down
24 changes: 19 additions & 5 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ pub enum Error {
source: sql::error::Error,
},

#[snafu(display("Failed to parse query, source: {}", source))]
ParseQuery {
#[snafu(backtrace)]
source: query::error::Error,
},

#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
Expand Down Expand Up @@ -338,13 +344,19 @@ pub enum Error {
},

// TODO(ruihang): merge all query execution error kinds
#[snafu(display("failed to execute PromQL query {}, source: {}", query, source))]
#[snafu(display("Failed to execute PromQL query {}, source: {}", query, source))]
ExecutePromql {
query: String,
#[snafu(backtrace)]
source: servers::error::Error,
},

#[snafu(display("Failed to execute query statement, source: {}", source))]
ExecuteQueryStatement {
#[snafu(backtrace)]
source: BoxedError,
},

#[snafu(display("Failed to describe schema for given statement, source: {}", source))]
DescribeStatement {
#[snafu(backtrace)]
Expand Down Expand Up @@ -413,9 +425,9 @@ impl ErrorExt for Error {
| Error::FindNewColumnsOnInsertion { source } => source.status_code(),

Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
Error::ExecuteStatement { source, .. } | Error::DescribeStatement { source } => {
source.status_code()
}
Error::ExecuteStatement { source, .. }
| Error::ParseQuery { source }
| Error::DescribeStatement { source } => source.status_code(),
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,
Expand All @@ -424,7 +436,9 @@ impl ErrorExt for Error {
Error::InvokeDatanode { source } => source.status_code(),
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
Error::External { source } => source.status_code(),
Error::External { source } | Error::ExecuteQueryStatement { source } => {
source.status_code()
}
Error::DeserializePartition { source, .. } | Error::FindTableRoute { source, .. } => {
source.status_code()
}
Expand Down
Loading