diff --git a/src/frontend/src/binder/delete.rs b/src/frontend/src/binder/delete.rs index c17d14d82e3e..911c4f0c42ac 100644 --- a/src/frontend/src/binder/delete.rs +++ b/src/frontend/src/binder/delete.rs @@ -35,12 +35,14 @@ impl Binder { source_name: ObjectName, selection: Option, ) -> Result { + let (schema_name, table_name) = Self::resolve_table_name(source_name.clone())?; + let table_source = self.bind_table_source(source_name)?; + let table = self.bind_table(&schema_name, &table_name, None)?; let delete = BoundDelete { - table_source: self.bind_table_source(source_name.clone())?, - table: self.bind_table(source_name, None)?, + table_source, + table, selection: selection.map(|expr| self.bind_expr(expr)).transpose()?, }; - Ok(delete) } } diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 9c46e1daeaca..a3c26cb66175 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -31,7 +31,7 @@ pub use bind_context::BindContext; pub use delete::BoundDelete; pub use insert::BoundInsert; pub use query::BoundQuery; -pub use relation::{BoundBaseTable, BoundJoin, BoundTableSource, Relation}; +pub use relation::{BoundBaseTable, BoundJoin, BoundSource, BoundTableSource, Relation}; pub use select::BoundSelect; pub use set_expr::BoundSetExpr; pub use statement::BoundStatement; diff --git a/src/frontend/src/binder/relation.rs b/src/frontend/src/binder/relation.rs index 1745940a9ad8..2e68c4110c86 100644 --- a/src/frontend/src/binder/relation.rs +++ b/src/frontend/src/binder/relation.rs @@ -16,11 +16,9 @@ use std::collections::hash_map::Entry; use std::str::FromStr; use itertools::Itertools; -use risingwave_common::catalog::{ColumnDesc, TableDesc, DEFAULT_SCHEMA_NAME}; -use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::try_match_expand; +use risingwave_common::catalog::{ColumnDesc, DEFAULT_SCHEMA_NAME}; +use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::DataType; -use risingwave_pb::catalog::source::Info; use risingwave_pb::plan::JoinType; use risingwave_sqlparser::ast::{ JoinConstraint, JoinOperator, ObjectName, Query, TableAlias, TableFactor, TableWithJoins, @@ -29,13 +27,16 @@ use risingwave_sqlparser::ast::{ use super::bind_context::ColumnBinding; use super::{BoundQuery, BoundWindowTableFunction, WindowTableFunctionKind, UNNAMED_SUBQUERY}; use crate::binder::Binder; -use crate::catalog::TableId; +use crate::catalog::source_catalog::SourceCatalog; +use crate::catalog::table_catalog::TableCatalog; +use crate::catalog::{CatalogError, TableId}; use crate::expr::{Expr, ExprImpl}; /// A validated item that refers to a table-like entity, including base table, subquery, join, etc. /// It is usually part of the `from` clause. #[derive(Debug)] pub enum Relation { + Source(Box), BaseTable(Box), Subquery(Box), Join(Box), @@ -54,7 +55,17 @@ pub struct BoundJoin { pub struct BoundBaseTable { pub name: String, // explain-only pub table_id: TableId, - pub table_desc: TableDesc, + pub table_catalog: TableCatalog, +} + +impl From<&TableCatalog> for BoundBaseTable { + fn from(t: &TableCatalog) -> Self { + Self { + name: t.name.clone(), + table_id: t.id, + table_catalog: t.clone(), + } + } } #[derive(Debug)] @@ -62,6 +73,7 @@ pub struct BoundSubquery { pub query: BoundQuery, } +/// `BoundTableSource` is used by DML statement on table source like insert, updata #[derive(Debug)] pub struct BoundTableSource { pub name: String, // explain-only @@ -69,6 +81,17 @@ pub struct BoundTableSource { pub columns: Vec, } +#[derive(Debug)] +pub struct BoundSource { + pub catalog: SourceCatalog, +} + +impl From<&SourceCatalog> for BoundSource { + fn from(s: &SourceCatalog) -> Self { + Self { catalog: s.clone() } + } +} + impl Binder { pub(super) fn bind_vec_table_with_joins( &mut self, @@ -144,7 +167,8 @@ impl Binder { match table_factor { TableFactor::Table { name, alias, args } => { if args.is_empty() { - Ok(Relation::BaseTable(Box::new(self.bind_table(name, alias)?))) + let (schema_name, table_name) = Self::resolve_table_name(name)?; + self.bind_table_or_source(&schema_name, &table_name, alias) } else { let kind = WindowTableFunctionKind::from_str(&name.0[0].value).map_err(|_| { @@ -179,55 +203,87 @@ impl Binder { } } - /// return the (`schema_name`, `table_name`) - pub fn resolve_table_name(name: ObjectName) -> Result<(String, String)> { - let mut identifiers = name.0; - let table_name = identifiers - .pop() - .ok_or_else(|| ErrorCode::InternalError("empty table name".into()))? - .value; + pub(super) fn bind_table_or_source( + &mut self, + schema_name: &str, + table_name: &str, + alias: Option, + ) -> Result { + let (ret, columns) = { + let catalog = &self.catalog; - let schema_name = identifiers - .pop() - .map(|ident| ident.value) - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.into()); + catalog + .get_table_by_name(&self.db_name, schema_name, table_name) + .map(|t| (Relation::BaseTable(Box::new(t.into())), t.columns.clone())) + .or_else(|_| { + catalog + .get_source_by_name(&self.db_name, schema_name, table_name) + .map(|s| (Relation::Source(Box::new(s.into())), s.columns.clone())) + }) + .map_err(|_| { + RwError::from(CatalogError::NotFound( + "table or source", + table_name.to_string(), + )) + })? + }; - Ok((schema_name, table_name)) + self.bind_context( + columns + .iter() + .cloned() + .map(|c| (c.name().to_string(), c.data_type().clone(), c.is_hidden)), + table_name.to_string(), + alias, + )?; + Ok(ret) } pub(super) fn bind_table( &mut self, - name: ObjectName, + schema_name: &str, + table_name: &str, alias: Option, ) -> Result { - let (schema_name, table_name) = Self::resolve_table_name(name)?; - let table_catalog = - self.catalog - .get_table_by_name(&self.db_name, &schema_name, &table_name)?; - - let table_id = table_catalog.id(); - let table_desc = table_catalog.table_desc(); - let columns = table_catalog.columns().to_vec(); + let table_catalog = self + .catalog + .get_table_by_name(&self.db_name, schema_name, table_name)? + .clone(); + let columns = table_catalog.columns.clone(); self.bind_context( - columns.iter().map(|c| { - ( - c.column_desc.name.clone(), - c.column_desc.data_type.clone(), - c.is_hidden, - ) - }), - table_name.clone(), + columns + .iter() + .cloned() + .map(|c| (c.name().to_string(), c.data_type().clone(), c.is_hidden)), + table_name.to_string(), alias, )?; + let table_id = table_catalog.id(); Ok(BoundBaseTable { - name: table_name, - table_desc, + name: table_name.to_string(), table_id, + table_catalog, }) } + /// return the (`schema_name`, `table_name`) + pub fn resolve_table_name(name: ObjectName) -> Result<(String, String)> { + let mut identifiers = name.0; + let table_name = identifiers + .pop() + .ok_or_else(|| ErrorCode::InternalError("empty table name".into()))? + .value; + + let schema_name = identifiers + .pop() + .map(|ident| ident.value) + .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.into()); + + Ok((schema_name, table_name)) + } + pub(super) fn bind_table_source(&mut self, name: ObjectName) -> Result { let (schema_name, source_name) = Self::resolve_table_name(name)?; let source = self @@ -235,13 +291,12 @@ impl Binder { .get_source_by_name(&self.db_name, &schema_name, &source_name)?; let source_id = TableId::new(source.id); - let table_source_info = try_match_expand!(source.get_info()?, Info::TableSource)?; - let columns: Vec = table_source_info + let columns = source .columns .iter() .filter(|c| !c.is_hidden) - .map(|c| c.column_desc.as_ref().cloned().unwrap().into()) + .map(|c| c.column_desc.clone()) .collect(); // Note(bugen): do not bind context here. diff --git a/src/frontend/src/binder/window_table_function.rs b/src/frontend/src/binder/window_table_function.rs index 567175228cd7..ecce0ffc9ec0 100644 --- a/src/frontend/src/binder/window_table_function.rs +++ b/src/frontend/src/binder/window_table_function.rs @@ -5,7 +5,7 @@ use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, ObjectName}; -use super::{Binder, BoundBaseTable, Result}; +use super::{Binder, Relation, Result}; use crate::expr::{ExprImpl, InputRef}; #[derive(Copy, Clone, Debug)] @@ -30,7 +30,7 @@ impl FromStr for WindowTableFunctionKind { #[derive(Debug)] pub struct BoundWindowTableFunction { - pub(crate) input: BoundBaseTable, + pub(crate) input: Relation, pub(crate) kind: WindowTableFunctionKind, pub(crate) time_col: InputRef, pub(crate) args: Vec, @@ -60,9 +60,10 @@ impl Binder { ) .into()), }?; + let (schema_name, table_name) = Self::resolve_table_name(table_name)?; // TODO: support alias. - let base = self.bind_table(table_name.clone(), None)?; + let base = self.bind_table_or_source(&schema_name, &table_name, None)?; let Some(time_col_arg) = args.next() else { return Err(ErrorCode::BindError( @@ -79,7 +80,6 @@ impl Binder { self.pop_context(); - let (schema_name, table_name) = Self::resolve_table_name(table_name)?; let table_catalog = self.catalog .get_table_by_name(&self.db_name, &schema_name, &table_name)?; diff --git a/src/frontend/src/catalog/mod.rs b/src/frontend/src/catalog/mod.rs index 90b884b63e10..14318d373a0f 100644 --- a/src/frontend/src/catalog/mod.rs +++ b/src/frontend/src/catalog/mod.rs @@ -20,6 +20,7 @@ pub(crate) mod column_catalog; pub(crate) mod database_catalog; pub(crate) mod root_catalog; pub(crate) mod schema_catalog; +pub(crate) mod source_catalog; pub(crate) mod table_catalog; #[allow(dead_code)] diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 28813993e095..0b3fb1fb43c9 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -16,13 +16,12 @@ use std::collections::HashMap; use risingwave_common::catalog::{CatalogVersion, TableId}; use risingwave_common::error::Result; -use risingwave_meta::manager::SourceId; use risingwave_pb::catalog::{ - source, Database as ProstDatabase, Schema as ProstSchema, Source as ProstSource, - Table as ProstTable, + Database as ProstDatabase, Schema as ProstSchema, Source as ProstSource, Table as ProstTable, }; -use super::CatalogError; +use super::source_catalog::SourceCatalog; +use super::{CatalogError, SourceId}; use crate::catalog::database_catalog::DatabaseCatalog; use crate::catalog::schema_catalog::SchemaCatalog; use crate::catalog::table_catalog::TableCatalog; @@ -145,7 +144,7 @@ impl Catalog { db_name: &str, schema_name: &str, source_name: &str, - ) -> Result<&ProstSource> { + ) -> Result<&SourceCatalog> { self.get_schema_by_name(db_name, schema_name)? .get_source_by_name(source_name) .ok_or_else(|| CatalogError::NotFound("source", source_name.to_string()).into()) @@ -164,11 +163,11 @@ impl Catalog { // Resolve source first. if let Some(source) = schema.get_source_by_name(relation_name) { // TODO: check if it is a materialized source and improve the err msg - match source.info.as_ref().unwrap() { - source::Info::TableSource(_) => { + match source.source_type { + risingwave_pb::stream_plan::source_node::SourceType::Table => { Err(CatalogError::Duplicated("table", relation_name.to_string()).into()) } - source::Info::StreamSource(_) => { + risingwave_pb::stream_plan::source_node::SourceType::Source => { Err(CatalogError::Duplicated("source", relation_name.to_string()).into()) } } diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index ef22cdabda2e..908d723a95f7 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -18,6 +18,7 @@ use risingwave_common::catalog::TableId; use risingwave_meta::manager::SourceId; use risingwave_pb::catalog::{Schema as ProstSchema, Source as ProstSource, Table as ProstTable}; +use super::source_catalog::SourceCatalog; use crate::catalog::table_catalog::TableCatalog; use crate::catalog::SchemaId; @@ -28,7 +29,7 @@ pub struct SchemaCatalog { name: String, table_by_name: HashMap, table_name_by_id: HashMap, - source_by_name: HashMap, + source_by_name: HashMap, source_name_by_id: HashMap, } @@ -50,7 +51,9 @@ impl SchemaCatalog { let name = prost.name.clone(); let id = prost.id; - self.source_by_name.try_insert(name.clone(), prost).unwrap(); + self.source_by_name + .try_insert(name.clone(), SourceCatalog::from(&prost)) + .unwrap(); self.source_name_by_id.try_insert(id, name).unwrap(); } pub fn drop_source(&mut self, id: SourceId) { @@ -61,7 +64,7 @@ impl SchemaCatalog { pub fn get_table_by_name(&self, table_name: &str) -> Option<&TableCatalog> { self.table_by_name.get(table_name) } - pub fn get_source_by_name(&self, source_name: &str) -> Option<&ProstSource> { + pub fn get_source_by_name(&self, source_name: &str) -> Option<&SourceCatalog> { self.source_by_name.get(source_name) } diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs new file mode 100644 index 000000000000..5f94c41b96ca --- /dev/null +++ b/src/frontend/src/catalog/source_catalog.rs @@ -0,0 +1,62 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use risingwave_pb::catalog::source::Info; +use risingwave_pb::catalog::Source as ProstSource; +use risingwave_pb::stream_plan::source_node::SourceType; + +use super::column_catalog::ColumnCatalog; +use super::{ColumnId, SourceId}; +use crate::handler::create_table::TABLE_SOURCE_PK_COLID; +/// this struct `SourceCatalog` is used in frontend and compared with `ProstSource` it only maintain +/// information which will be used during optimization. +#[derive(Clone, Debug)] +pub struct SourceCatalog { + pub id: SourceId, + pub name: String, + pub columns: Vec, + pub pk_col_ids: Vec, + pub source_type: SourceType, +} + +impl From<&ProstSource> for SourceCatalog { + fn from(prost: &ProstSource) -> Self { + let id = prost.id; + let name = prost.name.clone(); + let (source_type, prost_columns, pk_col_ids) = match &prost.info { + Some(Info::StreamSource(source)) => ( + SourceType::Source, + source.columns.clone(), + source + .pk_column_ids + .iter() + .map(|id| ColumnId::new(*id)) + .collect(), + ), + Some(Info::TableSource(source)) => ( + SourceType::Table, + source.columns.clone(), + vec![TABLE_SOURCE_PK_COLID], + ), + None => unreachable!(), + }; + let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect(); + Self { + id, + name, + columns, + pk_col_ids, + source_type, + } + } +} diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 43ef7a07aa93..b573123d1633 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -135,13 +135,10 @@ pub mod tests { use std::collections::HashMap; use std::io::Write; - use itertools::Itertools; use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; use risingwave_common::types::DataType; use tempfile::NamedTempFile; - use super::*; - use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::gen_row_id_column_name; use crate::test_utils::LocalFrontend; @@ -201,40 +198,34 @@ pub mod tests { assert_eq!(source.name, "t"); // Only check stream source - if let Info::StreamSource(info) = source.info.as_ref().unwrap() { - let catalogs: Vec = info - .columns - .iter() - .map(|col| col.clone().into()) - .collect_vec(); - let mut columns = vec![]; - - // Get all column descs - for catalog in catalogs { - columns.append(&mut catalog.column_desc.get_column_descs()); - } - let columns = columns - .iter() - .map(|col| (col.name.as_str(), col.data_type.clone())) - .collect::>(); - - let city_type = DataType::Struct { - fields: vec![DataType::Varchar, DataType::Varchar].into(), - }; - let row_id_col_name = gen_row_id_column_name(0); - let expected_columns = maplit::hashmap! { - row_id_col_name.as_str() => DataType::Int32, - "id" => DataType::Int32, - "country.zipcode" => DataType::Varchar, - "zipcode" => DataType::Int64, - "country.city.address" => DataType::Varchar, - "country.address" => DataType::Varchar, - "country.city" => city_type.clone(), - "country.city.zipcode" => DataType::Varchar, - "rate" => DataType::Float32, - "country" => DataType::Struct {fields:vec![DataType::Varchar,city_type,DataType::Varchar].into()}, - }; - assert_eq!(columns, expected_columns); + let catalogs = source.columns; + let mut columns = vec![]; + + // Get all column descs + for catalog in catalogs { + columns.append(&mut catalog.column_desc.get_column_descs()); } + let columns = columns + .iter() + .map(|col| (col.name.as_str(), col.data_type.clone())) + .collect::>(); + + let city_type = DataType::Struct { + fields: vec![DataType::Varchar, DataType::Varchar].into(), + }; + let row_id_col_name = gen_row_id_column_name(0); + let expected_columns = maplit::hashmap! { + row_id_col_name.as_str() => DataType::Int32, + "id" => DataType::Int32, + "country.zipcode" => DataType::Varchar, + "zipcode" => DataType::Int64, + "country.city.address" => DataType::Varchar, + "country.address" => DataType::Varchar, + "country.city" => city_type.clone(), + "country.city.zipcode" => DataType::Varchar, + "rate" => DataType::Float32, + "country" => DataType::Struct {fields:vec![DataType::Varchar,city_type,DataType::Varchar].into()}, + }; + assert_eq!(columns, expected_columns); } } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 4274c94615e2..99326acbd40a 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::rc::Rc; + use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; @@ -26,10 +28,12 @@ use risingwave_sqlparser::ast::{ColumnDef, ObjectName}; use crate::binder::expr::bind_data_type; use crate::binder::Binder; use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ROWID_PREFIX}; -use crate::optimizer::plan_node::StreamSource; +use crate::optimizer::plan_node::{LogicalSource, StreamSource}; use crate::optimizer::property::{Distribution, Order}; use crate::optimizer::{PlanRef, PlanRoot}; use crate::session::{OptimizerContext, OptimizerContextRef, SessionImpl}; +// FIXME: store PK columns in ProstTableSourceInfo as Catalog information, and then remove this +pub const TABLE_SOURCE_PK_COLID: ColumnId = ColumnId::new(0); pub fn gen_create_table_plan( session: &SessionImpl, @@ -49,7 +53,7 @@ pub fn gen_create_table_plan( // Put the hidden row id column in the first column. This is used for PK. column_descs.push(ColumnDesc { data_type: DataType::Int64, - column_id: ColumnId::new(0), + column_id: TABLE_SOURCE_PK_COLID, name: gen_row_id_column_name(0), field_descs: vec![], type_name: "".to_string(), @@ -97,12 +101,8 @@ pub fn gen_create_table_plan( let materialize = { // Manually assemble the materialization plan for the table. - let source_node: PlanRef = StreamSource::create( - context, - vec![0], // row id column as pk - source.clone(), - ) - .into(); + let source_node: PlanRef = + StreamSource::new(LogicalSource::new(Rc::new((&source).into()), context)).into(); let mut required_cols = FixedBitSet::with_capacity(source_node.schema().len()); required_cols.toggle_range(..); required_cols.toggle(0); diff --git a/src/frontend/src/handler/show_source.rs b/src/frontend/src/handler/show_source.rs index 3fdb43f10b8c..ba41025bdfc5 100644 --- a/src/frontend/src/handler/show_source.rs +++ b/src/frontend/src/handler/show_source.rs @@ -36,9 +36,10 @@ pub async fn handle_show_source( // Get prost column_descs from source info and into column_descs let columns: Vec = catalog_reader .get_source_by_name(session.database(), &schema_name, &source_name)? - .get_column_descs() + .columns .iter() - .map(|c| c.into()) + .filter(|c| !c.is_hidden) + .map(|c| c.column_desc.clone()) .collect_vec(); // Convert all column_descs to rows diff --git a/src/frontend/src/optimizer/plan_node/logical_insert.rs b/src/frontend/src/optimizer/plan_node/logical_insert.rs index 46e8d26d8e9e..68d8e660fc1f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_insert.rs +++ b/src/frontend/src/optimizer/plan_node/logical_insert.rs @@ -20,7 +20,7 @@ use risingwave_common::error::Result; use risingwave_common::types::DataType; use super::{BatchInsert, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream}; -use crate::catalog::{ColumnId, TableId}; +use crate::catalog::TableId; /// `LogicalInsert` iterates on input relation and insert the data into specified table. /// @@ -31,18 +31,12 @@ pub struct LogicalInsert { pub base: PlanBase, table_source_name: String, // explain-only source_id: TableId, // TODO: use SourceId - columns: Vec, input: PlanRef, } impl LogicalInsert { /// Create a [`LogicalInsert`] node. Used internally by optimizer. - pub fn new( - input: PlanRef, - table_source_name: String, - source_id: TableId, - columns: Vec, - ) -> Self { + pub fn new(input: PlanRef, table_source_name: String, source_id: TableId) -> Self { let ctx = input.ctx(); let schema = Schema::new(vec![Field::unnamed(DataType::Int64)]); let base = PlanBase::new_logical(ctx, schema, vec![]); @@ -50,19 +44,13 @@ impl LogicalInsert { base, table_source_name, source_id, - columns, input, } } /// Create a [`LogicalInsert`] node. Used by planner. - pub fn create( - input: PlanRef, - table_source_name: String, - source_id: TableId, - columns: Vec, - ) -> Result { - Ok(Self::new(input, table_source_name, source_id, columns)) + pub fn create(input: PlanRef, table_source_name: String, source_id: TableId) -> Result { + Ok(Self::new(input, table_source_name, source_id)) } pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter, name: &str) -> fmt::Result { @@ -82,12 +70,7 @@ impl PlanTreeNodeUnary for LogicalInsert { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new( - input, - self.table_source_name.clone(), - self.source_id, - self.columns.clone(), - ) + Self::new(input, self.table_source_name.clone(), self.source_id) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 9e93c653ad4e..d7644be1f5a9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -99,6 +99,7 @@ impl LogicalScan { .map(|f| f.name.clone()) .collect() } + pub fn table_name(&self) -> &str { &self.table_name } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs new file mode 100644 index 000000000000..e15f3bbc3850 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -0,0 +1,109 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt; +use std::rc::Rc; + +use fixedbitset::FixedBitSet; +use risingwave_common::catalog::Schema; + +use super::{ + ColPrunable, LogicalProject, PlanBase, PlanNode, PlanRef, StreamSource, ToBatch, ToStream, +}; +use crate::catalog::source_catalog::SourceCatalog; +use crate::session::OptimizerContextRef; +use crate::utils::ColIndexMapping; + +/// `LogicalSource` returns contents of a table or other equivalent object +#[derive(Debug, Clone)] +pub struct LogicalSource { + pub base: PlanBase, + pub source_catalog: Rc, +} + +impl LogicalSource { + pub fn new(source_catalog: Rc, ctx: OptimizerContextRef) -> Self { + let mut id_to_idx = HashMap::new(); + let fields = source_catalog + .columns + .iter() + .enumerate() + .map(|(idx, c)| { + id_to_idx.insert(c.column_id(), idx); + (&c.column_desc).into() + }) + .collect(); + let pk_indices = source_catalog + .pk_col_ids + .iter() + .map(|c| id_to_idx.get(c).copied()) + .collect::>>() + .unwrap_or_default(); + let schema = Schema { fields }; + let base = PlanBase::new_logical(ctx, schema, pk_indices); + LogicalSource { + base, + source_catalog, + } + } + + pub(super) fn column_names(&self) -> Vec { + self.schema() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect() + } +} + +impl_plan_tree_node_for_leaf! {LogicalSource} + +impl fmt::Display for LogicalSource { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "LogicalSource {{ source: {}, columns: [{}] }}", + self.source_catalog.name, + self.column_names().join(", ") + ) + } +} + +impl ColPrunable for LogicalSource { + fn prune_col(&self, required_cols: &FixedBitSet) -> PlanRef { + self.must_contain_columns(required_cols); + let mapping = ColIndexMapping::with_remaining_columns(required_cols); + LogicalProject::with_mapping(self.clone().into(), mapping) + } +} + +impl ToBatch for LogicalSource { + fn to_batch(&self) -> PlanRef { + panic!("there is no batch source operator"); + } +} + +impl ToStream for LogicalSource { + fn to_stream(&self) -> PlanRef { + StreamSource::new(self.clone()).into() + } + + fn logical_rewrite_for_stream(&self) -> (PlanRef, ColIndexMapping) { + ( + self.clone().into(), + ColIndexMapping::identity(self.schema().len()), + ) + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index b45cb23d127d..273cdd2472e8 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -223,6 +223,7 @@ mod logical_join; mod logical_limit; mod logical_project; mod logical_scan; +mod logical_source; mod logical_topn; mod logical_values; mod stream_exchange; @@ -256,6 +257,7 @@ pub use logical_join::LogicalJoin; pub use logical_limit::LogicalLimit; pub use logical_project::LogicalProject; pub use logical_scan::LogicalScan; +pub use logical_source::LogicalSource; pub use logical_topn::LogicalTopN; pub use logical_values::LogicalValues; pub use stream_exchange::StreamExchange; @@ -292,6 +294,7 @@ macro_rules! for_all_plan_nodes { ,{ Logical, Filter } ,{ Logical, Project } ,{ Logical, Scan } + ,{ Logical, Source } ,{ Logical, Insert } ,{ Logical, Delete } ,{ Logical, Join } @@ -334,6 +337,7 @@ macro_rules! for_logical_plan_nodes { ,{ Logical, Filter } ,{ Logical, Project } ,{ Logical, Scan } + ,{ Logical, Source } ,{ Logical, Insert } ,{ Logical, Delete } ,{ Logical, Join } diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 49b3dd7da0c4..cb1b7ec4fdb8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -14,59 +14,30 @@ use std::fmt; -use itertools::Itertools; -use risingwave_common::catalog::{ColumnDesc, Field, Schema}; -use risingwave_pb::catalog::source::Info; -use risingwave_pb::catalog::Source as ProstSource; use risingwave_pb::plan::TableRefId; -use risingwave_pb::stream_plan::source_node::SourceType; use risingwave_pb::stream_plan::stream_node::Node as ProstStreamNode; use risingwave_pb::stream_plan::SourceNode; -use super::{PlanBase, ToStreamProst}; +use super::{LogicalSource, PlanBase, ToStreamProst}; use crate::optimizer::property::Distribution; -use crate::session::OptimizerContextRef; /// [`StreamSource`] represents a table/connector source at the very beginning of the graph. #[derive(Debug, Clone)] pub struct StreamSource { pub base: PlanBase, - source_catalog: ProstSource, - col_descs: Vec, - source_type: SourceType, + logical: LogicalSource, } impl StreamSource { - pub fn create( - ctx: OptimizerContextRef, - pk_idx: Vec, - source_catalog: ProstSource, - ) -> Self { - let (source_type, prost_columns) = match &source_catalog.info { - Some(Info::StreamSource(source)) => (SourceType::Source, source.columns.clone()), - Some(Info::TableSource(source)) => (SourceType::Table, source.columns.clone()), - None => unreachable!(), - }; - let col_descs = prost_columns - .iter() - .map(|c| c.column_desc.as_ref().unwrap()) - .map(ColumnDesc::from) - .collect_vec(); - - let fields = col_descs.iter().map(Field::from).collect(); + pub fn new(logical: LogicalSource) -> Self { let base = PlanBase::new_stream( - ctx, - Schema { fields }, - pk_idx, + logical.ctx(), + logical.schema().clone(), + logical.pk_indices().to_vec(), Distribution::any().clone(), false, // TODO: determine the `append-only` field of source ); - Self { - base, - source_catalog, - col_descs, - source_type, - } + Self { base, logical } } pub fn column_names(&self) -> Vec { self.schema() @@ -84,7 +55,7 @@ impl fmt::Display for StreamSource { write!( f, "StreamSource {{ source: {}, columns: [{}] }}", - self.source_catalog.name, + self.logical.source_catalog.name, self.column_names().join(", ") ) } @@ -95,12 +66,18 @@ impl ToStreamProst for StreamSource { ProstStreamNode::SourceNode(SourceNode { // TODO: Refactor this id table_ref_id: TableRefId { - table_id: self.source_catalog.id as i32, + table_id: self.logical.source_catalog.id as i32, ..Default::default() } .into(), - column_ids: self.col_descs.iter().map(|c| c.column_id.into()).collect(), - source_type: self.source_type as i32, + column_ids: self + .logical + .source_catalog + .columns + .iter() + .map(|c| c.column_id().into()) + .collect(), + source_type: self.logical.source_catalog.source_type as i32, }) } } diff --git a/src/frontend/src/planner/insert.rs b/src/frontend/src/planner/insert.rs index bc1934502899..95f845eb5d82 100644 --- a/src/frontend/src/planner/insert.rs +++ b/src/frontend/src/planner/insert.rs @@ -29,7 +29,6 @@ impl Planner { input, insert.table_source.name, insert.table_source.source_id, - vec![], )? .into(); let order = Order::any().clone(); diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 387d73e4712b..b2315ab380b7 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -18,10 +18,13 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use crate::binder::{ - BoundBaseTable, BoundJoin, BoundWindowTableFunction, Relation, WindowTableFunctionKind, + BoundBaseTable, BoundJoin, BoundSource, BoundWindowTableFunction, Relation, + WindowTableFunctionKind, }; use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef}; -use crate::optimizer::plan_node::{LogicalJoin, LogicalProject, LogicalScan, PlanRef}; +use crate::optimizer::plan_node::{ + LogicalJoin, LogicalProject, LogicalScan, LogicalSource, PlanRef, +}; use crate::planner::Planner; impl Planner { @@ -32,11 +35,20 @@ impl Planner { Relation::Subquery(q) => Ok(self.plan_query(q.query)?.as_subplan()), Relation::Join(join) => self.plan_join(*join), Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf), + Relation::Source(s) => self.plan_source(*s), } } pub(super) fn plan_base_table(&mut self, base_table: BoundBaseTable) -> Result { - LogicalScan::create(base_table.name, Rc::new(base_table.table_desc), self.ctx()) + LogicalScan::create( + base_table.name, + Rc::new(base_table.table_catalog.table_desc()), + self.ctx(), + ) + } + + pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { + Ok(LogicalSource::new(Rc::new(source.catalog), self.ctx()).into()) } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result { @@ -67,21 +79,24 @@ impl Planner { fn plan_tumble_window( &mut self, - input: BoundBaseTable, + input: Relation, time_col: InputRef, args: Vec, ) -> Result { let mut args = args.into_iter(); + + let cols = match &input { + Relation::Source(s) => s.catalog.columns.to_vec(), + Relation::BaseTable(t) => t.table_catalog.columns().to_vec(), + _ => return Err(ErrorCode::BindError("the ".to_string()).into()), + }; + match (args.next(), args.next()) { (Some(window_size @ ExprImpl::Literal(_)), None) => { - let cols = &input.table_desc.columns; let mut exprs = Vec::with_capacity(cols.len() + 2); let mut expr_aliases = Vec::with_capacity(cols.len() + 2); for (idx, col) in cols.iter().enumerate() { - exprs.push(ExprImpl::InputRef(Box::new(InputRef::new( - idx, - col.data_type.clone(), - )))); + exprs.push(InputRef::new(idx, col.data_type().clone()).into()); expr_aliases.push(None); } let window_start = @@ -103,7 +118,7 @@ impl Planner { exprs.push(window_end); expr_aliases.push(Some("window_start".to_string())); expr_aliases.push(Some("window_end".to_string())); - let base = self.plan_base_table(input)?; + let base = self.plan_relation(input)?; let project = LogicalProject::create(base, exprs, expr_aliases); Ok(project) } diff --git a/src/frontend/test_runner/tests/testdata/basic_query_1.yaml b/src/frontend/test_runner/tests/testdata/basic_query_1.yaml index 9ca7185f740e..d1d3d873ad96 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query_1.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query_1.yaml @@ -2,7 +2,7 @@ batch_plan: | BatchValues { rows: [[11:Int32, 22:Int32], [(33:Int32 + (1:Int32 + 2:Int32)), 44:Int32]] } - sql: select * from t - binder_error: 'Catalog error: table not found: t' + binder_error: 'Catalog error: table or source not found: t' - sql: | create table t (v1 bigint, v2 double precision); select * from t;