Skip to content

Commit

Permalink
refactor(frontend): refactor query to reduce redundent code in extend…
Browse files Browse the repository at this point in the history
…ed mode (risingwavelabs#9023)
  • Loading branch information
ZENOTME authored Apr 7, 2023
1 parent 8e0bf7a commit 09e6a63
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 349 deletions.
2 changes: 1 addition & 1 deletion src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ impl TestCase {
let mut ret = TestCaseResult::default();

let bound = {
let mut binder = Binder::new(&session);
let mut binder = Binder::new(&session, vec![]);
match binder.bind(stmt.clone()) {
Ok(bound) => bound,
Err(err) => {
Expand Down
10 changes: 3 additions & 7 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,7 @@ impl Binder {
}
}

pub fn new(session: &SessionImpl) -> Binder {
Self::new_inner(session, false, vec![])
}

pub fn new_with_param_types(session: &SessionImpl, param_types: Vec<DataType>) -> Binder {
pub fn new(session: &SessionImpl, param_types: Vec<DataType>) -> Binder {
Self::new_inner(session, false, param_types)
}

Expand Down Expand Up @@ -327,12 +323,12 @@ pub mod test_utils {

#[cfg(test)]
pub fn mock_binder() -> Binder {
Binder::new(&SessionImpl::mock())
Binder::new(&SessionImpl::mock(), vec![])
}

#[cfg(test)]
pub fn mock_binder_with_param_types(param_types: Vec<DataType>) -> Binder {
Binder::new_with_param_types(&SessionImpl::mock(), param_types)
Binder::new(&SessionImpl::mock(), param_types)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn gen_sink_plan(
let definition = context.normalized_sql().to_owned();

let (dependent_relations, bound) = {
let mut binder = Binder::new(session);
let mut binder = Binder::new(session, vec![]);
let bound = binder.bind_query(*query)?;
(binder.included_relations(), bound)
};
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ pub(super) fn bind_source_watermark(
source_watermarks: Vec<SourceWatermark>,
column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
let mut binder = Binder::new(session);
let mut binder = Binder::new(session, vec![]);
binder.bind_columns_to_context(name.clone(), column_catalogs.to_vec())?;

let watermark_descs = source_watermarks
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn handle_create_as(

// Generate catalog descs from query
let mut column_descs: Vec<_> = {
let mut binder = Binder::new(&session);
let mut binder = Binder::new(&session, vec![]);
let bound = binder.bind(Statement::Query(query.clone()))?;
if let BoundStatement::Query(query) = bound {
// Create ColumnCatelog by Field
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn handle_create_view(
schema,
dependent_relations,
..
} = super::query::gen_batch_query_plan(
} = super::query::gen_batch_plan_by_statement(
&session,
context.into(),
Statement::Query(Box::new(query.clone())),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::handler::HandlerArgs;

pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Result<RwPgResponse> {
let session = handler_args.session;
let mut binder = Binder::new(&session);
let mut binder = Binder::new(&session, vec![]);
let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?;
// For Source, it doesn't have table catalog so use get source to get column descs.
let (columns, pk_columns, indices): (Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>) = {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::create_table::{
check_create_table_with_source, gen_create_table_plan, gen_create_table_plan_with_source,
ColumnIdGenerator,
};
use super::query::gen_batch_query_plan;
use super::query::gen_batch_plan_by_statement;
use super::RwPgResponse;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::{Convention, Explain};
Expand Down Expand Up @@ -120,7 +120,7 @@ pub async fn handle_explain(
.0
}

stmt => gen_batch_query_plan(&session, context.into(), stmt)?.plan,
stmt => gen_batch_plan_by_statement(&session, context.into(), stmt)?.plan,
};

let ctx = plan.plan_base().ctx.clone();
Expand Down
31 changes: 22 additions & 9 deletions src/frontend/src/handler/extended_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::{CreateSink, Query, Statement};

use super::query::BoundResult;
use super::{handle, query, HandlerArgs, RwPgResponse};
use crate::binder::BoundStatement;
use crate::session::SessionImpl;

/// Except for Query,Insert,Delete,Update statement, we store other statement as `PureStatement`.
Expand All @@ -40,8 +40,7 @@ pub enum PrepareStatement {
#[derive(Clone)]
pub struct PreparedResult {
pub statement: Statement,
pub bound_statement: BoundStatement,
pub param_types: Vec<DataType>,
pub bound_result: BoundResult,
}

#[derive(Clone)]
Expand All @@ -53,7 +52,7 @@ pub enum Portal {
#[derive(Clone)]
pub struct PortalResult {
pub statement: Statement,
pub bound_statement: BoundStatement,
pub bound_result: BoundResult,
pub result_formats: Vec<Format>,
}

Expand Down Expand Up @@ -119,15 +118,29 @@ pub fn handle_bind(
match prepare_statement {
PrepareStatement::Prepared(prepared_result) => {
let PreparedResult {
bound_result,
statement,
bound_statement,
..
} = prepared_result;
let bound_statement = bound_statement.bind_parameter(params, param_formats)?;
let BoundResult {
stmt_type,
must_dist,
bound,
param_types,
dependent_relations,
} = bound_result;

let new_bound = bound.bind_parameter(params, param_formats)?;
let new_bound_result = BoundResult {
stmt_type,
must_dist,
param_types,
dependent_relations,
bound: new_bound,
};
Ok(Portal::Portal(PortalResult {
statement,
bound_statement,
bound_result: new_bound_result,
result_formats,
statement,
}))
}
PrepareStatement::PureStatement(stmt) => {
Expand Down
Loading

0 comments on commit 09e6a63

Please sign in to comment.