Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(proto): refactor cell based table and rowSeqScan plan #835

Merged
merged 7 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import com.risingwave.catalog.TableCatalog;
import com.risingwave.planner.rel.common.RwScan;
import com.risingwave.planner.rel.common.dist.RwDistributions;
import com.risingwave.proto.plan.CellBasedTableDesc;
import com.risingwave.proto.plan.ColumnDesc;
import com.risingwave.proto.plan.PlanNode;
import com.risingwave.proto.plan.RowSeqScanNode;
import com.risingwave.rpc.Messages;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
Expand Down Expand Up @@ -56,8 +56,11 @@ public RelNode convertToDistributed() {
@Override
public PlanNode serialize() {
var table = getTable().unwrapOrThrow(TableCatalog.class);
var tableRefId = Messages.getTableRefId(tableId);
var builder = RowSeqScanNode.newBuilder().setTableRefId(tableRefId);

// FIXME: here should add pk's desc in the table desc.
var builder =
RowSeqScanNode.newBuilder()
.setTableDesc(CellBasedTableDesc.newBuilder().setTableId(tableId.getValue()).build());

columnIds.forEach(
c -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 1
},
"columnDescs": [{
Expand All @@ -30,11 +26,7 @@
"identity": "RwBatchScan(table\u003d[[test_schema, t]], columns\u003d[v1,v2,v3])"
}, {
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 5
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 9
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 1
},
"columnDescs": [{
Expand All @@ -30,11 +26,7 @@
"identity": "RwBatchScan(table\u003d[[test_schema, t]], columns\u003d[v1,v2,v3])"
}, {
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 5
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 9
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@
"children": [{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 1
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 1
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 1
},
"columnDescs": [{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
{
"children": [{
"rowSeqScan": {
"tableRefId": {
"schemaRefId": {
"databaseRefId": {
}
},
"tableDesc": {
"tableId": 1
},
"columnDescs": [{
Expand Down
4 changes: 2 additions & 2 deletions proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ message ColumnCatalog {
}

message CellBasedTableDesc {
int32 table_id = 1;
uint32 table_id = 1;
repeated OrderedColumnDesc pk = 2;
}

Expand All @@ -70,7 +70,7 @@ message MaterializedViewInfo {
}

message RowSeqScanNode {
TableRefId table_ref_id = 1;
CellBasedTableDesc table_desc = 1;
repeated ColumnDesc column_descs = 2;
}

Expand Down
4 changes: 3 additions & 1 deletion rust/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
NodeBody::RowSeqScan
)?;

let table_id = TableId::from(&seq_scan_node.table_ref_id);
let table_id = TableId {
table_id: seq_scan_node.table_desc.as_ref().unwrap().table_id,
};
let column_descs = seq_scan_node
.column_descs
.iter()
Expand Down
11 changes: 0 additions & 11 deletions rust/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
#![feature(map_first_last)]
#![feature(let_chains)]

use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;

pub mod cell_based_row_deserializer;
pub mod hummock;
pub mod keyspace;
Expand All @@ -44,17 +41,9 @@ pub mod tikv;
pub mod tikv;

pub use keyspace::{Keyspace, Segment};
use risingwave_common::catalog::ColumnId;
pub use store::{StateStore, StateStoreIter};
pub use store_impl::StateStoreImpl;

#[derive(Clone, Debug)]
pub struct IndexDesc {
pub column_id: ColumnId,
pub data_type: DataType,
pub order: OrderType,
}

pub enum TableScanOptions {
SequentialScan,
SparseIndexScan,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
#![allow(dead_code)]
#![allow(unused)]
use risingwave_common::array::Row;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, OrderedColumnDesc};
use risingwave_common::error::Result;
use risingwave_common::types::Datum;
use risingwave_common::util::ordered::*;

use super::TableIter;
use crate::{IndexDesc, Keyspace, StateStore};
use crate::{Keyspace, StateStore};

/// `RowTable` is the interface accessing relational data in KV(`StateStore`) with encoding format:
/// [keyspace | pk | `column_id` (4B)] -> value.
/// `CellBasedTable` is the interface accessing relational data in KV(`StateStore`) with encoding
/// format: [keyspace | pk | `column_id` (4B)] -> value.
/// if the key of the column id does not exist, it will be Null in the relation
pub struct RowTable<S: StateStore> {
pub struct CellBasedTable<S: StateStore> {
keyspace: Keyspace<S>,
pk: Vec<IndexDesc>,
pk: Vec<OrderedColumnDesc>,
pk_serializer: OrderedRowSerializer,
}

impl<S: StateStore> RowTable<S> {
pub fn new(keyspace: Keyspace<S>, pk: Vec<IndexDesc>) -> Self {
impl<S: StateStore> CellBasedTable<S> {
pub fn new(keyspace: Keyspace<S>, pk: Vec<OrderedColumnDesc>) -> Self {
todo!()
}

pub async fn get(&self, pk: Row, column: ColumnDesc, epoch: u64) -> Result<Option<Datum>> {
pub async fn get(&self, pk: Row, column: &ColumnDesc, epoch: u64) -> Result<Option<Datum>> {
todo!()
}

pub async fn get_row(&self, pk: Row, columns: Vec<ColumnDesc>, epoch: u64) -> Result<Row> {
pub async fn get_row(&self, pk: Row, columns: &[ColumnDesc], epoch: u64) -> Result<Row> {
todo!()
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod cell_based_table;
pub mod mview;
pub mod row_table;

use risingwave_common::array::Row;
use risingwave_common::error::Result;
Expand Down