From e997ac7946fe7ee1b9975b6a4b1b69ca86722931 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 17:10:11 +0800 Subject: [PATCH 01/13] add result on gen_create_mv_plan --- rust/frontend/src/optimizer/mod.rs | 3 ++- rust/frontend/src/optimizer/plan_node/stream_materialize.rs | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/frontend/src/optimizer/mod.rs b/rust/frontend/src/optimizer/mod.rs index ac995cbcd7c6f..ac278f697ea4b 100644 --- a/rust/frontend/src/optimizer/mod.rs +++ b/rust/frontend/src/optimizer/mod.rs @@ -25,6 +25,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools as _; use property::{Distribution, Order}; use risingwave_common::catalog::Schema; +use risingwave_common::error::Result; use self::heuristic::{ApplyOrder, HeuristicOptimizer}; use self::plan_node::{LogicalProject, StreamMaterialize}; @@ -170,7 +171,7 @@ impl PlanRoot { /// /// The `MaterializeExecutor` won't be generated at this stage, and will be attached in /// `gen_create_mv_plan`. - pub fn gen_create_mv_plan(&mut self, mv_name: String) -> StreamMaterialize { + pub fn gen_create_mv_plan(&mut self, mv_name: String) -> Result { let stream_plan = match self.plan.convention() { Convention::Logical => { let plan = self.gen_optimized_logical_plan(); diff --git a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs index e202ca4ef51fc..1d2ecf284bd66 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -17,6 +17,7 @@ use std::fmt; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, OrderedColumnDesc, Schema, TableId}; +use risingwave_common::error::Result; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::expr::InputRefExpr; use risingwave_pb::plan::ColumnOrder; @@ -64,7 +65,7 @@ impl StreamMaterialize { mv_name: String, user_order_by: Order, user_cols: FixedBitSet, - ) -> Self { + ) -> Result { let base = Self::derive_plan_base(&input); let schema = &base.schema; let pk_indices = &base.pk_indices; @@ -115,7 +116,7 @@ impl StreamMaterialize { pk_desc, }; - Self { base, input, table } + Ok(Self { base, input, table }) } /// Get a reference to the stream materialize's table. From e3fddc8f0a04c6f77a8da8760659c786294f2f80 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 17:10:54 +0800 Subject: [PATCH 02/13] add result on gen_create_mv_plan --- rust/frontend/src/handler/create_mv.rs | 2 +- rust/frontend/src/handler/create_table.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/frontend/src/handler/create_mv.rs b/rust/frontend/src/handler/create_mv.rs index f6741d3f7334b..a8a5bbd834528 100644 --- a/rust/frontend/src/handler/create_mv.rs +++ b/rust/frontend/src/handler/create_mv.rs @@ -47,7 +47,7 @@ pub fn gen_create_mv_plan( let mut plan_root = Planner::new(context).plan_query(bound)?; plan_root.set_required_dist(Distribution::any().clone()); - let materialize = plan_root.gen_create_mv_plan(table_name); + let materialize = plan_root.gen_create_mv_plan(table_name)?; let table = materialize.table().to_prost(schema_id, database_id); let plan: PlanRef = materialize.into(); diff --git a/rust/frontend/src/handler/create_table.rs b/rust/frontend/src/handler/create_table.rs index 2a56c98078d0f..196246fc7bb01 100644 --- a/rust/frontend/src/handler/create_table.rs +++ b/rust/frontend/src/handler/create_table.rs @@ -106,7 +106,7 @@ pub fn gen_create_table_plan( Order::any().clone(), required_cols, ) - .gen_create_mv_plan(table_name) + .gen_create_mv_plan(table_name)? }; let table = materialize.table().to_prost(schema_id, database_id); From 569a2255af7c4973d2893c7735a239f315c1517c Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 17:42:08 +0800 Subject: [PATCH 03/13] refactor row id --- rust/frontend/src/catalog/mod.rs | 8 ++++++++ rust/frontend/src/catalog/table_catalog.rs | 8 ++++---- rust/frontend/src/handler/create_source.rs | 7 ++++--- rust/frontend/src/handler/create_table.rs | 8 ++++---- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/rust/frontend/src/catalog/mod.rs b/rust/frontend/src/catalog/mod.rs index ec802bbac4334..12b9dfd44b7cb 100644 --- a/rust/frontend/src/catalog/mod.rs +++ b/rust/frontend/src/catalog/mod.rs @@ -30,6 +30,14 @@ pub(crate) type SchemaId = u32; pub(crate) type TableId = risingwave_common::catalog::TableId; pub(crate) type ColumnId = risingwave_common::catalog::ColumnId; +pub const ROWID_PREFIX: &str = "_row_id"; + +pub fn gen_row_id_column_name(idx: Option) -> String { + match idx { + Some(idx) => ROWID_PREFIX.to_string() + "#" + &idx.to_string(), + None => ROWID_PREFIX.to_string(), + } +} #[derive(Error, Debug)] pub enum CatalogError { #[error("{0} not found: {1}")] diff --git a/rust/frontend/src/catalog/table_catalog.rs b/rust/frontend/src/catalog/table_catalog.rs index 1749038e933bb..351edbd1e661b 100644 --- a/rust/frontend/src/catalog/table_catalog.rs +++ b/rust/frontend/src/catalog/table_catalog.rs @@ -159,8 +159,8 @@ mod tests { use risingwave_pb::plan::{ColumnCatalog as ProstColumnCatalog, ColumnDesc as ProstColumnDesc}; use crate::catalog::column_catalog::ColumnCatalog; + use crate::catalog::gen_row_id_column_name; use crate::catalog::table_catalog::TableCatalog; - use crate::handler::create_table::ROWID_NAME; #[test] fn test_into_table_catalog() { @@ -173,7 +173,7 @@ mod tests { ProstColumnCatalog { column_desc: Some(ProstColumnDesc { column_id: 0, - name: ROWID_NAME.to_string(), + name: gen_row_id_column_name(None).to_string(), field_descs: vec![], column_type: Some(DataType::Int32.to_protobuf()), type_name: String::new(), @@ -220,7 +220,7 @@ mod tests { column_desc: ColumnDesc { data_type: DataType::Int32, column_id: ColumnId::new(0), - name: ROWID_NAME.to_string(), + name: gen_row_id_column_name(None).to_string(), field_descs: vec![], type_name: String::new() }, @@ -258,7 +258,7 @@ mod tests { column_desc: ColumnDesc { data_type: DataType::Int32, column_id: ColumnId::new(0), - name: ROWID_NAME.to_string(), + name: gen_row_id_column_name(None).to_string(), field_descs: vec![], type_name: String::new() }, diff --git a/rust/frontend/src/handler/create_source.rs b/rust/frontend/src/handler/create_source.rs index b56e2c3ca13b7..b33f914c45f11 100644 --- a/rust/frontend/src/handler/create_source.rs +++ b/rust/frontend/src/handler/create_source.rs @@ -26,7 +26,7 @@ use risingwave_source::ProtobufParser; use risingwave_sqlparser::ast::{CreateSourceStatement, ProtobufSchema, SourceSchema}; use crate::binder::expr::bind_data_type; -use crate::handler::create_table::ROWID_NAME; +use crate::catalog::gen_row_id_column_name; use crate::session::OptimizerContext; fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result> { @@ -55,7 +55,7 @@ pub(super) async fn handle_create_source( let mut column_catalogs = vec![ColumnCatalog { column_desc: Some(ColumnDesc { column_id: 0, - name: ROWID_NAME.to_string(), + name: gen_row_id_column_name(None).to_string(), column_type: Some(DataType::Int32.to_protobuf()), field_descs: vec![], type_name: "".to_string(), @@ -142,6 +142,7 @@ mod tests { use super::*; use crate::catalog::column_catalog::ColumnCatalog; + use crate::catalog::gen_row_id_column_name; use crate::test_utils::LocalFrontend; /// Returns the file. @@ -221,7 +222,7 @@ mod tests { fields: vec![DataType::Varchar, DataType::Varchar].into(), }; let expected_columns = maplit::hashmap! { - ROWID_NAME => DataType::Int32, + gen_row_id_column_name(None) => DataType::Int32, "id" => DataType::Int32, "country.zipcode" => DataType::Varchar, "zipcode" => DataType::Int64, diff --git a/rust/frontend/src/handler/create_table.rs b/rust/frontend/src/handler/create_table.rs index 196246fc7bb01..2f083e6434dbb 100644 --- a/rust/frontend/src/handler/create_table.rs +++ b/rust/frontend/src/handler/create_table.rs @@ -25,13 +25,12 @@ use risingwave_sqlparser::ast::{ColumnDef, ObjectName}; use crate::binder::expr::bind_data_type; use crate::binder::Binder; +use crate::catalog::gen_row_id_column_name; use crate::optimizer::plan_node::StreamSource; use crate::optimizer::property::{Distribution, Order}; use crate::optimizer::{PlanRef, PlanRoot}; use crate::session::{OptimizerContext, OptimizerContextRef, SessionImpl}; -pub const ROWID_NAME: &str = "_row_id"; - pub fn gen_create_table_plan( session: &SessionImpl, context: OptimizerContextRef, @@ -51,7 +50,7 @@ pub fn gen_create_table_plan( column_descs.push(ColumnDesc { data_type: DataType::Int64, column_id: ColumnId::new(0), - name: ROWID_NAME.to_string(), + name: gen_row_id_column_name(None).to_string(), field_descs: vec![], type_name: "".to_string(), }); @@ -155,6 +154,7 @@ mod tests { use risingwave_common::types::DataType; use super::*; + use crate::catalog::gen_row_id_column_name; use crate::test_utils::LocalFrontend; #[tokio::test] @@ -189,7 +189,7 @@ mod tests { .collect::>(); let expected_columns = maplit::hashmap! { - ROWID_NAME => DataType::Int64, + gen_row_id_column_name(None) => DataType::Int64, "v1" => DataType::Int16, "v2" => DataType::Int32, "v3" => DataType::Int64, From dbc2295b253903c6a65eff93149db24cecc9c0f0 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 18:27:19 +0800 Subject: [PATCH 04/13] refactor row id --- rust/frontend/src/handler/create_source.rs | 3 ++- rust/frontend/src/handler/create_table.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/frontend/src/handler/create_source.rs b/rust/frontend/src/handler/create_source.rs index b33f914c45f11..af98332ebf695 100644 --- a/rust/frontend/src/handler/create_source.rs +++ b/rust/frontend/src/handler/create_source.rs @@ -221,8 +221,9 @@ mod tests { let city_type = DataType::Struct { fields: vec![DataType::Varchar, DataType::Varchar].into(), }; + let row_id_col_name = gen_row_id_column_name(None); let expected_columns = maplit::hashmap! { - gen_row_id_column_name(None) => DataType::Int32, + row_id_col_name.as_str() => DataType::Int32, "id" => DataType::Int32, "country.zipcode" => DataType::Varchar, "zipcode" => DataType::Int64, diff --git a/rust/frontend/src/handler/create_table.rs b/rust/frontend/src/handler/create_table.rs index 2f083e6434dbb..88ae19552c2e6 100644 --- a/rust/frontend/src/handler/create_table.rs +++ b/rust/frontend/src/handler/create_table.rs @@ -188,8 +188,9 @@ mod tests { .map(|col| (col.name(), col.data_type().clone())) .collect::>(); + let row_id_col_name = gen_row_id_column_name(None); let expected_columns = maplit::hashmap! { - gen_row_id_column_name(None) => DataType::Int64, + row_id_col_name.as_str() => DataType::Int64, "v1" => DataType::Int16, "v2" => DataType::Int32, "v3" => DataType::Int64, From 183fda4c561c776fa6113142cbe324e0256634a1 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 20:06:35 +0800 Subject: [PATCH 05/13] Materialize derive schema --- rust/frontend/src/catalog/mod.rs | 5 ++ .../optimizer/plan_node/stream_materialize.rs | 53 +++++++++++++++---- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/rust/frontend/src/catalog/mod.rs b/rust/frontend/src/catalog/mod.rs index 12b9dfd44b7cb..2454f881ea6e9 100644 --- a/rust/frontend/src/catalog/mod.rs +++ b/rust/frontend/src/catalog/mod.rs @@ -38,6 +38,11 @@ pub fn gen_row_id_column_name(idx: Option) -> String { None => ROWID_PREFIX.to_string(), } } + +pub fn is_row_id_column_name(name: &str) -> bool { + name.starts_with(ROWID_PREFIX) +} + #[derive(Error, Debug)] pub enum CatalogError { #[error("{0} not found: {1}")] diff --git a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs index 1d2ecf284bd66..591e7454e4962 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::fmt; use fixedbitset::FixedBitSet; use itertools::Itertools; -use risingwave_common::catalog::{ColumnDesc, OrderedColumnDesc, Schema, TableId}; +use risingwave_common::catalog::{ColumnDesc, Field, OrderedColumnDesc, Schema, TableId}; +use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::Result; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::expr::InputRefExpr; @@ -26,7 +28,7 @@ use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use super::{PlanRef, PlanTreeNodeUnary, ToStreamProst}; use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::table_catalog::TableCatalog; -use crate::catalog::ColumnId; +use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ColumnId}; use crate::optimizer::plan_node::{PlanBase, PlanNode}; use crate::optimizer::property::{Order, WithSchema}; @@ -40,22 +42,55 @@ pub struct StreamMaterialize { } impl StreamMaterialize { - fn derive_plan_base(input: &PlanRef) -> PlanBase { + fn derive_plan_base(input: &PlanRef) -> Result { let ctx = input.ctx(); - let schema = input.schema(); + + let schema = Self::derive_schema(input.schema())?; let pk_indices = input.pk_indices(); - PlanBase::new_stream( + Ok(PlanBase::new_stream( ctx, - schema.clone(), + schema, pk_indices.to_vec(), input.distribution().clone(), input.append_only(), - ) + )) + } + + fn derive_schema(schema: &Schema) -> Result { + let mut col_names = HashSet::new(); + for field in schema.fields() { + if is_row_id_column_name(&field.name) { + continue; + } + if !col_names.insert(field.name.clone()) { + return Err(InternalError( + format!("column {} specified more than once", field.name).to_string(), + ) + .into()); + } + } + let mut row_id_count = 0; + let fields = schema + .fields() + .iter() + .map(|field| match is_row_id_column_name(&field.name) { + true => { + let field = Field { + data_type: field.data_type.clone(), + name: gen_row_id_column_name(Some(row_id_count)), + }; + row_id_count += 1; + field + } + false => field.clone(), + }) + .collect(); + Ok(Schema { fields }) } #[must_use] pub fn new(input: PlanRef, table: TableCatalog) -> Self { - let base = Self::derive_plan_base(&input); + let base = Self::derive_plan_base(&input).unwrap(); Self { base, input, table } } @@ -66,7 +101,7 @@ impl StreamMaterialize { user_order_by: Order, user_cols: FixedBitSet, ) -> Result { - let base = Self::derive_plan_base(&input); + let base = Self::derive_plan_base(&input)?; let schema = &base.schema; let pk_indices = &base.pk_indices; // Materialize executor won't change the append-only behavior of the stream, so it depends From 37b7a6464598f08ee35be1fd6aff00b634097cae Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 20:09:33 +0800 Subject: [PATCH 06/13] refactor gen_row_id_column_name --- rust/frontend/src/catalog/mod.rs | 7 ++----- rust/frontend/src/catalog/table_catalog.rs | 6 +++--- rust/frontend/src/handler/create_source.rs | 4 ++-- rust/frontend/src/handler/create_table.rs | 4 ++-- .../frontend/src/optimizer/plan_node/stream_materialize.rs | 2 +- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/rust/frontend/src/catalog/mod.rs b/rust/frontend/src/catalog/mod.rs index 2454f881ea6e9..90b884b63e105 100644 --- a/rust/frontend/src/catalog/mod.rs +++ b/rust/frontend/src/catalog/mod.rs @@ -32,11 +32,8 @@ pub(crate) type ColumnId = risingwave_common::catalog::ColumnId; pub const ROWID_PREFIX: &str = "_row_id"; -pub fn gen_row_id_column_name(idx: Option) -> String { - match idx { - Some(idx) => ROWID_PREFIX.to_string() + "#" + &idx.to_string(), - None => ROWID_PREFIX.to_string(), - } +pub fn gen_row_id_column_name(idx: usize) -> String { + ROWID_PREFIX.to_string() + "#" + &idx.to_string() } pub fn is_row_id_column_name(name: &str) -> bool { diff --git a/rust/frontend/src/catalog/table_catalog.rs b/rust/frontend/src/catalog/table_catalog.rs index 351edbd1e661b..89b8f59d82ba0 100644 --- a/rust/frontend/src/catalog/table_catalog.rs +++ b/rust/frontend/src/catalog/table_catalog.rs @@ -173,7 +173,7 @@ mod tests { ProstColumnCatalog { column_desc: Some(ProstColumnDesc { column_id: 0, - name: gen_row_id_column_name(None).to_string(), + name: gen_row_id_column_name(0).to_string(), field_descs: vec![], column_type: Some(DataType::Int32.to_protobuf()), type_name: String::new(), @@ -220,7 +220,7 @@ mod tests { column_desc: ColumnDesc { data_type: DataType::Int32, column_id: ColumnId::new(0), - name: gen_row_id_column_name(None).to_string(), + name: gen_row_id_column_name(0).to_string(), field_descs: vec![], type_name: String::new() }, @@ -258,7 +258,7 @@ mod tests { column_desc: ColumnDesc { data_type: DataType::Int32, column_id: ColumnId::new(0), - name: gen_row_id_column_name(None).to_string(), + name: gen_row_id_column_name(0).to_string(), field_descs: vec![], type_name: String::new() }, diff --git a/rust/frontend/src/handler/create_source.rs b/rust/frontend/src/handler/create_source.rs index af98332ebf695..09d72df2f571d 100644 --- a/rust/frontend/src/handler/create_source.rs +++ b/rust/frontend/src/handler/create_source.rs @@ -55,7 +55,7 @@ pub(super) async fn handle_create_source( let mut column_catalogs = vec![ColumnCatalog { column_desc: Some(ColumnDesc { column_id: 0, - name: gen_row_id_column_name(None).to_string(), + name: gen_row_id_column_name(0).to_string(), column_type: Some(DataType::Int32.to_protobuf()), field_descs: vec![], type_name: "".to_string(), @@ -221,7 +221,7 @@ mod tests { let city_type = DataType::Struct { fields: vec![DataType::Varchar, DataType::Varchar].into(), }; - let row_id_col_name = gen_row_id_column_name(None); + let row_id_col_name = gen_row_id_column_name(0); let expected_columns = maplit::hashmap! { row_id_col_name.as_str() => DataType::Int32, "id" => DataType::Int32, diff --git a/rust/frontend/src/handler/create_table.rs b/rust/frontend/src/handler/create_table.rs index 88ae19552c2e6..a85e78d01a2b0 100644 --- a/rust/frontend/src/handler/create_table.rs +++ b/rust/frontend/src/handler/create_table.rs @@ -50,7 +50,7 @@ pub fn gen_create_table_plan( column_descs.push(ColumnDesc { data_type: DataType::Int64, column_id: ColumnId::new(0), - name: gen_row_id_column_name(None).to_string(), + name: gen_row_id_column_name(0).to_string(), field_descs: vec![], type_name: "".to_string(), }); @@ -188,7 +188,7 @@ mod tests { .map(|col| (col.name(), col.data_type().clone())) .collect::>(); - let row_id_col_name = gen_row_id_column_name(None); + let row_id_col_name = gen_row_id_column_name(0); let expected_columns = maplit::hashmap! { row_id_col_name.as_str() => DataType::Int64, "v1" => DataType::Int16, diff --git a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs index 591e7454e4962..db51b30df916a 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -78,7 +78,7 @@ impl StreamMaterialize { true => { let field = Field { data_type: field.data_type.clone(), - name: gen_row_id_column_name(Some(row_id_count)), + name: gen_row_id_column_name(row_id_count), }; row_id_count += 1; field From 1274b1f2c819cace31d231ab320be059c112cb05 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 20:35:04 +0800 Subject: [PATCH 07/13] project schema(name) derive --- .../src/optimizer/plan_node/logical_project.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/rust/frontend/src/optimizer/plan_node/logical_project.rs b/rust/frontend/src/optimizer/plan_node/logical_project.rs index 756e71a140f8f..1c0bd3dd67202 100644 --- a/rust/frontend/src/optimizer/plan_node/logical_project.rs +++ b/rust/frontend/src/optimizer/plan_node/logical_project.rs @@ -41,7 +41,7 @@ pub struct LogicalProject { impl LogicalProject { pub fn new(input: PlanRef, exprs: Vec, expr_alias: Vec>) -> Self { let ctx = input.ctx(); - let schema = Self::derive_schema(&exprs, &expr_alias); + let schema = Self::derive_schema(&exprs, &expr_alias, input.schema()); let pk_indices = Self::derive_pk(input.schema(), input.pk_indices(), &exprs); for expr in &exprs { assert_input_ref!(expr, input.schema().fields().len()); @@ -126,13 +126,21 @@ impl LogicalProject { LogicalProject::new(input, exprs, alias).into() } - fn derive_schema(exprs: &[ExprImpl], expr_alias: &[Option]) -> Schema { + fn derive_schema( + exprs: &[ExprImpl], + expr_alias: &[Option], + input_schema: &Schema, + ) -> Schema { + let o2i = Self::o2i_col_mapping_inner(input_schema.len(), exprs); let fields = exprs .iter() .zip_eq(expr_alias.iter()) .enumerate() .map(|(id, (expr, alias))| { - let name = alias.clone().unwrap_or(format!("expr#{}", id)); + let name = alias.clone().unwrap_or(match o2i.try_map(id) { + Some(input_idx) => input_schema.fields()[input_idx].name.clone(), + None => format!("expr#{}", id), + }); Field { name, data_type: expr.return_type(), From 00b8893a093a9ceaa7a03703da0fcfe7bb9650cf Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 20:35:12 +0800 Subject: [PATCH 08/13] fix ut --- .../tests/testdata/basic_query_1.yaml | 24 +++---- .../tests/testdata/basic_query_2.yaml | 68 ++++++++++--------- .../tests/testdata/column_pruning.yaml | 24 +++---- .../test_runner/tests/testdata/mv_on_mv.yaml | 6 +- .../test_runner/tests/testdata/order_by.yaml | 4 +- .../tests/testdata/predicate_pushdown.yaml | 24 +++---- .../tests/testdata/stream_proto.yaml | 30 ++++---- .../test_runner/tests/testdata/subquery.yaml | 26 +++---- .../tests/testdata/subquery_expr.yaml | 34 +++++----- .../test_runner/tests/testdata/tpch.yaml | 12 ++-- 10 files changed, 127 insertions(+), 125 deletions(-) diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml index f095692be6e4a..f507e0a7557f2 100644 --- a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml +++ b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml @@ -8,10 +8,10 @@ select * from t; batch_plan: | BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [_row_id, v1, v2] } + BatchScan { table: t, columns: [_row_id#0, v1, v2] } stream_plan: | - StreamMaterialize { columns: [_row_id, v1, v2], pk_columns: [_row_id] } - StreamTableScan { table: t, columns: [_row_id, v1, v2], pk_indices: [0] } + StreamMaterialize { columns: [_row_id#0, v1, v2], pk_columns: [_row_id#0] } + StreamTableScan { table: t, columns: [_row_id#0, v1, v2], pk_indices: [0] } - sql: | create table t (v1 bigint, v2 double precision); select t2.* from t; @@ -22,22 +22,22 @@ batch_plan: | BatchExchange { order: [], dist: Single } BatchFilter { predicate: (((((1:Int32 > 2:Int32) AND (1:Int32 = 1:Int32)) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR (((1:Int32 = 1:Int32) AND (2:Int32 >= 1:Int32)) AND (1:Int32 <= 2:Int32))) } - BatchScan { table: t, columns: [_row_id] } + BatchScan { table: t, columns: [_row_id#0] } stream_plan: | - StreamMaterialize { columns: [_row_id], pk_columns: [_row_id] } + StreamMaterialize { columns: [_row_id#0], pk_columns: [_row_id#0] } StreamFilter { predicate: (((((1:Int32 > 2:Int32) AND (1:Int32 = 1:Int32)) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR (((1:Int32 = 1:Int32) AND (2:Int32 >= 1:Int32)) AND (1:Int32 <= 2:Int32))) } - StreamTableScan { table: t, columns: [_row_id], pk_indices: [0] } + StreamTableScan { table: t, columns: [_row_id#0], pk_indices: [0] } - sql: | create table t (v1 int); select * from t where v1<1; batch_plan: | BatchExchange { order: [], dist: Single } BatchFilter { predicate: ($1 < 1:Int32) } - BatchScan { table: t, columns: [_row_id, v1] } + BatchScan { table: t, columns: [_row_id#0, v1] } stream_plan: | - StreamMaterialize { columns: [_row_id, v1], pk_columns: [_row_id] } + StreamMaterialize { columns: [_row_id#0, v1], pk_columns: [_row_id#0] } StreamFilter { predicate: ($1 < 1:Int32) } - StreamTableScan { table: t, columns: [_row_id, v1], pk_indices: [0] } + StreamTableScan { table: t, columns: [_row_id#0, v1], pk_indices: [0] } - sql: | create table t (); select (((((false is not true) is true) is not false) is false) is not null) is null from t; @@ -53,9 +53,9 @@ BatchProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal)], expr_alias: [ ] } BatchScan { table: t, columns: [v1] } stream_plan: | - StreamMaterialize { columns: [expr#0, expr#1(hidden)], pk_columns: [expr#1] } + StreamMaterialize { columns: [expr#0, _row_id#0(hidden)], pk_columns: [_row_id#0] } StreamProject { exprs: [Case(($0 = 1:Int32), 1:Int32::Decimal, ($0 = 2:Int32), 2:Int32::Decimal, Normalized(0.0):Decimal), $1], expr_alias: [ , ] } - StreamTableScan { table: t, columns: [v1, _row_id], pk_indices: [1] } + StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } - sql: | select length(trim(trailing '1' from '12'))+length(trim(leading '2' from '23'))+length(trim(both '3' from '34')); batch_plan: | @@ -89,4 +89,4 @@ batch_plan: | BatchDelete { table: t } BatchFilter { predicate: ($1 = 1:Int32) } - BatchScan { table: t, columns: [_row_id, v1, v2] } + BatchScan { table: t, columns: [_row_id#0, v1, v2] } diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml index dbfe1f4d25570..7e4a909a59a2c 100644 --- a/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml +++ b/rust/frontend/test_runner/tests/testdata/basic_query_2.yaml @@ -3,7 +3,7 @@ delete from t; batch_plan: | BatchDelete { table: t } - BatchScan { table: t, columns: [_row_id, v1, v2] } + BatchScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | create table t (v1 int, v2 int); select v1 from t; @@ -11,8 +11,8 @@ BatchExchange { order: [], dist: Single } BatchScan { table: t, columns: [v1] } stream_plan: | - StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id] } - StreamTableScan { table: t, columns: [v1, _row_id], pk_indices: [1] } + StreamMaterialize { columns: [v1, _row_id#0(hidden)], pk_columns: [_row_id#0] } + StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } - sql: | values(cast(1 as bigint)); batch_plan: | @@ -44,49 +44,51 @@ create table t1 (v1 int, v2 int); create table t2 (v1 int, v2 int); create table t3 (v1 int, v2 int); - select * from t1 join t2 on (t1.v1 = t2.v1) join t3 on (t2.v2 = t3.v2); + select t1.v1 as t1_v1, t1.v2 as t1_v2, t2.v1 as t2_v1, t2.v2 as t2_v2, t3.v1 as t3_v1, t3.v2 as t3_v2 from t1 join t2 on (t1.v1 = t2.v1) join t3 on (t2.v2 = t3.v2); batch_plan: | BatchExchange { order: [], dist: Single } - BatchHashJoin { type: Inner, predicate: $5 = $8 } - BatchExchange { order: [], dist: HashShard([5]) } - BatchHashJoin { type: Inner, predicate: $1 = $4 } - BatchExchange { order: [], dist: HashShard([1]) } - BatchScan { table: t1, columns: [_row_id, v1, v2] } - BatchExchange { order: [], dist: HashShard([1]) } - BatchScan { table: t2, columns: [_row_id, v1, v2] } - BatchExchange { order: [], dist: HashShard([2]) } - BatchScan { table: t3, columns: [_row_id, v1, v2] } + BatchProject { exprs: [$0, $1, $2, $3, $4, $5], expr_alias: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2] } + BatchHashJoin { type: Inner, predicate: $3 = $5 } + BatchExchange { order: [], dist: HashShard([3]) } + BatchHashJoin { type: Inner, predicate: $0 = $2 } + BatchExchange { order: [], dist: HashShard([0]) } + BatchScan { table: t1, columns: [v1, v2] } + BatchExchange { order: [], dist: HashShard([0]) } + BatchScan { table: t2, columns: [v1, v2] } + BatchExchange { order: [], dist: HashShard([1]) } + BatchScan { table: t3, columns: [v1, v2] } stream_plan: | - StreamMaterialize { columns: [_row_id, v1, v2, _row_id, v1, v2, _row_id, v1, v2], pk_columns: [_row_id, _row_id, _row_id] } - StreamHashJoin { type: Inner, predicate: $5 = $8 } - StreamExchange { dist: HashShard([5]) } - StreamHashJoin { type: Inner, predicate: $1 = $4 } - StreamExchange { dist: HashShard([1]) } - StreamTableScan { table: t1, columns: [_row_id, v1, v2], pk_indices: [0] } - StreamExchange { dist: HashShard([1]) } - StreamTableScan { table: t2, columns: [_row_id, v1, v2], pk_indices: [0] } - StreamExchange { dist: HashShard([2]) } - StreamTableScan { table: t3, columns: [_row_id, v1, v2], pk_indices: [0] } + StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [t2_v1, _row_id#0, _row_id#1] } + StreamProject { exprs: [$0, $1, $2, $3, $6, $7, $5, $8], expr_alias: [t1_v1, t1_v2, t2_v1, t2_v2, t3_v1, t3_v2, , ] } + StreamHashJoin { type: Inner, predicate: $4 = $7 } + StreamExchange { dist: HashShard([4]) } + StreamHashJoin { type: Inner, predicate: $0 = $3 } + StreamExchange { dist: HashShard([0]) } + StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] } + StreamExchange { dist: HashShard([0]) } + StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] } + StreamExchange { dist: HashShard([1]) } + StreamTableScan { table: t3, columns: [v1, v2, _row_id#0], pk_indices: [2] } - sql: | create table t1 (v1 int not null, v2 int not null); create table t2 (v1 int not null, v2 int not null); - select t1.v2, t2.v2 from t1 join t2 on t1.v1 = t2.v1; + select t1.v2 as t1_v2, t2.v2 as t2_v2 from t1 join t2 on t1.v1 = t2.v1; batch_plan: | BatchExchange { order: [], dist: Single } - BatchProject { exprs: [$1, $3], expr_alias: [v2, v2] } + BatchProject { exprs: [$1, $3], expr_alias: [t1_v2, t2_v2] } BatchHashJoin { type: Inner, predicate: $0 = $2 } BatchExchange { order: [], dist: HashShard([0]) } BatchScan { table: t1, columns: [v1, v2] } BatchExchange { order: [], dist: HashShard([0]) } BatchScan { table: t2, columns: [v1, v2] } stream_plan: | - StreamMaterialize { columns: [v2, v2, expr#2(hidden), expr#3(hidden)], pk_columns: [expr#2, expr#3] } - StreamProject { exprs: [$1, $4, $2, $5], expr_alias: [v2, v2, , ] } + StreamMaterialize { columns: [t1_v2, t2_v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] } + StreamProject { exprs: [$1, $4, $2, $5], expr_alias: [t1_v2, t2_v2, , ] } StreamHashJoin { type: Inner, predicate: $0 = $3 } StreamExchange { dist: HashShard([0]) } - StreamTableScan { table: t1, columns: [v1, v2, _row_id], pk_indices: [2] } + StreamTableScan { table: t1, columns: [v1, v2, _row_id#0], pk_indices: [2] } StreamExchange { dist: HashShard([0]) } - StreamTableScan { table: t2, columns: [v1, v2, _row_id], pk_indices: [2] } + StreamTableScan { table: t2, columns: [v1, v2, _row_id#0], pk_indices: [2] } - sql: select 1 batch_plan: | BatchProject { exprs: [1:Int32], expr_alias: [ ] } @@ -109,7 +111,7 @@ StreamExchange { dist: HashShard([0]) } StreamProject { exprs: [$0, $1, $2, $0, $3], expr_alias: [ , , , , ] } StreamExchange { dist: HashShard([0]) } - StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] } + StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] } - sql: | create table t(v1 int, v2 int, v3 int); select min(v1) + max(v2) * count(v3) from t; @@ -119,10 +121,10 @@ BatchExchange { order: [], dist: Single } BatchScan { table: t, columns: [v1, v2, v3] } stream_plan: | - StreamMaterialize { columns: [expr#0, expr#1(hidden), expr#2(hidden), expr#3(hidden), expr#4(hidden)], pk_columns: [expr#1, expr#2, expr#3, expr#4] } + StreamMaterialize { columns: [expr#0, agg#0(hidden), agg#1(hidden), agg#2(hidden), agg#3(hidden)], pk_columns: [agg#0, agg#1, agg#2, agg#3] } StreamProject { exprs: [($1 + ($2 * $3)), $0, $1, $2, $3], expr_alias: [ , , , , ] } StreamSimpleAgg { aggs: [count, min($0), max($1), count($2)] } - StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] } + StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] } - sql: | create table t(v1 int, v2 int); select v1 from t group by v2; @@ -143,4 +145,4 @@ StreamHashAgg { group_keys: [$0], aggs: [count, min($1), sum($2), count($2)] } StreamProject { exprs: [$2, $0, ($0 + $1), $3], expr_alias: [ , , , ] } StreamExchange { dist: HashShard([2]) } - StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] } + StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] } diff --git a/rust/frontend/test_runner/tests/testdata/column_pruning.yaml b/rust/frontend/test_runner/tests/testdata/column_pruning.yaml index 6118228746e5c..825644b83788f 100644 --- a/rust/frontend/test_runner/tests/testdata/column_pruning.yaml +++ b/rust/frontend/test_runner/tests/testdata/column_pruning.yaml @@ -3,7 +3,7 @@ select v1 from t logical_plan: | LogicalProject { exprs: [$1], expr_alias: [v1] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | LogicalScan { table: t, columns: [v1] } - sql: | @@ -13,7 +13,7 @@ logical_plan: | LogicalProject { exprs: [$1], expr_alias: [v1] } LogicalFilter { predicate: ($2 > 2:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalProject { exprs: [$0], expr_alias: [v1] } LogicalFilter { predicate: ($1 > 2:Int32) } @@ -26,8 +26,8 @@ logical_plan: | LogicalProject { exprs: [$1, $5], expr_alias: [v1, v1] } LogicalJoin { type: Inner, on: ($2 = $6) } - LogicalScan { table: t1, columns: [_row_id, v1, v2, v3] } - LogicalScan { table: t2, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t1, columns: [_row_id#0, v1, v2, v3] } + LogicalScan { table: t2, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalProject { exprs: [$0, $2], expr_alias: [v1, v1] } LogicalJoin { type: Inner, on: ($1 = $3) } @@ -42,7 +42,7 @@ LogicalAgg { group_keys: [], agg_calls: [count($0)] } LogicalProject { exprs: [$1], expr_alias: [ ] } LogicalFilter { predicate: ($2 > 2:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalAgg { group_keys: [], agg_calls: [count($0)] } LogicalProject { exprs: [$0], expr_alias: [ ] } @@ -54,7 +54,7 @@ select 1 from t logical_plan: | LogicalProject { exprs: [1:Int32], expr_alias: [ ] } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalProject { exprs: [1:Int32], expr_alias: [ ] } LogicalScan { table: t, columns: [] } @@ -65,7 +65,7 @@ logical_plan: | LogicalProject { exprs: [1:Int32], expr_alias: [ ] } LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalProject { exprs: [1:Int32], expr_alias: [ ] } LogicalFilter { predicate: ($0 > 1:Int32) } @@ -78,7 +78,7 @@ LogicalProject { exprs: [$0], expr_alias: [ ] } LogicalAgg { group_keys: [], agg_calls: [count($0)] } LogicalProject { exprs: [1:Int32], expr_alias: [ ] } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalAgg { group_keys: [], agg_calls: [count($0)] } LogicalProject { exprs: [1:Int32], expr_alias: [ ] } @@ -92,7 +92,7 @@ LogicalAgg { group_keys: [], agg_calls: [count($0)] } LogicalProject { exprs: [1:Int32], expr_alias: [ ] } LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalAgg { group_keys: [], agg_calls: [count($0)] } LogicalProject { exprs: [1:Int32], expr_alias: [ ] } @@ -107,8 +107,8 @@ LogicalProject { exprs: [$1, $5], expr_alias: [v1, v1] } LogicalFilter { predicate: ($3 < 1:Int32) } LogicalJoin { type: Inner, on: ($2 = $6) } - LogicalScan { table: t1, columns: [_row_id, v1, v2, v3] } - LogicalScan { table: t2, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t1, columns: [_row_id#0, v1, v2, v3] } + LogicalScan { table: t2, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalProject { exprs: [$0, $2], expr_alias: [v1, v1] } LogicalJoin { type: Inner, on: ($1 = $3) } @@ -126,7 +126,7 @@ LogicalAgg { group_keys: [], agg_calls: [count($0), count($1)] } LogicalProject { exprs: [1:Int32, $1], expr_alias: [ , ] } LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalAgg { group_keys: [], agg_calls: [count($0), count($1)] } LogicalProject { exprs: [1:Int32, $0], expr_alias: [ , ] } diff --git a/rust/frontend/test_runner/tests/testdata/mv_on_mv.yaml b/rust/frontend/test_runner/tests/testdata/mv_on_mv.yaml index fb7866610c290..e1aaab260d4ba 100644 --- a/rust/frontend/test_runner/tests/testdata/mv_on_mv.yaml +++ b/rust/frontend/test_runner/tests/testdata/mv_on_mv.yaml @@ -10,10 +10,10 @@ sql: | select m1.v1 as m1v1, m1.v2 as m1v2, m2.v1 as m2v1, m2.v2 as m2v2 from m1 join m2 on m1.v1 = m2.v1; stream_plan: | - StreamMaterialize { columns: [m1v1, m1v2, m2v1, m2v2, expr#4(hidden), expr#5(hidden)], pk_columns: [expr#4, expr#5] } + StreamMaterialize { columns: [m1v1, m1v2, m2v1, m2v2, _row_id#0(hidden), _row_id#1(hidden)], pk_columns: [_row_id#0, _row_id#1] } StreamProject { exprs: [$0, $1, $3, $4, $2, $5], expr_alias: [m1v1, m1v2, m2v1, m2v2, , ] } StreamHashJoin { type: Inner, predicate: $0 = $3 } StreamExchange { dist: HashShard([0]) } - StreamTableScan { table: m1, columns: [v1, v2, _row_id], pk_indices: [2] } + StreamTableScan { table: m1, columns: [v1, v2, _row_id#0], pk_indices: [2] } StreamExchange { dist: HashShard([0]) } - StreamTableScan { table: m2, columns: [v1, v2, _row_id], pk_indices: [2] } + StreamTableScan { table: m2, columns: [v1, v2, _row_id#0], pk_indices: [2] } diff --git a/rust/frontend/test_runner/tests/testdata/order_by.yaml b/rust/frontend/test_runner/tests/testdata/order_by.yaml index 2f248cc4f949b..b6c7d1086b1e0 100644 --- a/rust/frontend/test_runner/tests/testdata/order_by.yaml +++ b/rust/frontend/test_runner/tests/testdata/order_by.yaml @@ -4,14 +4,14 @@ batch_plan: | BatchExchange { order: [$1 DESC], dist: Single } BatchSort { order: [$1 DESC] } - BatchScan { table: t, columns: [_row_id, v1, v2] } + BatchScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | create table t (v1 bigint, v2 double precision); select t.* from t order by v1; batch_plan: | BatchExchange { order: [$1 ASC], dist: Single } BatchSort { order: [$1 ASC] } - BatchScan { table: t, columns: [_row_id, v1, v2] } + BatchScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | create table t (v1 bigint, v2 double precision); select v1, v1+1 from t order by v1; diff --git a/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml b/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml index 326c12f09b019..d28a27a734504 100644 --- a/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml +++ b/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml @@ -3,28 +3,28 @@ create table t2 (v1 int, v2 int, v3 int); select * from t1 join t2 on t1.v1=t2.v2 and t1.v1>1 where t2.v2>2; logical_plan: | - LogicalProject { exprs: [$0, $1, $2, $3, $4, $5, $6, $7], expr_alias: [_row_id, v1, v2, v3, _row_id, v1, v2, v3] } + LogicalProject { exprs: [$0, $1, $2, $3, $4, $5, $6, $7], expr_alias: [_row_id#0, v1, v2, v3, _row_id#0, v1, v2, v3] } LogicalFilter { predicate: ($6 > 2:Int32) } LogicalJoin { type: Inner, on: ($1 = $6) AND ($1 > 1:Int32) } - LogicalScan { table: t1, columns: [_row_id, v1, v2, v3] } - LogicalScan { table: t2, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t1, columns: [_row_id#0, v1, v2, v3] } + LogicalScan { table: t2, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | LogicalJoin { type: Inner, on: ($1 = $6) } LogicalFilter { predicate: ($1 > 1:Int32) } - LogicalScan { table: t1, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t1, columns: [_row_id#0, v1, v2, v3] } LogicalFilter { predicate: ($2 > 2:Int32) } - LogicalScan { table: t2, columns: [_row_id, v1, v2, v3] } + LogicalScan { table: t2, columns: [_row_id#0, v1, v2, v3] } - sql: | create table t (v1 bigint, v2 double precision); select * from (select * from t) where v2 > 1; logical_plan: | - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] } + LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | create table t (v1 bigint, v2 double precision); select v1 from (select v2, v1 from t) where v2 > 1; @@ -32,7 +32,7 @@ LogicalProject { exprs: [$1], expr_alias: [v1] } LogicalFilter { predicate: ($0 > 1:Int32) } LogicalProject { exprs: [$2, $1], expr_alias: [v2, v1] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | LogicalProject { exprs: [$0], expr_alias: [v1] } LogicalFilter { predicate: ($1 > 1:Int32) } @@ -45,7 +45,7 @@ LogicalFilter { predicate: ($0 > 1:Int32) } LogicalProject { exprs: [$2, $1], expr_alias: [a2, v1] } LogicalFilter { predicate: ($1 > 2:Int32) } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | LogicalProject { exprs: [$0], expr_alias: [v1] } LogicalFilter { predicate: ($1 > 1:Int32) } @@ -60,7 +60,7 @@ LogicalProject { exprs: [$0, $1], expr_alias: [v1, min] } LogicalAgg { group_keys: [0], agg_calls: [min($1)] } LogicalProject { exprs: [$1, $2], expr_alias: [ , ] } - LogicalScan { table: t, columns: [_row_id, v1, v2, v3, v4] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3, v4] } optimized_logical_plan: | LogicalProject { exprs: [$0, $1], expr_alias: [v1, min] } LogicalFilter { predicate: ($1 > 1:Int32) AND ($0 > $1) } diff --git a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml index 5b38e1a0cb465..5d4416cb74769 100644 --- a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml +++ b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml @@ -17,7 +17,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - columnType: typeName: INT32 isNullable: true @@ -32,7 +32,7 @@ - dataType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - dataType: typeName: INT32 isNullable: true @@ -82,7 +82,7 @@ columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - columnDesc: columnType: typeName: INT32 @@ -111,7 +111,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - columnType: typeName: INT32 isNullable: true @@ -126,7 +126,7 @@ - dataType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - dataType: typeName: INT32 isNullable: true @@ -154,7 +154,7 @@ columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - columnDesc: columnType: typeName: INT32 @@ -188,7 +188,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" pkIndices: - 1 chainNode: @@ -198,7 +198,7 @@ - dataType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - dataType: typeName: INT32 isNullable: true @@ -237,7 +237,7 @@ typeName: INT64 isNullable: true columnId: 1 - name: _row_id + name: "_row_id#0" isHidden: true pkColumnIds: - 1 @@ -267,7 +267,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" pkIndices: - 1 chainNode: @@ -277,7 +277,7 @@ - dataType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - dataType: typeName: INT32 isNullable: true @@ -376,7 +376,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" pkIndices: - 2 chainNode: @@ -386,7 +386,7 @@ - dataType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" - dataType: typeName: INT32 isNullable: true @@ -414,7 +414,7 @@ - dataType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" strategy: type: HASH columnIndices: @@ -502,7 +502,7 @@ typeName: INT32 isNullable: true columnId: 1 - name: "expr#1" + name: v2 isHidden: true pkColumnIds: - 1 diff --git a/rust/frontend/test_runner/tests/testdata/subquery.yaml b/rust/frontend/test_runner/tests/testdata/subquery.yaml index f99d6b2dc3006..352d6dc027911 100644 --- a/rust/frontend/test_runner/tests/testdata/subquery.yaml +++ b/rust/frontend/test_runner/tests/testdata/subquery.yaml @@ -4,8 +4,8 @@ logical_plan: | LogicalProject { exprs: [$1], expr_alias: [v1] } LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | /* merge and then eliminate */ create table t (v1 bigint, v2 double precision); @@ -13,7 +13,7 @@ logical_plan: | LogicalProject { exprs: [$0, $1], expr_alias: [v1, v2] } LogicalProject { exprs: [$1, $2], expr_alias: [a1, a2] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | LogicalScan { table: t, columns: [v1, v2] } - sql: | @@ -27,7 +27,7 @@ LogicalProject { exprs: [$1], expr_alias: [v3] } LogicalFilter { predicate: ($0 > 1:Int32) } LogicalProject { exprs: [$2, $1], expr_alias: [v2, v3] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | /* consecutive projects are merged */ create table t (v1 bigint, v2 double precision); @@ -35,7 +35,7 @@ logical_plan: | LogicalProject { exprs: [$0, 2:Int32], expr_alias: [v1, ] } LogicalProject { exprs: [$1, $2, 1:Int32], expr_alias: [v1, v2, ] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | LogicalProject { exprs: [$0, 2:Int32], expr_alias: [v1, ] } LogicalScan { table: t, columns: [v1] } @@ -43,21 +43,21 @@ create table t (v1 bigint, v2 double precision); select * from (select * from t); logical_plan: | - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | /* joins */ create table t (v1 bigint, v2 double precision); select * from (select * from t), t; logical_plan: | - LogicalProject { exprs: [$0, $1, $2, $3, $4, $5], expr_alias: [_row_id, v1, v2, _row_id, v1, v2] } + LogicalProject { exprs: [$0, $1, $2, $3, $4, $5], expr_alias: [_row_id#0, v1, v2, _row_id#0, v1, v2] } LogicalJoin { type: Inner, on: true:Boolean } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id, v1, v2] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } - LogicalScan { table: t, columns: [_row_id, v1, v2] } + LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } + LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | /* table alias not supported yet */ create table t (v1 bigint, v2 double precision); diff --git a/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml b/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml index 990865e26d3a5..062de30904adc 100644 --- a/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml +++ b/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml @@ -12,18 +12,18 @@ logical_plan: | LogicalProject { exprs: [$2, 1:Int32], expr_alias: [ , ] } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [$1], expr_alias: [x] } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | create table t(x int); select (select x from t) + 1 from t; logical_plan: | LogicalProject { exprs: [($2 + 1:Int32)], expr_alias: [ ] } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [$1], expr_alias: [x] } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | create table t(x int); select (select x from t), (select 1); @@ -33,7 +33,7 @@ LogicalJoin { type: LeftOuter, on: true:Boolean } LogicalValues { rows: [[]], schema: Schema { fields: [] } } LogicalProject { exprs: [$1], expr_alias: [x] } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [1:Int32], expr_alias: [ ] } LogicalValues { rows: [[]], schema: Schema { fields: [] } } - sql: | @@ -42,12 +42,12 @@ logical_plan: | LogicalProject { exprs: [($1 + $2)], expr_alias: [v3] } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [($1 + $2)], expr_alias: [v2] } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [$1], expr_alias: [v1] } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | select (select 1, 2); binder_error: 'Bind error: subquery must return only one column' @@ -61,8 +61,8 @@ LogicalValues { rows: [[]], schema: Schema { fields: [] } } LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] } LogicalAgg { group_keys: [], agg_calls: [count] } - LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, x] } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalProject { exprs: [$0, $1], expr_alias: [_row_id#0, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | create table t1(x int); create table t2(x int); @@ -71,11 +71,11 @@ LogicalProject { exprs: [$1], expr_alias: [x] } LogicalFilter { predicate: $2 } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t1, columns: [_row_id, x] } + LogicalScan { table: t1, columns: [_row_id#0, x] } LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] } LogicalAgg { group_keys: [], agg_calls: [count] } LogicalProject { exprs: [$1], expr_alias: [x] } - LogicalScan { table: t2, columns: [_row_id, x] } + LogicalScan { table: t2, columns: [_row_id#0, x] } - sql: | create table t(x int); select x from t where exists (select * from t); @@ -83,11 +83,11 @@ LogicalProject { exprs: [$1], expr_alias: [x] } LogicalFilter { predicate: $2 } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] } LogicalAgg { group_keys: [], agg_calls: [count] } - LogicalProject { exprs: [$0, $1], expr_alias: [_row_id, x] } - LogicalScan { table: t, columns: [_row_id, x] } + LogicalProject { exprs: [$0, $1], expr_alias: [_row_id#0, x] } + LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | create table t1(x int); create table t2(x int); @@ -96,6 +96,6 @@ LogicalProject { exprs: [$1], expr_alias: [x] } LogicalFilter { predicate: ($1 > $2) } LogicalJoin { type: LeftOuter, on: true:Boolean } - LogicalScan { table: t1, columns: [_row_id, x] } + LogicalScan { table: t1, columns: [_row_id#0, x] } LogicalProject { exprs: [$1], expr_alias: [x] } - LogicalScan { table: t2, columns: [_row_id, x] } + LogicalScan { table: t2, columns: [_row_id#0, x] } diff --git a/rust/frontend/test_runner/tests/testdata/tpch.yaml b/rust/frontend/test_runner/tests/testdata/tpch.yaml index 6959cf300efc9..a271c0aba72ae 100644 --- a/rust/frontend/test_runner/tests/testdata/tpch.yaml +++ b/rust/frontend/test_runner/tests/testdata/tpch.yaml @@ -80,7 +80,7 @@ StreamHashAgg { group_keys: [$0, $1], aggs: [count, sum($2), sum($3), sum($4), sum($5), sum($6), count($6), sum($7), count($7), sum($8), count($8), count] } StreamProject { exprs: [$4, $5, $0, $1, ($1 * (1:Int32 - $2)), (($1 * (1:Int32 - $2)) * (1:Int32 + $3)), $0, $1, $2, $6], expr_alias: [ , , , , , , , , , ] } StreamExchange { dist: HashShard([4, 5]) } - StreamTableScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, _row_id], pk_indices: [6] } + StreamTableScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, _row_id#0], pk_indices: [6] } - id: tpch_q3 before: - create_tables @@ -143,13 +143,13 @@ StreamProject { exprs: [$0, $2], expr_alias: [ , ] } StreamExchange { dist: HashShard([0]) } StreamFilter { predicate: ($1 = "FURNITURE":Varchar) AND true:Boolean AND true:Boolean } - StreamTableScan { table: customer, columns: [c_custkey, c_mktsegment, _row_id], pk_indices: [2] } + StreamTableScan { table: customer, columns: [c_custkey, c_mktsegment, _row_id#0], pk_indices: [2] } StreamExchange { dist: HashShard([1]) } StreamFilter { predicate: } - StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority, _row_id], pk_indices: [4] } + StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_orderdate, o_shippriority, _row_id#0], pk_indices: [4] } StreamExchange { dist: HashShard([0]) } StreamFilter { predicate: } - StreamTableScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount, _row_id], pk_indices: [3] } + StreamTableScan { table: lineitem, columns: [l_orderkey, l_extendedprice, l_discount, _row_id#0], pk_indices: [3] } - id: tpch_q6 before: - create_tables @@ -171,9 +171,9 @@ BatchFilter { predicate: ($0 < 24:Int32) } BatchScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount] } stream_plan: | - StreamMaterialize { columns: [revenue, expr#1(hidden)], pk_columns: [expr#1, revenue] } + StreamMaterialize { columns: [revenue, agg#0(hidden)], pk_columns: [agg#0, revenue] } StreamProject { exprs: [$1, $0], expr_alias: [revenue, ] } StreamSimpleAgg { aggs: [count, sum($0)] } StreamProject { exprs: [($1 * $2), $3], expr_alias: [ , ] } StreamFilter { predicate: ($0 < 24:Int32) } - StreamTableScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount, _row_id], pk_indices: [3] } + StreamTableScan { table: lineitem, columns: [l_quantity, l_extendedprice, l_discount, _row_id#0], pk_indices: [3] } From 91073b52f56abba19487c9910432279681eece4e Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 20:53:35 +0800 Subject: [PATCH 09/13] add alias_check if is row id --- rust/frontend/src/binder/query.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/rust/frontend/src/binder/query.rs b/rust/frontend/src/binder/query.rs index 3bb7c1d2b41c1..2fc3918c5198a 100644 --- a/rust/frontend/src/binder/query.rs +++ b/rust/frontend/src/binder/query.rs @@ -19,6 +19,7 @@ use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Expr, OrderByExpr, Query}; use crate::binder::{Binder, BoundSetExpr}; +use crate::catalog::{is_row_id_column_name, ROWID_PREFIX}; use crate::optimizer::property::{Direction, FieldOrder}; /// A validated sql query, including order and union. @@ -55,15 +56,33 @@ impl Binder { result } + fn check_aliases(aliases: &[Option]) -> Result<()> { + for alias in aliases { + if let Some(name) = alias { + if is_row_id_column_name(name) { + return Err(ErrorCode::InternalError(format!( + "column name prefixed with {:?} are reserved word.", + ROWID_PREFIX + )) + .into()); + } + } + } + Ok(()) + } + fn bind_query_inner(&mut self, query: Query) -> Result { let body = self.bind_set_expr(query.body)?; let mut name_to_index = HashMap::new(); match &body { - BoundSetExpr::Select(s) => s.aliases.iter().enumerate().for_each(|(index, alias)| { - if let Some(name) = alias { - name_to_index.insert(name.clone(), index); - } - }), + BoundSetExpr::Select(s) => { + Self::check_aliases(&s.aliases)?; + s.aliases.iter().enumerate().for_each(|(index, alias)| { + if let Some(name) = alias { + name_to_index.insert(name.clone(), index); + } + }) + } BoundSetExpr::Values(_) => {} }; let order = query From bd318842bbb91fb176b2be684f17cf59f44dab2b Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 20:55:58 +0800 Subject: [PATCH 10/13] add column name check if is row id --- rust/frontend/src/handler/create_table.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/rust/frontend/src/handler/create_table.rs b/rust/frontend/src/handler/create_table.rs index a85e78d01a2b0..3fc232be7b53f 100644 --- a/rust/frontend/src/handler/create_table.rs +++ b/rust/frontend/src/handler/create_table.rs @@ -16,7 +16,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; -use risingwave_common::error::Result; +use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_pb::catalog::source::Info; use risingwave_pb::catalog::{Source as ProstSource, Table as ProstTable, TableSourceInfo}; @@ -25,7 +25,7 @@ use risingwave_sqlparser::ast::{ColumnDef, ObjectName}; use crate::binder::expr::bind_data_type; use crate::binder::Binder; -use crate::catalog::gen_row_id_column_name; +use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ROWID_PREFIX}; use crate::optimizer::plan_node::StreamSource; use crate::optimizer::property::{Distribution, Order}; use crate::optimizer::{PlanRef, PlanRoot}; @@ -56,6 +56,14 @@ pub fn gen_create_table_plan( }); // Then user columns. for (i, column) in columns.into_iter().enumerate() { + if is_row_id_column_name(&column.name.value) { + return Err(ErrorCode::InternalError(format!( + "column name prefixed with {:?} are reserved word.", + ROWID_PREFIX + )) + .into()); + } + column_descs.push(ColumnDesc { data_type: bind_data_type(&column.data_type)?, column_id: ColumnId::new((i + 1) as i32), From 0e2ad884a2e05c691e9096d3e222fddff32e8a94 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 21:17:21 +0800 Subject: [PATCH 11/13] check alias not prefixed with _row_id --- rust/frontend/src/binder/query.rs | 28 +++++----------------------- rust/frontend/src/binder/select.rs | 9 +++++++++ 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/rust/frontend/src/binder/query.rs b/rust/frontend/src/binder/query.rs index 2fc3918c5198a..92390264c0e3f 100644 --- a/rust/frontend/src/binder/query.rs +++ b/rust/frontend/src/binder/query.rs @@ -56,33 +56,15 @@ impl Binder { result } - fn check_aliases(aliases: &[Option]) -> Result<()> { - for alias in aliases { - if let Some(name) = alias { - if is_row_id_column_name(name) { - return Err(ErrorCode::InternalError(format!( - "column name prefixed with {:?} are reserved word.", - ROWID_PREFIX - )) - .into()); - } - } - } - Ok(()) - } - fn bind_query_inner(&mut self, query: Query) -> Result { let body = self.bind_set_expr(query.body)?; let mut name_to_index = HashMap::new(); match &body { - BoundSetExpr::Select(s) => { - Self::check_aliases(&s.aliases)?; - s.aliases.iter().enumerate().for_each(|(index, alias)| { - if let Some(name) = alias { - name_to_index.insert(name.clone(), index); - } - }) - } + BoundSetExpr::Select(s) => s.aliases.iter().enumerate().for_each(|(index, alias)| { + if let Some(name) = alias { + name_to_index.insert(name.clone(), index); + } + }), BoundSetExpr::Values(_) => {} }; let order = query diff --git a/rust/frontend/src/binder/select.rs b/rust/frontend/src/binder/select.rs index fa33fea688b29..80e10226de945 100644 --- a/rust/frontend/src/binder/select.rs +++ b/rust/frontend/src/binder/select.rs @@ -22,6 +22,7 @@ use risingwave_sqlparser::ast::{Expr, Select, SelectItem}; use super::bind_context::{Clause, ColumnBinding}; use super::UNNAMED_COLUMN; use crate::binder::{Binder, Relation}; +use crate::catalog::{is_row_id_column_name, ROWID_PREFIX}; use crate::expr::{Expr as _, ExprImpl, InputRef}; #[derive(Debug)] @@ -118,6 +119,14 @@ impl Binder { aliases.push(alias); } SelectItem::ExprWithAlias { expr, alias } => { + if is_row_id_column_name(&alias.value) { + return Err(ErrorCode::InternalError(format!( + "column name prefixed with {:?} are reserved word.", + ROWID_PREFIX + )) + .into()); + } + let expr = self.bind_expr(expr)?; select_list.push(expr); aliases.push(Some(alias.value)); From af7a4196401a54430ebf65a29e5f7160faebcec0 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 28 Mar 2022 21:22:17 +0800 Subject: [PATCH 12/13] clippy fix --- rust/frontend/src/binder/query.rs | 1 - rust/frontend/src/catalog/table_catalog.rs | 6 +++--- rust/frontend/src/handler/create_source.rs | 2 +- rust/frontend/src/handler/create_table.rs | 3 +-- .../frontend/src/optimizer/plan_node/stream_materialize.rs | 7 ++++--- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/rust/frontend/src/binder/query.rs b/rust/frontend/src/binder/query.rs index 92390264c0e3f..3bb7c1d2b41c1 100644 --- a/rust/frontend/src/binder/query.rs +++ b/rust/frontend/src/binder/query.rs @@ -19,7 +19,6 @@ use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Expr, OrderByExpr, Query}; use crate::binder::{Binder, BoundSetExpr}; -use crate::catalog::{is_row_id_column_name, ROWID_PREFIX}; use crate::optimizer::property::{Direction, FieldOrder}; /// A validated sql query, including order and union. diff --git a/rust/frontend/src/catalog/table_catalog.rs b/rust/frontend/src/catalog/table_catalog.rs index 89b8f59d82ba0..26e4305efced3 100644 --- a/rust/frontend/src/catalog/table_catalog.rs +++ b/rust/frontend/src/catalog/table_catalog.rs @@ -173,7 +173,7 @@ mod tests { ProstColumnCatalog { column_desc: Some(ProstColumnDesc { column_id: 0, - name: gen_row_id_column_name(0).to_string(), + name: gen_row_id_column_name(0), field_descs: vec![], column_type: Some(DataType::Int32.to_protobuf()), type_name: String::new(), @@ -220,7 +220,7 @@ mod tests { column_desc: ColumnDesc { data_type: DataType::Int32, column_id: ColumnId::new(0), - name: gen_row_id_column_name(0).to_string(), + name: gen_row_id_column_name(0), field_descs: vec![], type_name: String::new() }, @@ -258,7 +258,7 @@ mod tests { column_desc: ColumnDesc { data_type: DataType::Int32, column_id: ColumnId::new(0), - name: gen_row_id_column_name(0).to_string(), + name: gen_row_id_column_name(0), field_descs: vec![], type_name: String::new() }, diff --git a/rust/frontend/src/handler/create_source.rs b/rust/frontend/src/handler/create_source.rs index 09d72df2f571d..bb89c4c9361e6 100644 --- a/rust/frontend/src/handler/create_source.rs +++ b/rust/frontend/src/handler/create_source.rs @@ -55,7 +55,7 @@ pub(super) async fn handle_create_source( let mut column_catalogs = vec![ColumnCatalog { column_desc: Some(ColumnDesc { column_id: 0, - name: gen_row_id_column_name(0).to_string(), + name: gen_row_id_column_name(0), column_type: Some(DataType::Int32.to_protobuf()), field_descs: vec![], type_name: "".to_string(), diff --git a/rust/frontend/src/handler/create_table.rs b/rust/frontend/src/handler/create_table.rs index 3fc232be7b53f..4274c94615e2b 100644 --- a/rust/frontend/src/handler/create_table.rs +++ b/rust/frontend/src/handler/create_table.rs @@ -50,7 +50,7 @@ pub fn gen_create_table_plan( column_descs.push(ColumnDesc { data_type: DataType::Int64, column_id: ColumnId::new(0), - name: gen_row_id_column_name(0).to_string(), + name: gen_row_id_column_name(0), field_descs: vec![], type_name: "".to_string(), }); @@ -161,7 +161,6 @@ mod tests { use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; use risingwave_common::types::DataType; - use super::*; use crate::catalog::gen_row_id_column_name; use crate::test_utils::LocalFrontend; diff --git a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs index 3cabcaeda18d1..4739f657874cd 100644 --- a/rust/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/rust/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -64,9 +64,10 @@ impl StreamMaterialize { continue; } if !col_names.insert(field.name.clone()) { - return Err(InternalError( - format!("column {} specified more than once", field.name).to_string(), - ) + return Err(InternalError(format!( + "column {} specified more than once", + field.name + )) .into()); } } From 3c3f6503d415ac3ce6c9a428649c7c861ebaea75 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 29 Mar 2022 11:22:18 +0800 Subject: [PATCH 13/13] ut --- .../tests/testdata/basic_query_1.yaml | 20 +++++++------- .../test_runner/tests/testdata/limit.yaml | 6 ++--- .../test_runner/tests/testdata/order_by.yaml | 6 ++--- .../tests/testdata/predicate_pushdown.yaml | 22 ++++++++-------- .../tests/testdata/stream_proto.yaml | 26 +++---------------- .../test_runner/tests/testdata/subquery.yaml | 16 ++++++------ .../tests/testdata/subquery_expr.yaml | 4 +-- 7 files changed, 41 insertions(+), 59 deletions(-) diff --git a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml index f507e0a7557f2..2fa02129451c0 100644 --- a/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml +++ b/rust/frontend/test_runner/tests/testdata/basic_query_1.yaml @@ -8,10 +8,10 @@ select * from t; batch_plan: | BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [_row_id#0, v1, v2] } + BatchScan { table: t, columns: [v1, v2] } stream_plan: | - StreamMaterialize { columns: [_row_id#0, v1, v2], pk_columns: [_row_id#0] } - StreamTableScan { table: t, columns: [_row_id#0, v1, v2], pk_indices: [0] } + StreamMaterialize { columns: [v1, v2, _row_id#0(hidden)], pk_columns: [_row_id#0] } + StreamTableScan { table: t, columns: [v1, v2, _row_id#0], pk_indices: [2] } - sql: | create table t (v1 bigint, v2 double precision); select t2.* from t; @@ -22,9 +22,9 @@ batch_plan: | BatchExchange { order: [], dist: Single } BatchFilter { predicate: (((((1:Int32 > 2:Int32) AND (1:Int32 = 1:Int32)) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR (((1:Int32 = 1:Int32) AND (2:Int32 >= 1:Int32)) AND (1:Int32 <= 2:Int32))) } - BatchScan { table: t, columns: [_row_id#0] } + BatchScan { table: t, columns: [] } stream_plan: | - StreamMaterialize { columns: [_row_id#0], pk_columns: [_row_id#0] } + StreamMaterialize { columns: [_row_id#0(hidden)], pk_columns: [_row_id#0] } StreamFilter { predicate: (((((1:Int32 > 2:Int32) AND (1:Int32 = 1:Int32)) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR (((1:Int32 = 1:Int32) AND (2:Int32 >= 1:Int32)) AND (1:Int32 <= 2:Int32))) } StreamTableScan { table: t, columns: [_row_id#0], pk_indices: [0] } - sql: | @@ -32,12 +32,12 @@ select * from t where v1<1; batch_plan: | BatchExchange { order: [], dist: Single } - BatchFilter { predicate: ($1 < 1:Int32) } - BatchScan { table: t, columns: [_row_id#0, v1] } + BatchFilter { predicate: ($0 < 1:Int32) } + BatchScan { table: t, columns: [v1] } stream_plan: | - StreamMaterialize { columns: [_row_id#0, v1], pk_columns: [_row_id#0] } - StreamFilter { predicate: ($1 < 1:Int32) } - StreamTableScan { table: t, columns: [_row_id#0, v1], pk_indices: [0] } + StreamMaterialize { columns: [v1, _row_id#0(hidden)], pk_columns: [_row_id#0] } + StreamFilter { predicate: ($0 < 1:Int32) } + StreamTableScan { table: t, columns: [v1, _row_id#0], pk_indices: [1] } - sql: | create table t (); select (((((false is not true) is true) is not false) is false) is not null) is null from t; diff --git a/rust/frontend/test_runner/tests/testdata/limit.yaml b/rust/frontend/test_runner/tests/testdata/limit.yaml index 7a1bb0c8b9427..49ad5924d7f97 100644 --- a/rust/frontend/test_runner/tests/testdata/limit.yaml +++ b/rust/frontend/test_runner/tests/testdata/limit.yaml @@ -4,14 +4,14 @@ logical_plan: | LogicalLimit { limit: 4, offset: 0 } LogicalProject { exprs: [$1], expr_alias: [v] } - LogicalScan { table: t, columns: [_row_id, v] } + LogicalScan { table: t, columns: [_row_id#0, v] } - sql: | create table t (v int not null); select * from t offset 4; logical_plan: | LogicalLimit { limit: 9223372036854775807, offset: 4 } LogicalProject { exprs: [$1], expr_alias: [v] } - LogicalScan { table: t, columns: [_row_id, v] } + LogicalScan { table: t, columns: [_row_id#0, v] } - sql: | create table t (v int not null); select * from ( select * from t limit 5 ) limit 4; @@ -20,4 +20,4 @@ LogicalProject { exprs: [$0], expr_alias: [v] } LogicalLimit { limit: 5, offset: 0 } LogicalProject { exprs: [$1], expr_alias: [v] } - LogicalScan { table: t, columns: [_row_id, v] } + LogicalScan { table: t, columns: [_row_id#0, v] } diff --git a/rust/frontend/test_runner/tests/testdata/order_by.yaml b/rust/frontend/test_runner/tests/testdata/order_by.yaml index b6c7d1086b1e0..b63ad3da5212b 100644 --- a/rust/frontend/test_runner/tests/testdata/order_by.yaml +++ b/rust/frontend/test_runner/tests/testdata/order_by.yaml @@ -2,9 +2,9 @@ create table t (v1 bigint, v2 double precision); select * from t order by v1 desc; batch_plan: | - BatchExchange { order: [$1 DESC], dist: Single } - BatchSort { order: [$1 DESC] } - BatchScan { table: t, columns: [_row_id#0, v1, v2] } + BatchExchange { order: [$0 DESC], dist: Single } + BatchSort { order: [$0 DESC] } + BatchScan { table: t, columns: [v1, v2] } - sql: | create table t (v1 bigint, v2 double precision); select t.* from t order by v1; diff --git a/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml b/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml index d28a27a734504..1a4d6e2cbb005 100644 --- a/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml +++ b/rust/frontend/test_runner/tests/testdata/predicate_pushdown.yaml @@ -3,28 +3,28 @@ create table t2 (v1 int, v2 int, v3 int); select * from t1 join t2 on t1.v1=t2.v2 and t1.v1>1 where t2.v2>2; logical_plan: | - LogicalProject { exprs: [$0, $1, $2, $3, $4, $5, $6, $7], expr_alias: [_row_id#0, v1, v2, v3, _row_id#0, v1, v2, v3] } + LogicalProject { exprs: [$1, $2, $3, $5, $6, $7], expr_alias: [v1, v2, v3, v1, v2, v3] } LogicalFilter { predicate: ($6 > 2:Int32) } LogicalJoin { type: Inner, on: ($1 = $6) AND ($1 > 1:Int32) } LogicalScan { table: t1, columns: [_row_id#0, v1, v2, v3] } LogicalScan { table: t2, columns: [_row_id#0, v1, v2, v3] } optimized_logical_plan: | - LogicalJoin { type: Inner, on: ($1 = $6) } - LogicalFilter { predicate: ($1 > 1:Int32) } - LogicalScan { table: t1, columns: [_row_id#0, v1, v2, v3] } - LogicalFilter { predicate: ($2 > 2:Int32) } - LogicalScan { table: t2, columns: [_row_id#0, v1, v2, v3] } + LogicalJoin { type: Inner, on: ($0 = $4) } + LogicalFilter { predicate: ($0 > 1:Int32) } + LogicalScan { table: t1, columns: [v1, v2, v3] } + LogicalFilter { predicate: ($1 > 2:Int32) } + LogicalScan { table: t2, columns: [v1, v2, v3] } - sql: | create table t (v1 bigint, v2 double precision); select * from (select * from t) where v2 > 1; logical_plan: | - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } - LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalProject { exprs: [$0, $1], expr_alias: [v1, v2] } + LogicalFilter { predicate: ($1 > 1:Int32) } + LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] } LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | - LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalScan { table: t, columns: [_row_id#0, v1, v2] } + LogicalFilter { predicate: ($1 > 1:Int32) } + LogicalScan { table: t, columns: [v1, v2] } - sql: | create table t (v1 bigint, v2 double precision); select v1 from (select v2, v1 from t) where v2 > 1; diff --git a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml index 38a0325daf70a..da56abb1ee041 100644 --- a/rust/frontend/test_runner/tests/testdata/stream_proto.yaml +++ b/rust/frontend/test_runner/tests/testdata/stream_proto.yaml @@ -14,10 +14,6 @@ tableRefId: tableId: 1 columnDescs: - - columnType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - columnType: typeName: INT32 isNullable: true @@ -26,7 +22,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" pkIndices: - 1 chainNode: @@ -82,11 +78,6 @@ id: 4294967294 name: test columns: - - columnDesc: - columnType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - columnDesc: columnType: typeName: INT32 @@ -97,7 +88,7 @@ typeName: INT64 isNullable: true columnId: 1 - name: _row_id + name: "_row_id#0" isHidden: true pkColumnIds: - 1 @@ -118,10 +109,6 @@ tableRefId: tableId: 1 columnDescs: - - columnType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - columnType: typeName: INT32 isNullable: true @@ -130,7 +117,7 @@ - columnType: typeName: INT64 isNullable: true - name: _row_id + name: "_row_id#0" pkIndices: - 1 chainNode: @@ -165,11 +152,6 @@ id: 4294967294 name: test columns: - - columnDesc: - columnType: - typeName: INT64 - isNullable: true - name: "_row_id#0" - columnDesc: columnType: typeName: INT32 @@ -180,7 +162,7 @@ typeName: INT64 isNullable: true columnId: 1 - name: _row_id + name: "_row_id#0" isHidden: true pkColumnIds: - 1 diff --git a/rust/frontend/test_runner/tests/testdata/subquery.yaml b/rust/frontend/test_runner/tests/testdata/subquery.yaml index 352d6dc027911..351ff319f7d81 100644 --- a/rust/frontend/test_runner/tests/testdata/subquery.yaml +++ b/rust/frontend/test_runner/tests/testdata/subquery.yaml @@ -2,9 +2,9 @@ create table t (v1 bigint, v2 double precision); select v1 from (select * from t) where v2 > 1; logical_plan: | - LogicalProject { exprs: [$1], expr_alias: [v1] } - LogicalFilter { predicate: ($2 > 1:Int32) } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalProject { exprs: [$0], expr_alias: [v1] } + LogicalFilter { predicate: ($1 > 1:Int32) } + LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] } LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | /* merge and then eliminate */ @@ -43,19 +43,19 @@ create table t (v1 bigint, v2 double precision); select * from (select * from t); logical_plan: | - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalProject { exprs: [$0, $1], expr_alias: [v1, v2] } + LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] } LogicalScan { table: t, columns: [_row_id#0, v1, v2] } optimized_logical_plan: | - LogicalScan { table: t, columns: [_row_id#0, v1, v2] } + LogicalScan { table: t, columns: [v1, v2] } - sql: | /* joins */ create table t (v1 bigint, v2 double precision); select * from (select * from t), t; logical_plan: | - LogicalProject { exprs: [$0, $1, $2, $3, $4, $5], expr_alias: [_row_id#0, v1, v2, _row_id#0, v1, v2] } + LogicalProject { exprs: [$0, $1, $3, $4], expr_alias: [v1, v2, v1, v2] } LogicalJoin { type: Inner, on: true:Boolean } - LogicalProject { exprs: [$0, $1, $2], expr_alias: [_row_id#0, v1, v2] } + LogicalProject { exprs: [$1, $2], expr_alias: [v1, v2] } LogicalScan { table: t, columns: [_row_id#0, v1, v2] } LogicalScan { table: t, columns: [_row_id#0, v1, v2] } - sql: | diff --git a/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml b/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml index 062de30904adc..31794f1df228f 100644 --- a/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml +++ b/rust/frontend/test_runner/tests/testdata/subquery_expr.yaml @@ -61,7 +61,7 @@ LogicalValues { rows: [[]], schema: Schema { fields: [] } } LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] } LogicalAgg { group_keys: [], agg_calls: [count] } - LogicalProject { exprs: [$0, $1], expr_alias: [_row_id#0, x] } + LogicalProject { exprs: [$1], expr_alias: [x] } LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | create table t1(x int); @@ -86,7 +86,7 @@ LogicalScan { table: t, columns: [_row_id#0, x] } LogicalProject { exprs: [($0 >= 1:Int32)], expr_alias: [ ] } LogicalAgg { group_keys: [], agg_calls: [count] } - LogicalProject { exprs: [$0, $1], expr_alias: [_row_id#0, x] } + LogicalProject { exprs: [$1], expr_alias: [x] } LogicalScan { table: t, columns: [_row_id#0, x] } - sql: | create table t1(x int);