diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index dec52aa6f8e22..283bc4847db53 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -508,7 +508,7 @@ impl TestCase { let mut ret = TestCaseResult::default(); let bound = { - let mut binder = Binder::new(&session); + let mut binder = Binder::new(&session, vec![]); match binder.bind(stmt.clone()) { Ok(bound) => bound, Err(err) => { diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 991961c3ae055..11526b0864253 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -210,11 +210,7 @@ impl Binder { } } - pub fn new(session: &SessionImpl) -> Binder { - Self::new_inner(session, false, vec![]) - } - - pub fn new_with_param_types(session: &SessionImpl, param_types: Vec) -> Binder { + pub fn new(session: &SessionImpl, param_types: Vec) -> Binder { Self::new_inner(session, false, param_types) } @@ -327,12 +323,12 @@ pub mod test_utils { #[cfg(test)] pub fn mock_binder() -> Binder { - Binder::new(&SessionImpl::mock()) + Binder::new(&SessionImpl::mock(), vec![]) } #[cfg(test)] pub fn mock_binder_with_param_types(param_types: Vec) -> Binder { - Binder::new_with_param_types(&SessionImpl::mock(), param_types) + Binder::new(&SessionImpl::mock(), param_types) } } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b0a85f675b021..1869caec9427e 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -81,7 +81,7 @@ pub fn gen_sink_plan( let definition = context.normalized_sql().to_owned(); let (dependent_relations, bound) = { - let mut binder = Binder::new(session); + let mut binder = Binder::new(session, vec![]); let bound = binder.bind_query(*query)?; (binder.included_relations(), bound) }; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index bf39b1dfe4cca..04d7c41e97b90 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -475,7 +475,7 @@ pub(super) fn bind_source_watermark( source_watermarks: Vec, column_catalogs: &[ColumnCatalog], ) -> Result> { - let mut binder = Binder::new(session); + let mut binder = Binder::new(session, vec![]); binder.bind_columns_to_context(name.clone(), column_catalogs.to_vec())?; let watermark_descs = source_watermarks diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 488dffe8729c7..68befde7e3a71 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -55,7 +55,7 @@ pub async fn handle_create_as( // Generate catalog descs from query let mut column_descs: Vec<_> = { - let mut binder = Binder::new(&session); + let mut binder = Binder::new(&session, vec![]); let bound = binder.bind(Statement::Query(query.clone()))?; if let BoundStatement::Query(query) = bound { // Create ColumnCatelog by Field diff --git a/src/frontend/src/handler/create_view.rs b/src/frontend/src/handler/create_view.rs index 53286b37cca00..d1f9118c90b11 100644 --- a/src/frontend/src/handler/create_view.rs +++ b/src/frontend/src/handler/create_view.rs @@ -49,7 +49,7 @@ pub async fn handle_create_view( schema, dependent_relations, .. - } = super::query::gen_batch_query_plan( + } = super::query::gen_batch_plan_by_statement( &session, context.into(), Statement::Query(Box::new(query.clone())), diff --git a/src/frontend/src/handler/describe.rs b/src/frontend/src/handler/describe.rs index 22c0e474826a0..afa7427ec9fd7 100644 --- a/src/frontend/src/handler/describe.rs +++ b/src/frontend/src/handler/describe.rs @@ -32,7 +32,7 @@ use crate::handler::HandlerArgs; pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Result { let session = handler_args.session; - let mut binder = Binder::new(&session); + let mut binder = Binder::new(&session, vec![]); let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; // For Source, it doesn't have table catalog so use get source to get column descs. let (columns, pk_columns, indices): (Vec, Vec, Vec>) = { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 74e9a474668ff..e8f3753d41822 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -26,7 +26,7 @@ use super::create_table::{ check_create_table_with_source, gen_create_table_plan, gen_create_table_plan_with_source, ColumnIdGenerator, }; -use super::query::gen_batch_query_plan; +use super::query::gen_batch_plan_by_statement; use super::RwPgResponse; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{Convention, Explain}; @@ -120,7 +120,7 @@ pub async fn handle_explain( .0 } - stmt => gen_batch_query_plan(&session, context.into(), stmt)?.plan, + stmt => gen_batch_plan_by_statement(&session, context.into(), stmt)?.plan, }; let ctx = plan.plan_base().ctx.clone(); diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index 6fd5843131f59..73844deeb6337 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ b/src/frontend/src/handler/extended_handle.rs @@ -20,8 +20,8 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{CreateSink, Query, Statement}; +use super::query::BoundResult; use super::{handle, query, HandlerArgs, RwPgResponse}; -use crate::binder::BoundStatement; use crate::session::SessionImpl; /// Except for Query,Insert,Delete,Update statement, we store other statement as `PureStatement`. @@ -40,8 +40,7 @@ pub enum PrepareStatement { #[derive(Clone)] pub struct PreparedResult { pub statement: Statement, - pub bound_statement: BoundStatement, - pub param_types: Vec, + pub bound_result: BoundResult, } #[derive(Clone)] @@ -53,7 +52,7 @@ pub enum Portal { #[derive(Clone)] pub struct PortalResult { pub statement: Statement, - pub bound_statement: BoundStatement, + pub bound_result: BoundResult, pub result_formats: Vec, } @@ -119,15 +118,29 @@ pub fn handle_bind( match prepare_statement { PrepareStatement::Prepared(prepared_result) => { let PreparedResult { + bound_result, statement, - bound_statement, - .. } = prepared_result; - let bound_statement = bound_statement.bind_parameter(params, param_formats)?; + let BoundResult { + stmt_type, + must_dist, + bound, + param_types, + dependent_relations, + } = bound_result; + + let new_bound = bound.bind_parameter(params, param_formats)?; + let new_bound_result = BoundResult { + stmt_type, + must_dist, + param_types, + dependent_relations, + bound: new_bound, + }; Ok(Portal::Portal(PortalResult { - statement, - bound_statement, + bound_result: new_bound_result, result_formats, + statement, })) } PrepareStatement::PureStatement(stmt) => { diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 41c36994351d3..76ea2da3fad82 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; @@ -49,62 +50,120 @@ use crate::scheduler::{ use crate::session::SessionImpl; use crate::PlanRef; -fn must_run_in_distributed_mode(stmt: &Statement) -> Result { - fn is_insert_using_select(stmt: &Statement) -> bool { - fn has_select_query(set_expr: &SetExpr) -> bool { - match set_expr { - SetExpr::Select(_) => true, - SetExpr::Query(query) => has_select_query(&query.body), - SetExpr::SetOperation { left, right, .. } => { - has_select_query(left) || has_select_query(right) - } - SetExpr::Values(_) => false, - } - } +pub async fn handle_query( + handler_args: HandlerArgs, + stmt: Statement, + formats: Vec, +) -> Result { + let session = handler_args.session.clone(); - matches!( - stmt, - Statement::Insert {source, ..} if has_select_query(&source.body) - ) - } + let plan_fragmenter_result = { + let context = OptimizerContext::from_handler_args(handler_args); + let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?; + gen_batch_plan_fragmenter(&session, plan_result)? + }; + execute(session, plan_fragmenter_result, formats).await +} - let stmt_type = StatementType::infer_from_statement(stmt) +pub fn handle_parse( + handler_args: HandlerArgs, + statement: Statement, + specific_param_types: Vec, +) -> Result { + let session = handler_args.session; + let bound_result = gen_bound(&session, statement.clone(), specific_param_types)?; + + Ok(PrepareStatement::Prepared(PreparedResult { + statement, + bound_result, + })) +} + +pub async fn handle_execute( + handler_args: HandlerArgs, + portal: PortalResult, +) -> Result { + let PortalResult { + bound_result, + result_formats, + .. + } = portal; + + let session = handler_args.session.clone(); + let plan_fragmenter_result = { + let context = OptimizerContext::from_handler_args(handler_args); + let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?; + + gen_batch_plan_fragmenter(&session, plan_result)? + }; + execute(session, plan_fragmenter_result, result_formats).await +} + +pub fn gen_batch_plan_by_statement( + session: &SessionImpl, + context: OptimizerContextRef, + stmt: Statement, +) -> Result { + let bound_result = gen_bound(session, stmt, vec![])?; + gen_batch_query_plan(session, context, bound_result) +} + +#[derive(Clone)] +pub struct BoundResult { + pub(crate) stmt_type: StatementType, + pub(crate) must_dist: bool, + pub(crate) bound: BoundStatement, + pub(crate) param_types: Vec, + pub(crate) dependent_relations: HashSet, +} + +fn gen_bound( + session: &SessionImpl, + stmt: Statement, + specific_param_types: Vec, +) -> Result { + let stmt_type = StatementType::infer_from_statement(&stmt) .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?; + let must_dist = must_run_in_distributed_mode(&stmt)?; - Ok(matches!( + let mut binder = Binder::new(session, specific_param_types); + let bound = binder.bind(stmt)?; + + let check_items = resolve_privileges(&bound); + session.check_privileges(&check_items)?; + + Ok(BoundResult { stmt_type, - StatementType::UPDATE - | StatementType::DELETE - | StatementType::UPDATE_RETURNING - | StatementType::DELETE_RETURNING - ) | is_insert_using_select(stmt)) + must_dist, + bound, + param_types: binder.export_param_types()?, + dependent_relations: binder.included_relations(), + }) } pub struct BatchQueryPlanResult { pub(crate) plan: PlanRef, pub(crate) query_mode: QueryMode, pub(crate) schema: Schema, + pub(crate) stmt_type: StatementType, // Note that these relations are only resolved in the binding phase, and it may only be a // subset of the final one. i.e. the final one may contain more implicit dependencies on // indices. pub(crate) dependent_relations: Vec, } -pub fn gen_batch_query_plan( +fn gen_batch_query_plan( session: &SessionImpl, context: OptimizerContextRef, - stmt: Statement, + bind_result: BoundResult, ) -> Result { - let must_dist = must_run_in_distributed_mode(&stmt)?; - - let (dependent_relations, bound) = { - let mut binder = Binder::new(session); - let bound = binder.bind(stmt)?; - (binder.included_relations(), bound) - }; - - let check_items = resolve_privileges(&bound); - session.check_privileges(&check_items)?; + let BoundResult { + stmt_type, + must_dist, + bound, + dependent_relations, + .. + } = bind_result; let mut planner = Planner::new(context); @@ -143,10 +202,46 @@ pub fn gen_batch_query_plan( plan: physical, query_mode, schema, + stmt_type, dependent_relations: dependent_relations.into_iter().collect_vec(), }) } +fn must_run_in_distributed_mode(stmt: &Statement) -> Result { + fn is_insert_using_select(stmt: &Statement) -> bool { + fn has_select_query(set_expr: &SetExpr) -> bool { + match set_expr { + SetExpr::Select(_) => true, + SetExpr::Query(query) => has_select_query(&query.body), + SetExpr::SetOperation { left, right, .. } => { + has_select_query(left) || has_select_query(right) + } + SetExpr::Values(_) => false, + } + } + + matches!( + stmt, + Statement::Insert {source, ..} if has_select_query(&source.body) + ) + } + + let stmt_type = StatementType::infer_from_statement(stmt) + .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?; + + Ok(matches!( + stmt_type, + StatementType::UPDATE + | StatementType::DELETE + | StatementType::UPDATE_RETURNING + | StatementType::DELETE_RETURNING + ) | is_insert_using_select(stmt)) +} + +fn must_run_in_local_mode(batch_plan: PlanRef) -> bool { + SysTableVisitor::has_sys_table(batch_plan) +} + fn determine_query_mode(batch_plan: PlanRef) -> QueryMode { if ExecutionModeDecider::run_in_local_mode(batch_plan) { QueryMode::Local @@ -155,60 +250,77 @@ fn determine_query_mode(batch_plan: PlanRef) -> QueryMode { } } -fn must_run_in_local_mode(batch_plan: PlanRef) -> bool { - SysTableVisitor::has_sys_table(batch_plan) +struct BatchPlanFragmenterResult { + pub(crate) plan_fragmenter: BatchPlanFragmenter, + pub(crate) query_mode: QueryMode, + pub(crate) schema: Schema, + pub(crate) stmt_type: StatementType, + pub(crate) _dependent_relations: Vec, + pub(crate) notice: String, } -pub async fn handle_query( - handler_args: HandlerArgs, - stmt: Statement, +fn gen_batch_plan_fragmenter( + session: &SessionImpl, + plan_result: BatchQueryPlanResult, +) -> Result { + let BatchQueryPlanResult { + plan, + query_mode, + schema, + stmt_type, + dependent_relations, + } = plan_result; + + let context = plan.plan_base().ctx.clone(); + tracing::trace!( + "Generated query plan: {:?}, query_mode:{:?}", + plan.explain_to_string()?, + query_mode + ); + let plan_fragmenter = BatchPlanFragmenter::new( + session.env().worker_node_manager_ref(), + session.env().catalog_reader().clone(), + session.config().get_batch_parallelism(), + plan, + )?; + let mut notice = String::new(); + context.append_notice(&mut notice); + + Ok(BatchPlanFragmenterResult { + plan_fragmenter, + query_mode, + schema, + stmt_type, + _dependent_relations: dependent_relations, + notice, + }) +} + +async fn execute( + session: Arc, + plan_fragmenter_result: BatchPlanFragmenterResult, formats: Vec, ) -> Result { - let stmt_type = StatementType::infer_from_statement(&stmt) - .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?; - let session = handler_args.session.clone(); - let query_start_time = Instant::now(); - let only_checkpoint_visible = handler_args.session.config().only_checkpoint_visible(); - let mut notice = String::new(); + let BatchPlanFragmenterResult { + plan_fragmenter, + query_mode, + schema, + stmt_type, + notice, + .. + } = plan_fragmenter_result; - // Subblock to make sure PlanRef (an Rc) is dropped before `await` below. - let (plan_fragmenter, query_mode, output_schema) = { - let context = OptimizerContext::from_handler_args(handler_args); - let BatchQueryPlanResult { - plan, - query_mode, - schema, - .. - } = gen_batch_query_plan(&session, context.into(), stmt)?; - - let context = plan.plan_base().ctx.clone(); - tracing::trace!( - "Generated query plan: {:?}, query_mode:{:?}", - plan.explain_to_string()?, - query_mode - ); - let plan_fragmenter = BatchPlanFragmenter::new( - session.env().worker_node_manager_ref(), - session.env().catalog_reader().clone(), - session.config().get_batch_parallelism(), - plan, - )?; - context.append_notice(&mut notice); - (plan_fragmenter, query_mode, schema) - }; + let only_checkpoint_visible = session.config().only_checkpoint_visible(); + let query_start_time = Instant::now(); let query = plan_fragmenter.generate_complete_query().await?; tracing::trace!("Generated query after plan fragmenter: {:?}", &query); - let pg_descs = output_schema + let pg_descs = schema .fields() .iter() .map(to_pg_field) .collect::>(); - let column_types = output_schema - .fields() - .iter() - .map(|f| f.data_type()) - .collect_vec(); + let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec(); // Used in counting row count. let first_field_format = formats.first().copied().unwrap_or(Format::Text); @@ -334,7 +446,7 @@ pub async fn handle_query( )) } -pub async fn distribute_execute( +async fn distribute_execute( session: Arc, query: Query, pinned_snapshot: PinnedHummockSnapshot, @@ -348,7 +460,7 @@ pub async fn distribute_execute( } #[expect(clippy::unused_async)] -pub async fn local_execute( +async fn local_execute( session: Arc, query: Query, pinned_snapshot: PinnedHummockSnapshot, @@ -367,243 +479,3 @@ pub async fn local_execute( Ok(execution.stream_rows()) } - -// TODO: Following code have redundant code with `handle_query`, we may need to refactor them in -// future. -pub fn handle_parse( - handler_args: HandlerArgs, - statement: Statement, - specific_param_types: Vec, -) -> Result { - let session = handler_args.session; - let mut binder = Binder::new_with_param_types(&session, specific_param_types); - let bound_statement = binder.bind(statement.clone())?; - - let check_items = resolve_privileges(&bound_statement); - session.check_privileges(&check_items)?; - - let param_types = binder.export_param_types()?; - - Ok(PrepareStatement::Prepared(PreparedResult { - statement, - bound_statement, - param_types, - })) -} - -pub fn gen_batch_query_plan_for_bound( - session: &SessionImpl, - context: OptimizerContextRef, - stmt: Statement, - bound: BoundStatement, -) -> Result<(PlanRef, QueryMode, Schema)> { - let must_dist = must_run_in_distributed_mode(&stmt)?; - - let mut planner = Planner::new(context); - - let mut logical = planner.plan(bound)?; - let schema = logical.schema(); - let batch_plan = logical.gen_batch_plan()?; - - let must_local = must_run_in_local_mode(batch_plan.clone()); - - let query_mode = match (must_dist, must_local) { - (true, true) => { - return Err(ErrorCode::InternalError( - "the query is forced to both local and distributed mode by optimizer".to_owned(), - ) - .into()) - } - (true, false) => QueryMode::Distributed, - (false, true) => QueryMode::Local, - (false, false) => match session.config().get_query_mode() { - QueryMode::Auto => determine_query_mode(batch_plan.clone()), - QueryMode::Local => QueryMode::Local, - QueryMode::Distributed => QueryMode::Distributed, - }, - }; - - let physical = match query_mode { - QueryMode::Auto => unreachable!(), - QueryMode::Local => logical.gen_batch_local_plan(batch_plan)?, - QueryMode::Distributed => logical.gen_batch_distributed_plan(batch_plan)?, - }; - Ok((physical, query_mode, schema)) -} - -pub async fn handle_execute( - handler_args: HandlerArgs, - portal: PortalResult, -) -> Result { - let PortalResult { - statement, - bound_statement, - result_formats, - } = portal; - - let stmt_type = StatementType::infer_from_statement(&statement) - .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?; - let session = handler_args.session.clone(); - let query_start_time = Instant::now(); - let only_checkpoint_visible = handler_args.session.config().only_checkpoint_visible(); - let mut notice = String::new(); - - // Subblock to make sure PlanRef (an Rc) is dropped before `await` below. - let (plan_fragmenter, query_mode, output_schema) = { - let context = OptimizerContext::from_handler_args(handler_args); - - let (physical, query_mode, schema) = - gen_batch_query_plan_for_bound(&session, context.into(), statement, bound_statement)?; - - let context = physical.plan_base().ctx.clone(); - tracing::trace!( - "Generated query plan: {:?}, query_mode:{:?}", - physical.explain_to_string()?, - query_mode - ); - let plan_fragmenter = BatchPlanFragmenter::new( - session.env().worker_node_manager_ref(), - session.env().catalog_reader().clone(), - session.config().get_batch_parallelism(), - physical, - )?; - context.append_notice(&mut notice); - (plan_fragmenter, query_mode, schema) - }; - let query = plan_fragmenter.generate_complete_query().await?; - tracing::trace!("Generated query after plan fragmenter: {:?}", &query); - - let pg_descs = output_schema - .fields() - .iter() - .map(to_pg_field) - .collect::>(); - let column_types = output_schema - .fields() - .iter() - .map(|f| f.data_type()) - .collect_vec(); - - // Used in counting row count. - let first_field_format = result_formats.first().copied().unwrap_or(Format::Text); - - let mut row_stream = { - let query_epoch = session.config().get_query_epoch(); - let query_snapshot = if let Some(query_epoch) = query_epoch { - PinnedHummockSnapshot::Other(query_epoch) - } else { - // Acquire hummock snapshot for execution. - // TODO: if there's no table scan, we don't need to acquire snapshot. - let hummock_snapshot_manager = session.env().hummock_snapshot_manager(); - let query_id = query.query_id().clone(); - let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?; - PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, only_checkpoint_visible) - }; - match query_mode { - QueryMode::Auto => unreachable!(), - QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute(session.clone(), query, query_snapshot).await?, - column_types, - result_formats, - session.clone(), - )), - // Local mode do not support cancel tasks. - QueryMode::Distributed => { - PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute(session.clone(), query, query_snapshot).await?, - column_types, - result_formats, - session.clone(), - )) - } - } - }; - - let rows_count: Option = match stmt_type { - StatementType::SELECT - | StatementType::INSERT_RETURNING - | StatementType::DELETE_RETURNING - | StatementType::UPDATE_RETURNING => None, - - StatementType::INSERT | StatementType::DELETE | StatementType::UPDATE => { - let first_row_set = row_stream.next().await; - let first_row_set = match first_row_set { - None => { - return Err(RwError::from(ErrorCode::InternalError( - "no affected rows in output".to_string(), - ))) - } - Some(row) => { - row.map_err(|err| RwError::from(ErrorCode::InternalError(format!("{}", err))))? - } - }; - let affected_rows_str = first_row_set[0].values()[0] - .as_ref() - .expect("compute node should return affected rows in output"); - if let Format::Binary = first_field_format { - Some( - i64::from_sql(&postgres_types::Type::INT8, affected_rows_str) - .unwrap() - .try_into() - .expect("affected rows count large than i32"), - ) - } else { - Some( - String::from_utf8(affected_rows_str.to_vec()) - .unwrap() - .parse() - .unwrap_or_default(), - ) - } - } - _ => unreachable!(), - }; - - // We need to do some post work after the query is finished and before the `Complete` response - // it sent. This is achieved by the `callback` in `PgResponse`. - let callback = async move { - // Implicitly flush the writes. - if session.config().get_implicit_flush() && stmt_type.is_dml() { - do_flush(&session).await?; - } - - // update some metrics - match query_mode { - QueryMode::Auto => unreachable!(), - QueryMode::Local => { - session - .env() - .frontend_metrics - .latency_local_execution - .observe(query_start_time.elapsed().as_secs_f64()); - - session - .env() - .frontend_metrics - .query_counter_local_execution - .inc(); - } - QueryMode::Distributed => { - session - .env() - .query_manager() - .query_metrics - .query_latency - .observe(query_start_time.elapsed().as_secs_f64()); - - session - .env() - .query_manager() - .query_metrics - .completed_query_counter - .inc(); - } - } - - Ok(()) - }; - - Ok(PgResponse::new_for_stream_extra( - stmt_type, rows_count, row_stream, pg_descs, notice, callback, - )) -} diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index ee8281619b48c..f9cdd194eebb0 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -35,7 +35,7 @@ pub fn get_columns_from_table( session: &SessionImpl, table_name: ObjectName, ) -> Result> { - let mut binder = Binder::new(session); + let mut binder = Binder::new(session, vec![]); let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; let catalogs = match relation { Relation::Source(s) => s.catalog.columns, diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 9abe51cdef25d..5b34d2ebdf48e 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -856,9 +856,9 @@ impl Session for SessionImpl { ) -> std::result::Result<(Vec, Vec), BoxedError> { Ok(match prepare_statement { PrepareStatement::Prepared(prepare_statement) => ( - prepare_statement.param_types, + prepare_statement.bound_result.param_types, infer( - Some(prepare_statement.bound_statement), + Some(prepare_statement.bound_result.bound), prepare_statement.statement, )?, ), @@ -871,7 +871,7 @@ impl Session for SessionImpl { portal: Portal, ) -> std::result::Result, BoxedError> { match portal { - Portal::Portal(portal) => Ok(infer(Some(portal.bound_statement), portal.statement)?), + Portal::Portal(portal) => Ok(infer(Some(portal.bound_result.bound), portal.statement)?), Portal::PureStatement(statement) => Ok(infer(None, statement)?), } } diff --git a/src/tests/sqlsmith/tests/frontend/mod.rs b/src/tests/sqlsmith/tests/frontend/mod.rs index 618b739d2c2ac..546c48679ceec 100644 --- a/src/tests/sqlsmith/tests/frontend/mod.rs +++ b/src/tests/sqlsmith/tests/frontend/mod.rs @@ -179,7 +179,7 @@ fn run_batch_query( context: OptimizerContextRef, stmt: Statement, ) -> Result<()> { - let mut binder = Binder::new(&session); + let mut binder = Binder::new(&session, vec![]); let bound = binder .bind(stmt) .map_err(|e| Failed::from(format!("Failed to bind:\nReason:\n{}", e)))?;