Skip to content

Commit

Permalink
refactor(optimizer): stream materialize && handle create table && handle create mv (risingwavelabs#1314)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Mar 28, 2022
1 parent f45ce1c commit c80c92b
Show file tree
Hide file tree
Showing 15 changed files with 319 additions and 350 deletions.
39 changes: 34 additions & 5 deletions rust/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ use risingwave_pb::catalog::Table as ProstTable;
use risingwave_pb::plan::OrderType as ProstOrderType;

use super::column_catalog::ColumnCatalog;
use super::{DatabaseId, SchemaId};
use crate::catalog::TableId;

#[derive(Clone, Debug, PartialEq)]
pub struct TableCatalog {
id: TableId,
associated_source_id: Option<TableId>, // TODO: use SourceId
name: String,
columns: Vec<ColumnCatalog>,
pk_desc: Vec<OrderedColumnDesc>,
pub id: TableId,
pub associated_source_id: Option<TableId>, // TODO: use SourceId
pub name: String,
pub columns: Vec<ColumnCatalog>,
pub pk_desc: Vec<OrderedColumnDesc>,
}

impl TableCatalog {
Expand Down Expand Up @@ -68,6 +69,33 @@ impl TableCatalog {
pub fn name(&self) -> &str {
self.name.as_ref()
}

/// Convert this catalog entry into its protobuf form ([`ProstTable`]),
/// filling in the given `schema_id` and `database_id`.
///
/// `dependent_relations` is left empty here; it is a placeholder to be
/// filled by the meta service.
pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> ProstTable {
    // Split the ordered pk descriptors into parallel id / order vectors.
    let pk_len = self.pk_desc().len();
    let mut pk_column_ids = Vec::with_capacity(pk_len);
    let mut pk_orders = Vec::with_capacity(pk_len);
    for desc in self.pk_desc() {
        pk_column_ids.push(desc.column_desc.column_id.get_id());
        pk_orders.push(desc.order.to_prost() as i32);
    }

    ProstTable {
        id: self.id.table_id as u32,
        schema_id,
        database_id,
        name: self.name.clone(),
        columns: self.columns().iter().map(|c| c.to_protobuf()).collect(),
        pk_column_ids,
        pk_orders,
        dependent_relations: vec![],
        optional_associated_source_id: self
            .associated_source_id
            .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
    }
}
}

impl From<ProstTable> for TableCatalog {
Expand Down Expand Up @@ -114,6 +142,7 @@ impl From<ProstTable> for TableCatalog {
}
}
}

impl From<&ProstTable> for TableCatalog {
fn from(tb: &ProstTable) -> Self {
tb.clone().into()
Expand Down
215 changes: 32 additions & 183 deletions rust/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,179 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use pgwire::pg_response::PgResponse;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::error::Result;
use risingwave_pb::catalog::Table as ProstTable;
use risingwave_sqlparser::ast::{ObjectName, Query};

use crate::binder::{Binder, BoundQuery};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::{ColumnId, TableId};
use crate::optimizer::plan_node::StreamMaterialize;
use crate::optimizer::property::{FieldOrder, Order};
use crate::binder::Binder;
use crate::optimizer::property::Distribution;
use crate::optimizer::PlanRef;
use crate::planner::Planner;
use crate::session::{OptimizerContext, SessionImpl};

impl BoundQuery {
    /// Generate create MV's column desc from query: one [`ColumnDesc`]
    /// per output column, pairing each output data type with its name
    /// and assigning the column id from the output position.
    ///
    /// `zip_eq` panics if `data_types()` and `names()` disagree in
    /// length, which would indicate a binder bug.
    pub fn gen_create_mv_column_desc(&self) -> Vec<ColumnDesc> {
        self.data_types()
            .iter()
            .zip_eq(self.names().iter())
            .enumerate()
            .map(|(idx, (data_type, name))| ColumnDesc {
                data_type: data_type.clone(),
                column_id: ColumnId::new(idx as i32),
                name: name.to_string(),
                field_descs: vec![],
                type_name: "".to_string(),
            })
            .collect()
    }
}

/// Mview information when handling create
pub struct MvInfo {
// Name used for the mview's backing table (see `gen_create_mv_plan`).
pub table_name: String,
// Id of the database the mview is created in.
pub database_id: u32,
// Id of the schema the mview is created in.
pub schema_id: u32,
}

impl MvInfo {
/// Generate [`MvInfo`] with the table name. Note that this cannot be used to actually create an
/// MV.
pub fn with_name(name: impl Into<String>) -> Self {
Self {
table_name: name.into(),
database_id: u32::MAX,
schema_id: u32::MAX,
}
}
}
use crate::session::{OptimizerContext, OptimizerContextRef, SessionImpl};

/// Generate create MV plan, return plan and mv table info.
pub fn gen_create_mv_plan(
session: &SessionImpl,
planner: &mut Planner,
query: Query,
info: MvInfo,
context: OptimizerContextRef,
query: Box<Query>,
name: ObjectName,
) -> Result<(PlanRef, ProstTable)> {
// For create MV plan, we currently assume column id == column index.
// If there are anything that would be changed in the future, please carefully revisit this
// function.

let bound_query = Binder::new(
session.env().catalog_reader().read_guard(),
session.database().to_string(),
)
.bind_query(query)?;

let mut column_orders = bound_query.order.clone();

// The `column_catalog` stores mapping of the position of materialize node's input to column
// catalog. column catalog currently only contains the column selected by users.
let mut column_catalog: HashMap<usize, ColumnCatalog> = bound_query
.gen_create_mv_column_desc()
.into_iter()
.enumerate()
.map(|(idx, column_desc)| {
(
idx,
ColumnCatalog {
column_desc,
is_hidden: false,
},
)
})
.collect();

let mut logical = planner.plan_query(bound_query)?;

let plan = logical.gen_create_mv_plan();

let pks = plan.pk_indices();

// Now we will need to add the pks (which might not be explicitly selected by users) into the
// final table. We add pk into column orders and column catalogs.

let ordered_ids: HashSet<usize> = column_orders.iter().map(|x| x.index).collect();
let mut pk_column_id = column_catalog.len();

for pk in pks {
// If pk isn't contained in column catalog, we append it into column catalog.
if !column_catalog.contains_key(pk) {
column_catalog.insert(
*pk,
ColumnCatalog {
column_desc: ColumnDesc {
data_type: plan.schema()[*pk].data_type(),
column_id: ColumnId::new(pk_column_id as i32),
name: format!("_pk_{}", pk),
field_descs: vec![],
type_name: "".to_string(),
},
is_hidden: true,
},
);
pk_column_id += 1;
}

// If pk isn't contained in column orders, we append it into column orders, so that pk will
// appear at the end of the materialize state's key.
if !ordered_ids.contains(pk) {
column_orders.push(FieldOrder::ascending(
column_catalog
.get(pk)
.expect("pk not in catalog")
.column_id()
.get_id() as usize,
));
}
}

let mut column_catalog = column_catalog.into_values().collect_vec();
column_catalog.sort_by_key(|x| x.column_id().get_id());
let column_ids = column_catalog.iter().map(|x| x.column_id()).collect_vec();
assert!(
column_ids[0].get_id() == 0
&& column_ids.last().unwrap().get_id() == column_ids.len() as i32 - 1
);

// Add a materialize node upon the original stream plan
let plan = StreamMaterialize::new(plan.ctx(), plan, column_orders.clone(), column_ids);

let plan: PlanRef = plan.into();

let order = Order::new(column_orders);
let (pk_column_ids, pk_orders) = order.to_protobuf_id_and_order();

let table = ProstTable {
id: TableId::placeholder().table_id(),
schema_id: info.schema_id,
database_id: info.database_id,
name: info.table_name,
columns: column_catalog
.iter()
.map(ColumnCatalog::to_protobuf)
.collect(),
// The pk of the corresponding table of MV is order column + upstream pk
pk_column_ids,
pk_orders: pk_orders.into_iter().map(|x| x.into()).collect(),
dependent_relations: vec![], // placeholder for meta
optional_associated_source_id: None,
let (schema_name, table_name) = Binder::resolve_table_name(name)?;
let (database_id, schema_id) = session
.env()
.catalog_reader()
.read_guard()
.check_relation_name_duplicated(session.database(), &schema_name, &table_name)?;

let bound = {
let mut binder = Binder::new(
session.env().catalog_reader().read_guard(),
session.database().to_string(),
);
binder.bind_query(*query)?
};

let mut plan_root = Planner::new(context).plan_query(bound)?;
plan_root.set_required_dist(Distribution::any().clone());
let materialize = plan_root.gen_create_mv_plan(table_name);
let table = materialize.table().to_prost(schema_id, database_id);
let plan: PlanRef = materialize.into();

Ok((plan, table))
}

Expand All @@ -195,33 +61,16 @@ pub async fn handle_create_mv(
) -> Result<PgResponse> {
let session = context.session_ctx.clone();

let (table, plan) = {
let mut planner = Planner::new(context.into());

let (schema_name, table_name) = Binder::resolve_table_name(name.clone())?;
let (database_id, schema_id) = session
.env()
.catalog_reader()
.read_guard()
.check_relation_name_duplicated(session.database(), &schema_name, &table_name)?;

let (plan, table) = gen_create_mv_plan(
&session,
&mut planner,
*query,
MvInfo {
schema_id,
database_id,
table_name: name.to_string(),
},
)?;
let plan = plan.to_stream_prost();

(table, plan)
let (table, stream_plan) = {
let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name)?;
let stream_plan = plan.to_stream_prost();
(table, stream_plan)
};

let catalog_writer = session.env().catalog_writer();
catalog_writer.create_materialized_view(table, plan).await?;
catalog_writer
.create_materialized_view(table, stream_plan)
.await?;

Ok(PgResponse::new(
pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW,
Expand Down
Loading

0 comments on commit c80c92b

Please sign in to comment.