Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): refactor table desc && SeqScan && SeqScan pk derive && SeqScan serialize (#1250) #1258

Merged
merged 2 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion rust/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ impl From<i32> for ColumnId {
Self::new(column_id)
}
}

/// Unwraps a [`ColumnId`] into the raw `i32` it carries.
impl From<ColumnId> for i32 {
    fn from(column_id: ColumnId) -> Self {
        column_id.0
    }
}
impl std::fmt::Display for ColumnId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
Expand Down Expand Up @@ -184,6 +188,18 @@ impl From<&ProstColumnDesc> for ColumnDesc {
}
}

impl From<&ColumnDesc> for ProstColumnDesc {
fn from(c: &ColumnDesc) -> Self {
Self {
column_type: c.data_type.to_protobuf().into(),
column_id: c.column_id.into(),
name: c.name.clone(),
field_descs: c.field_descs.iter().map(ColumnDesc::to_protobuf).collect(),
type_name: c.type_name.clone(),
}
}
}

impl From<ProstOrderedColumnDesc> for OrderedColumnDesc {
fn from(prost: ProstOrderedColumnDesc) -> Self {
Self {
Expand Down
5 changes: 5 additions & 0 deletions rust/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ impl From<u32> for TableId {
Self::new(id)
}
}
/// Extracts the raw `u32` identifier stored in a [`TableId`].
impl From<TableId> for u32 {
    fn from(table: TableId) -> Self {
        table.table_id
    }
}

impl fmt::Display for TableId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down
8 changes: 5 additions & 3 deletions rust/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{OrderedColumnDesc, TableId};
use super::{ColumnDesc, OrderedColumnDesc, TableId};

/// Descriptor of a table stored with cell-based encoding in the state store. It includes all
/// the information a compute node needs to access the table's data.
#[derive(Debug, Clone)]
pub struct CellBasedTableDesc {
#[derive(Debug, Clone, Default)]
pub struct TableDesc {
/// Id of the table, used to find it in storage.
pub table_id: TableId,
/// Descriptors of the primary key columns.
pub pk: Vec<OrderedColumnDesc>,
/// All columns in the table. Note that this vec is NOT sorted by column id.
pub columns: Vec<ColumnDesc>,
}
18 changes: 15 additions & 3 deletions rust/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::ops::Index;

use risingwave_pb::plan::Field as ProstField;

use super::ColumnDesc;
use crate::array::ArrayBuilderImpl;
use crate::error::Result;
use crate::types::DataType;
Expand Down Expand Up @@ -119,15 +120,26 @@ impl Field {
}
}

pub fn from(prost_field: &ProstField) -> Self {
/// Returns an owned clone of this field's data type.
pub fn data_type(&self) -> DataType {
    Clone::clone(&self.data_type)
}
}

/// Builds a [`Field`] from a borrowed protobuf field.
///
/// # Panics
///
/// Panics with "data type not found" when `get_data_type()` on the protobuf field does not
/// yield a value.
impl From<&ProstField> for Field {
    fn from(prost_field: &ProstField) -> Self {
        let data_type = DataType::from(
            prost_field
                .get_data_type()
                .expect("data type not found"),
        );
        let name = prost_field.get_name().clone();

        Self { data_type, name }
    }
}

pub fn data_type(&self) -> DataType {
self.data_type.clone()
/// Builds a [`Field`] from a column descriptor by cloning its data type and name.
impl From<&ColumnDesc> for Field {
    fn from(column: &ColumnDesc) -> Self {
        let data_type = column.data_type.clone();
        let name = column.name.clone();

        Self { data_type, name }
    }
}

Expand Down
10 changes: 4 additions & 6 deletions rust/frontend/src/binder/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::hash_map::Entry;

use risingwave_common::catalog::{CellBasedTableDesc, ColumnDesc, DEFAULT_SCHEMA_NAME};
use risingwave_common::catalog::{ColumnDesc, TableDesc, DEFAULT_SCHEMA_NAME};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::try_match_expand;
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -49,8 +49,7 @@ pub struct BoundJoin {
pub struct BoundBaseTable {
pub name: String, // explain-only
pub table_id: TableId,
pub cell_based_desc: CellBasedTableDesc,
pub columns: Vec<ColumnDesc>,
pub table_desc: TableDesc,
}

#[derive(Debug)]
Expand Down Expand Up @@ -176,7 +175,7 @@ impl Binder {
.get_table_by_name(&self.db_name, &schema_name, &table_name)?;

let table_id = table_catalog.id();
let cell_based_desc = table_catalog.cell_based_table();
let table_desc = table_catalog.table_desc();
let columns = table_catalog.columns().to_vec();

let columns = columns
Expand All @@ -190,9 +189,8 @@ impl Binder {

Ok(BoundBaseTable {
name: table_name,
cell_based_desc,
table_desc,
table_id,
columns,
})
}

Expand Down
9 changes: 5 additions & 4 deletions rust/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::{CellBasedTableDesc, ColumnDesc, OrderedColumnDesc};
use risingwave_common::catalog::{ColumnDesc, OrderedColumnDesc, TableDesc};
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::Table as ProstTable;
use risingwave_pb::plan::OrderType as ProstOrderType;
Expand Down Expand Up @@ -47,11 +47,12 @@ impl TableCatalog {
self.pk_desc.as_ref()
}

/// Get a [`CellBasedTableDesc`] of the table.
pub fn cell_based_table(&self) -> CellBasedTableDesc {
CellBasedTableDesc {
/// Get a [`TableDesc`] of the table.
pub fn table_desc(&self) -> TableDesc {
TableDesc {
table_id: self.id,
pk: self.pk_desc.clone(),
columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion rust/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use itertools::Itertools;
Expand Down Expand Up @@ -70,7 +71,7 @@ pub(super) async fn handle_create_source(

let source = match &stmt.source_schema {
SourceSchema::Protobuf(protobuf_schema) => {
column_catalogs.append(&mut extract_protobuf_table_schema(protobuf_schema)?);
column_catalogs.extend(extract_protobuf_table_schema(protobuf_schema)?.into_iter());
StreamSourceInfo {
properties: HashMap::from(stmt.with_properties),
row_format: RowFormatType::Protobuf as i32,
Expand Down
91 changes: 37 additions & 54 deletions rust/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ use std::rc::Rc;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::source::Info;
use risingwave_pb::catalog::{Source as ProstSource, Table as ProstTable, TableSourceInfo};
use risingwave_pb::plan::{ColumnCatalog, ColumnDesc as ProstColumnDesc, OrderType};
use risingwave_pb::stream_plan::source_node::SourceType;
use risingwave_pb::plan::{ColumnCatalog, ColumnDesc as ProstColumnDesc};
use risingwave_sqlparser::ast::{ColumnDef, ObjectName};

use crate::binder::expr::bind_data_type;
use crate::binder::Binder;
use crate::optimizer::plan_node::{LogicalScan, StreamExchange, StreamMaterialize, StreamSource};
use crate::optimizer::plan_node::{StreamExchange, StreamMaterialize, StreamSource};
use crate::optimizer::property::{Direction, Distribution, FieldOrder};
use crate::optimizer::PlanRef;
use crate::session::{QueryContext, QueryContextRef};
Expand Down Expand Up @@ -69,29 +69,36 @@ pub async fn handle_create_table(
column_descs
};

let columns_catalog = column_descs
.into_iter()
.enumerate()
.map(|(i, c)| ColumnCatalog {
column_desc: ProstColumnDesc {
column_type: c.data_type.to_protobuf().into(),
column_id: c.column_id.get_id(),
name: c.name,
..Default::default()
}
.into(),
is_hidden: i == 0,
})
.collect_vec();

let source = ProstSource {
id: TableId::placeholder().table_id(),
schema_id,
database_id,
name: table_name.clone(),
info: Info::TableSource(TableSourceInfo {
columns: columns_catalog.clone(),
})
.into(),
};

let plan = {
let context: QueryContextRef = context.into();

let source_node = {
let (columns, fields) = column_descs
.iter()
.map(|c| {
(
c.column_id,
Field::with_name(c.data_type.clone(), c.name.clone()),
)
})
.unzip();
let schema = Schema::new(fields);
let logical_scan = LogicalScan::new(
table_name.clone(),
TableId::placeholder(),
columns,
schema,
context.clone(),
);
StreamSource::new(logical_scan, SourceType::Table)
};
let source_node = StreamSource::create(context.clone(), vec![0], source.clone());

let exchange_node =
{ StreamExchange::new(source_node.into(), Distribution::HashShard(vec![0])) };
Expand All @@ -107,7 +114,10 @@ pub async fn handle_create_table(
direct: Direction::Asc,
},
],
column_descs.iter().map(|x| x.column_id).collect(),
columns_catalog
.iter()
.map(|x| x.column_desc.as_ref().unwrap().column_id.into())
.collect(),
)
};

Expand All @@ -117,44 +127,17 @@ pub async fn handle_create_table(
let json_plan = serde_json::to_string_pretty(&plan).unwrap();
log::debug!("name={}, plan=\n{}", table_name, json_plan);

let columns = column_descs
.into_iter()
.enumerate()
.map(|(i, c)| ColumnCatalog {
column_desc: ProstColumnDesc {
column_type: c.data_type.to_protobuf().into(),
column_id: c.column_id.get_id(),
name: c.name,
..Default::default()
}
.into(),
is_hidden: i == 0,
})
.collect_vec();

let source = ProstSource {
id: TableId::placeholder().table_id(),
schema_id,
database_id,
name: table_name.clone(),
info: Info::TableSource(TableSourceInfo {
columns: columns.clone(),
})
.into(),
};

let table = ProstTable {
id: TableId::placeholder().table_id(),
schema_id,
database_id,
name: table_name,
columns,
columns: columns_catalog,
pk_column_ids: vec![0],
pk_orders: vec![OrderType::Ascending as i32],
pk_orders: vec![OrderType::Ascending.to_prost() as i32],
dependent_relations: vec![],
optional_associated_source_id: None,
};

let catalog_writer = session.env().catalog_writer();
catalog_writer
.create_materialized_source(source, table, plan)
Expand Down
13 changes: 5 additions & 8 deletions rust/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,27 +195,24 @@ mod tests {
use risingwave_common::types::DataType;

use super::*;
use crate::catalog::{ColumnId, TableId};
use crate::optimizer::plan_node::LogicalScan;
use crate::optimizer::plan_node::LogicalValues;
use crate::session::QueryContext;

#[tokio::test]
async fn test_as_subplan() {
let ctx = QueryContext::mock().await;
let scan = LogicalScan::create(
"test_table".into(),
TableId::new(3),
vec![ColumnId::new(2), ColumnId::new(7)],
let values = LogicalValues::new(
vec![],
Schema::new(vec![
Field::with_name(DataType::Int32, "v1"),
Field::with_name(DataType::Varchar, "v2"),
]),
ctx,
)
.unwrap();
.into();
let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
let root = PlanRoot::new(
scan,
values,
Distribution::any().clone(),
Order::any().clone(),
out_fields,
Expand Down
15 changes: 4 additions & 11 deletions rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_pb::plan::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc, RowSeqScanNode};
Expand Down Expand Up @@ -75,21 +74,15 @@ impl ToBatchProst for BatchSeqScan {
// TODO(bugen): directly store `ColumnDesc`s in logical scan.
let column_descs = self
.logical
.columns()
.column_descs()
.iter()
.zip_eq(self.logical.schema().fields())
.map(|(column_id, field)| ProstColumnDesc {
column_type: field.data_type().to_protobuf().into(),
column_id: column_id.get_id(),
name: field.name.clone(),
..Default::default()
})
.map(ProstColumnDesc::from)
.collect();

NodeBody::RowSeqScan(RowSeqScanNode {
table_desc: Some(CellBasedTableDesc {
table_id: self.logical.table_id(),
pk: vec![], // not used
table_id: self.logical.table_desc().table_id.into(),
pk: vec![], // TODO:
}),
column_descs,
})
Expand Down
Loading