Skip to content

Commit

Permalink
feat(frontend): support mv on source (risingwavelabs#1623)
Browse files Browse the repository at this point in the history
* change add source binder

* add struct source catalog

* fix source catalog

* mass commit

* fix clippy

* fix create&&show source ut

* fix ut

* fix clippy

* refactor

* fix clippy

* fix clippy
  • Loading branch information
st1page authored Apr 7, 2022
1 parent f14709d commit 31a9c85
Show file tree
Hide file tree
Showing 19 changed files with 383 additions and 181 deletions.
8 changes: 5 additions & 3 deletions src/frontend/src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ impl Binder {
source_name: ObjectName,
selection: Option<Expr>,
) -> Result<BoundDelete> {
let (schema_name, table_name) = Self::resolve_table_name(source_name.clone())?;
let table_source = self.bind_table_source(source_name)?;
let table = self.bind_table(&schema_name, &table_name, None)?;
let delete = BoundDelete {
table_source: self.bind_table_source(source_name.clone())?,
table: self.bind_table(source_name, None)?,
table_source,
table,
selection: selection.map(|expr| self.bind_expr(expr)).transpose()?,
};

Ok(delete)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use bind_context::BindContext;
pub use delete::BoundDelete;
pub use insert::BoundInsert;
pub use query::BoundQuery;
pub use relation::{BoundBaseTable, BoundJoin, BoundTableSource, Relation};
pub use relation::{BoundBaseTable, BoundJoin, BoundSource, BoundTableSource, Relation};
pub use select::BoundSelect;
pub use set_expr::BoundSetExpr;
pub use statement::BoundStatement;
Expand Down
137 changes: 96 additions & 41 deletions src/frontend/src/binder/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ use std::collections::hash_map::Entry;
use std::str::FromStr;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, TableDesc, DEFAULT_SCHEMA_NAME};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::try_match_expand;
use risingwave_common::catalog::{ColumnDesc, DEFAULT_SCHEMA_NAME};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::catalog::source::Info;
use risingwave_pb::plan::JoinType;
use risingwave_sqlparser::ast::{
JoinConstraint, JoinOperator, ObjectName, Query, TableAlias, TableFactor, TableWithJoins,
Expand All @@ -29,13 +27,16 @@ use risingwave_sqlparser::ast::{
use super::bind_context::ColumnBinding;
use super::{BoundQuery, BoundWindowTableFunction, WindowTableFunctionKind, UNNAMED_SUBQUERY};
use crate::binder::Binder;
use crate::catalog::TableId;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::{CatalogError, TableId};
use crate::expr::{Expr, ExprImpl};

/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
/// It is usually part of the `from` clause.
#[derive(Debug)]
pub enum Relation {
Source(Box<BoundSource>),
BaseTable(Box<BoundBaseTable>),
Subquery(Box<BoundSubquery>),
Join(Box<BoundJoin>),
Expand All @@ -54,21 +55,43 @@ pub struct BoundJoin {
pub struct BoundBaseTable {
pub name: String, // explain-only
pub table_id: TableId,
pub table_desc: TableDesc,
pub table_catalog: TableCatalog,
}

impl From<&TableCatalog> for BoundBaseTable {
fn from(t: &TableCatalog) -> Self {
Self {
name: t.name.clone(),
table_id: t.id,
table_catalog: t.clone(),
}
}
}

#[derive(Debug)]
pub struct BoundSubquery {
pub query: BoundQuery,
}

/// `BoundTableSource` is used by DML statement on table source like insert, updata
#[derive(Debug)]
pub struct BoundTableSource {
pub name: String, // explain-only
pub source_id: TableId, // TODO: refactor to source id
pub columns: Vec<ColumnDesc>,
}

#[derive(Debug)]
pub struct BoundSource {
pub catalog: SourceCatalog,
}

impl From<&SourceCatalog> for BoundSource {
fn from(s: &SourceCatalog) -> Self {
Self { catalog: s.clone() }
}
}

impl Binder {
pub(super) fn bind_vec_table_with_joins(
&mut self,
Expand Down Expand Up @@ -144,7 +167,8 @@ impl Binder {
match table_factor {
TableFactor::Table { name, alias, args } => {
if args.is_empty() {
Ok(Relation::BaseTable(Box::new(self.bind_table(name, alias)?)))
let (schema_name, table_name) = Self::resolve_table_name(name)?;
self.bind_table_or_source(&schema_name, &table_name, alias)
} else {
let kind =
WindowTableFunctionKind::from_str(&name.0[0].value).map_err(|_| {
Expand Down Expand Up @@ -179,69 +203,100 @@ impl Binder {
}
}

/// return the (`schema_name`, `table_name`)
pub fn resolve_table_name(name: ObjectName) -> Result<(String, String)> {
let mut identifiers = name.0;
let table_name = identifiers
.pop()
.ok_or_else(|| ErrorCode::InternalError("empty table name".into()))?
.value;
pub(super) fn bind_table_or_source(
&mut self,
schema_name: &str,
table_name: &str,
alias: Option<TableAlias>,
) -> Result<Relation> {
let (ret, columns) = {
let catalog = &self.catalog;

let schema_name = identifiers
.pop()
.map(|ident| ident.value)
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.into());
catalog
.get_table_by_name(&self.db_name, schema_name, table_name)
.map(|t| (Relation::BaseTable(Box::new(t.into())), t.columns.clone()))
.or_else(|_| {
catalog
.get_source_by_name(&self.db_name, schema_name, table_name)
.map(|s| (Relation::Source(Box::new(s.into())), s.columns.clone()))
})
.map_err(|_| {
RwError::from(CatalogError::NotFound(
"table or source",
table_name.to_string(),
))
})?
};

Ok((schema_name, table_name))
self.bind_context(
columns
.iter()
.cloned()
.map(|c| (c.name().to_string(), c.data_type().clone(), c.is_hidden)),
table_name.to_string(),
alias,
)?;
Ok(ret)
}

pub(super) fn bind_table(
&mut self,
name: ObjectName,
schema_name: &str,
table_name: &str,
alias: Option<TableAlias>,
) -> Result<BoundBaseTable> {
let (schema_name, table_name) = Self::resolve_table_name(name)?;
let table_catalog =
self.catalog
.get_table_by_name(&self.db_name, &schema_name, &table_name)?;

let table_id = table_catalog.id();
let table_desc = table_catalog.table_desc();
let columns = table_catalog.columns().to_vec();
let table_catalog = self
.catalog
.get_table_by_name(&self.db_name, schema_name, table_name)?
.clone();
let columns = table_catalog.columns.clone();

self.bind_context(
columns.iter().map(|c| {
(
c.column_desc.name.clone(),
c.column_desc.data_type.clone(),
c.is_hidden,
)
}),
table_name.clone(),
columns
.iter()
.cloned()
.map(|c| (c.name().to_string(), c.data_type().clone(), c.is_hidden)),
table_name.to_string(),
alias,
)?;

let table_id = table_catalog.id();
Ok(BoundBaseTable {
name: table_name,
table_desc,
name: table_name.to_string(),
table_id,
table_catalog,
})
}

/// return the (`schema_name`, `table_name`)
pub fn resolve_table_name(name: ObjectName) -> Result<(String, String)> {
let mut identifiers = name.0;
let table_name = identifiers
.pop()
.ok_or_else(|| ErrorCode::InternalError("empty table name".into()))?
.value;

let schema_name = identifiers
.pop()
.map(|ident| ident.value)
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.into());

Ok((schema_name, table_name))
}

pub(super) fn bind_table_source(&mut self, name: ObjectName) -> Result<BoundTableSource> {
let (schema_name, source_name) = Self::resolve_table_name(name)?;
let source = self
.catalog
.get_source_by_name(&self.db_name, &schema_name, &source_name)?;

let source_id = TableId::new(source.id);
let table_source_info = try_match_expand!(source.get_info()?, Info::TableSource)?;

let columns: Vec<ColumnDesc> = table_source_info
let columns = source
.columns
.iter()
.filter(|c| !c.is_hidden)
.map(|c| c.column_desc.as_ref().cloned().unwrap().into())
.map(|c| c.column_desc.clone())
.collect();

// Note(bugen): do not bind context here.
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/binder/window_table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, ObjectName};

use super::{Binder, BoundBaseTable, Result};
use super::{Binder, Relation, Result};
use crate::expr::{ExprImpl, InputRef};

#[derive(Copy, Clone, Debug)]
Expand All @@ -30,7 +30,7 @@ impl FromStr for WindowTableFunctionKind {

#[derive(Debug)]
pub struct BoundWindowTableFunction {
pub(crate) input: BoundBaseTable,
pub(crate) input: Relation,
pub(crate) kind: WindowTableFunctionKind,
pub(crate) time_col: InputRef,
pub(crate) args: Vec<ExprImpl>,
Expand Down Expand Up @@ -60,9 +60,10 @@ impl Binder {
)
.into()),
}?;
let (schema_name, table_name) = Self::resolve_table_name(table_name)?;

// TODO: support alias.
let base = self.bind_table(table_name.clone(), None)?;
let base = self.bind_table_or_source(&schema_name, &table_name, None)?;

let Some(time_col_arg) = args.next() else {
return Err(ErrorCode::BindError(
Expand All @@ -79,7 +80,6 @@ impl Binder {

self.pop_context();

let (schema_name, table_name) = Self::resolve_table_name(table_name)?;
let table_catalog =
self.catalog
.get_table_by_name(&self.db_name, &schema_name, &table_name)?;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub(crate) mod column_catalog;
pub(crate) mod database_catalog;
pub(crate) mod root_catalog;
pub(crate) mod schema_catalog;
pub(crate) mod source_catalog;
pub(crate) mod table_catalog;

#[allow(dead_code)]
Expand Down
15 changes: 7 additions & 8 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ use std::collections::HashMap;

use risingwave_common::catalog::{CatalogVersion, TableId};
use risingwave_common::error::Result;
use risingwave_meta::manager::SourceId;
use risingwave_pb::catalog::{
source, Database as ProstDatabase, Schema as ProstSchema, Source as ProstSource,
Table as ProstTable,
Database as ProstDatabase, Schema as ProstSchema, Source as ProstSource, Table as ProstTable,
};

use super::CatalogError;
use super::source_catalog::SourceCatalog;
use super::{CatalogError, SourceId};
use crate::catalog::database_catalog::DatabaseCatalog;
use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::table_catalog::TableCatalog;
Expand Down Expand Up @@ -145,7 +144,7 @@ impl Catalog {
db_name: &str,
schema_name: &str,
source_name: &str,
) -> Result<&ProstSource> {
) -> Result<&SourceCatalog> {
self.get_schema_by_name(db_name, schema_name)?
.get_source_by_name(source_name)
.ok_or_else(|| CatalogError::NotFound("source", source_name.to_string()).into())
Expand All @@ -164,11 +163,11 @@ impl Catalog {
// Resolve source first.
if let Some(source) = schema.get_source_by_name(relation_name) {
// TODO: check if it is a materialized source and improve the err msg
match source.info.as_ref().unwrap() {
source::Info::TableSource(_) => {
match source.source_type {
risingwave_pb::stream_plan::source_node::SourceType::Table => {
Err(CatalogError::Duplicated("table", relation_name.to_string()).into())
}
source::Info::StreamSource(_) => {
risingwave_pb::stream_plan::source_node::SourceType::Source => {
Err(CatalogError::Duplicated("source", relation_name.to_string()).into())
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common::catalog::TableId;
use risingwave_meta::manager::SourceId;
use risingwave_pb::catalog::{Schema as ProstSchema, Source as ProstSource, Table as ProstTable};

use super::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::SchemaId;

Expand All @@ -28,7 +29,7 @@ pub struct SchemaCatalog {
name: String,
table_by_name: HashMap<String, TableCatalog>,
table_name_by_id: HashMap<TableId, String>,
source_by_name: HashMap<String, ProstSource>,
source_by_name: HashMap<String, SourceCatalog>,
source_name_by_id: HashMap<SourceId, String>,
}

Expand All @@ -50,7 +51,9 @@ impl SchemaCatalog {
let name = prost.name.clone();
let id = prost.id;

self.source_by_name.try_insert(name.clone(), prost).unwrap();
self.source_by_name
.try_insert(name.clone(), SourceCatalog::from(&prost))
.unwrap();
self.source_name_by_id.try_insert(id, name).unwrap();
}
pub fn drop_source(&mut self, id: SourceId) {
Expand All @@ -61,7 +64,7 @@ impl SchemaCatalog {
pub fn get_table_by_name(&self, table_name: &str) -> Option<&TableCatalog> {
self.table_by_name.get(table_name)
}
pub fn get_source_by_name(&self, source_name: &str) -> Option<&ProstSource> {
pub fn get_source_by_name(&self, source_name: &str) -> Option<&SourceCatalog> {
self.source_by_name.get(source_name)
}

Expand Down
Loading

0 comments on commit 31a9c85

Please sign in to comment.