From 1c1354ca5de0dc817808a88ec55a81b9920f6b6c Mon Sep 17 00:00:00 2001 From: lmatz Date: Fri, 16 Jun 2023 15:36:00 +0800 Subject: [PATCH] chore: return a warning message when creating sink with order by (#10239) --- src/frontend/src/handler/create_sink.rs | 19 ++++++++++++++----- src/frontend/src/handler/explain.rs | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 894bf68c6daa6..8bc2d280c531e 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::rc::Rc; + use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId}; @@ -66,7 +68,7 @@ pub fn gen_sink_plan( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSinkStatement, -) -> Result<(PlanRef, SinkCatalog)> { +) -> Result<(Box, PlanRef, SinkCatalog)> { let db_name = session.database(); let (sink_schema_name, sink_table_name) = Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?; @@ -83,7 +85,7 @@ pub fn gen_sink_plan( let (dependent_relations, bound) = { let mut binder = Binder::new_for_stream(session); - let bound = binder.bind_query(*query)?; + let bound = binder.bind_query(*query.clone())?; (binder.included_relations(), bound) }; @@ -127,7 +129,7 @@ pub fn gen_sink_plan( dependent_relations.into_iter().collect_vec(), ); - Ok((sink_plan, sink_catalog)) + Ok((query, sink_plan, sink_catalog)) } pub async fn handle_create_sink( @@ -139,8 +141,15 @@ pub async fn handle_create_sink( session.check_relation_name_duplicated(stmt.sink_name.clone())?; let (sink, graph) = { - let context = OptimizerContext::from_handler_args(handle_args); - let (plan, sink) = gen_sink_plan(&session, context.into(), stmt)?; + let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); + let (query, plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?; + let has_order_by = !query.order_by.is_empty(); + if has_order_by { + context.warn_to_user( + r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."# + .to_string(), + ); + } let mut graph = build_graph(plan); graph.parallelism = session .config() diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index c300989fce156..05fd38ed77a85 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -122,7 +122,7 @@ async fn do_handle_explain( .map(|x| x.0), Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|x| x.0) + gen_sink_plan(&session, context.clone(), stmt).map(|x| x.1) } Statement::CreateIndex {