feat: adding column of table schema change (#8063)
* remove hanging channels
* minor refactor
* add procedure
* more procedure
* more and more procedure
* bug fixes
* receiver with updated upstream
* insert with version
* pass dispatcher
* make it run
* add col index mapping proto
* use col index mapping to rewrite
* refine docs
* update proto
* update merges
* pause then replace
* add tests
* refactor to ddl controller
* refactor catalog

---------

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao authored Feb 27, 2023
1 parent de37916 commit 85646ba
Showing 25 changed files with 767 additions and 169 deletions.
42 changes: 42 additions & 0 deletions dashboard/proto/gen/catalog.ts

Generated file; diff not rendered.

19 changes: 16 additions & 3 deletions dashboard/proto/gen/ddl_service.ts

Generated file; diff not rendered.

43 changes: 40 additions & 3 deletions e2e_test/ddl/alter_table_column.slt
@@ -14,13 +14,13 @@ alter table t add column v1 int primary key;
statement error is not a table or cannot be altered
alter table mv add column v1 int;

statement ok
drop materialized view mv;

# Add column
statement ok
alter table t add column r real;

statement ok
create materialized view mv2 as select * from t;

query IR
select v, r from t;
----
@@ -33,6 +33,9 @@ public.t CREATE TABLE t (v INT, r REAL)
statement ok
alter table t add column s varchar;

statement ok
create materialized view mv3 as select * from t;

query IRT
select v, r, s from t;
----
@@ -42,5 +45,39 @@ show create table t;
----
public.t CREATE TABLE t (v INT, r REAL, s CHARACTER VARYING)

# Insert data
# TODO(#7906): alter after insert.
statement ok
insert into t values (1, 1.1, 'a');

statement ok
flush;

# All materialized views should keep the schema they were created with.
query I
select * from mv;
----
1

query IR
select * from mv2;
----
1 1.1

query IRT
select * from mv3;
----
1 1.1 a

# Drop columns
statement ok
drop materialized view mv;

statement ok
drop materialized view mv2;

statement ok
drop materialized view mv3;

statement ok
drop table t;
9 changes: 9 additions & 0 deletions proto/catalog.proto
@@ -16,6 +16,15 @@ message ColumnIndex {
uint64 index = 1;
}

// A mapping of column indices.
message ColIndexMapping {
// The size of the target space.
uint64 target_size = 1;
// The i-th element is the index in the target space that source index i maps to.
// Source indices that are not mapped are encoded as a negative value.
repeated int64 map = 2;
}

message WatermarkDesc {
// The column idx the watermark is on
uint32 watermark_idx = 1;
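
To make the encoding concrete, here is a small illustration (not part of this diff): dropping the middle column of a three-column table maps source indices 0 and 2 to target indices 0 and 1, while the unmapped source index 1 is encoded as -1. The sketch below uses the prost-generated Rust type for this message (the same one the conversion code further down imports as `ProstColIndexMapping`).

use risingwave_pb::catalog::ColIndexMapping;

fn main() {
    // Old columns (a, b, c) -> new columns (a, c): the middle column is dropped.
    let mapping = ColIndexMapping {
        target_size: 2,      // the target (new) schema has two columns
        map: vec![0, -1, 1], // unmapped source indices are encoded as -1
    };
    assert_eq!(mapping.map.len(), 3); // one entry per source column
}
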
2 changes: 2 additions & 0 deletions proto/ddl_service.proto
@@ -206,6 +206,8 @@ message ReplaceTablePlanRequest {
catalog.Table table = 1;
// The new materialization plan, in which all schemas are updated.
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
catalog.ColIndexMapping table_col_index_mapping = 3;
}

message ReplaceTablePlanResponse {
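
A hedged sketch of how the new field rides along with a replace-table request. The Rust path of the generated request type is assumed from the prost codegen for this proto file, and the `table` and `fragment_graph` fields are left at their defaults purely to keep the sketch self-contained; a real request carries the updated catalog table and the re-planned fragment graph.

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::ddl_service::ReplaceTablePlanRequest; // path assumed from prost codegen

fn main() {
    // Identity mapping: the two existing columns keep their positions in the new schema.
    let mapping = ColIndexMapping::new(vec![Some(0), Some(1)]);

    // Only the new field is filled in; the rest is defaulted for illustration.
    let request = ReplaceTablePlanRequest {
        table_col_index_mapping: Some(mapping.to_protobuf()),
        ..Default::default()
    };
    assert!(request.table_col_index_mapping.is_some());
}
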
44 changes: 44 additions & 0 deletions src/common/src/util/column_index_mapping.rs
@@ -17,6 +17,8 @@ use std::fmt::Debug;
use std::vec;

use itertools::Itertools;
use risingwave_pb::catalog::ColIndexMapping as ProstColIndexMapping;
use risingwave_pb::stream_plan::DispatchStrategy;

/// `ColIndexMapping` is a partial mapping from usize to usize.
///
@@ -268,6 +270,48 @@ impl ColIndexMapping {
}
}

impl ColIndexMapping {
pub fn to_protobuf(&self) -> ProstColIndexMapping {
ProstColIndexMapping {
target_size: self.target_size as u64,
map: self
.map
.iter()
.map(|x| x.map_or(-1, |x| x as i64))
.collect(),
}
}

pub fn from_protobuf(prost: &ProstColIndexMapping) -> ColIndexMapping {
ColIndexMapping {
target_size: prost.target_size as usize,
map: prost.map.iter().map(|&x| x.try_into().ok()).collect(),
}
}
}

impl ColIndexMapping {
/// Rewrite the dist-key indices and output indices in the given dispatch strategy. Returns
/// `None` if any of the indices is not mapped to the target.
pub fn rewrite_dispatch_strategy(
&self,
strategy: &DispatchStrategy,
) -> Option<DispatchStrategy> {
let map = |index: &[u32]| -> Option<Vec<u32>> {
index
.iter()
.map(|i| self.try_map(*i as usize).map(|i| i as u32))
.collect()
};

Some(DispatchStrategy {
r#type: strategy.r#type,
dist_key_indices: map(&strategy.dist_key_indices)?,
output_indices: map(&strategy.output_indices)?,
})
}
}

impl Debug for ColIndexMapping {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
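
A hedged usage sketch of the conversions and the dispatcher rewrite added above, for a mapping that drops the middle of three columns. It assumes `ColIndexMapping::new` accepts the `Vec<Option<usize>>` that the frontend diff below collects into it, and fills unrelated `DispatchStrategy` fields with defaults.

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::DispatchStrategy;

fn main() {
    // Old columns (a, b, c) -> new columns (a, c).
    let mapping = ColIndexMapping::new(vec![Some(0), None, Some(1)]);

    // Protobuf round-trip: `None` becomes -1 on the wire and back again.
    let proto = mapping.to_protobuf();
    assert_eq!(proto.map, vec![0, -1, 1]);
    let restored = ColIndexMapping::from_protobuf(&proto);
    assert_eq!(restored.to_protobuf(), proto);

    // A dispatcher that touches only surviving columns is rewritten to the new indices...
    let strategy = DispatchStrategy {
        dist_key_indices: vec![0],
        output_indices: vec![0, 2],
        ..Default::default()
    };
    let rewritten = mapping.rewrite_dispatch_strategy(&strategy).unwrap();
    assert_eq!(rewritten.dist_key_indices, vec![0]);
    assert_eq!(rewritten.output_indices, vec![0, 1]);

    // ...while one that references the dropped column cannot be rewritten.
    let dropped = DispatchStrategy {
        dist_key_indices: vec![1],
        ..Default::default()
    };
    assert!(mapping.rewrite_dispatch_strategy(&dropped).is_none());
}
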
20 changes: 17 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
@@ -19,6 +19,7 @@ use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::{
Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex,
Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable,
@@ -78,7 +79,12 @@ pub trait CatalogWriter: Send + Sync {
graph: StreamFragmentGraph,
) -> Result<()>;

async fn replace_table(&self, table: ProstTable, graph: StreamFragmentGraph) -> Result<()>;
async fn replace_table(
&self,
table: ProstTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
) -> Result<()>;

async fn create_index(
&self,
@@ -188,8 +194,16 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn replace_table(&self, table: ProstTable, graph: StreamFragmentGraph) -> Result<()> {
let version = self.meta_client.replace_table(table, graph).await?;
async fn replace_table(
&self,
table: ProstTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
) -> Result<()> {
let version = self
.meta_client
.replace_table(table, graph, mapping)
.await?;
self.wait_version(version).await
}

24 changes: 16 additions & 8 deletions src/frontend/src/handler/alter_table.rs
@@ -15,6 +15,7 @@
use anyhow::Context;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
@@ -127,22 +128,29 @@ pub async fn handle_add_column(
(graph, table)
};

// TODO: for test purpose only, we drop the original table and create a new one. This is wrong
// and really dangerous in production.
// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::new(
original_catalog
.columns()
.iter()
.map(|old_c| {
table.columns.iter().position(|new_c| {
new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id()
})
})
.collect(),
);

if cfg!(debug_assertions) {
let catalog_writer = session.env().catalog_writer();

// TODO: call replace_table RPC
// catalog_writer.replace_table(table, graph).await?;

catalog_writer
.drop_table(None, original_catalog.id())
.replace_table(table, graph, col_index_mapping)
.await?;
catalog_writer.create_table(None, table, graph).await?;

Ok(PgResponse::empty_result_with_notice(
StatementType::ALTER_TABLE,
"The `ALTER TABLE` feature is incomplete and NO DATA is preserved! This feature is not available in production.".to_owned(),
"The `ALTER TABLE` feature is incomplete and data will be corrupted! This feature is not available in production.".to_owned(),
))
} else {
Err(ErrorCode::NotImplemented(
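
For the `ALTER TABLE t ADD COLUMN s varchar` scenario from the e2e test, a minimal sketch of what this column-id matching produces, using plain tuples as hypothetical stand-ins for the catalog column types:

use risingwave_common::util::column_index_mapping::ColIndexMapping;

fn main() {
    // Hypothetical (column_id, name) pairs standing in for the catalog columns.
    let old_columns = [(0, "v"), (1, "r")];
    let new_columns = [(0, "v"), (1, "r"), (2, "s")]; // after adding column `s`

    // The same position-by-column-id matching as in the handler above.
    let mapping = ColIndexMapping::new(
        old_columns
            .iter()
            .map(|(old_id, _)| new_columns.iter().position(|(new_id, _)| new_id == old_id))
            .collect(),
    );

    // Existing columns keep their indices; the appended column only exists in the target.
    assert_eq!(mapping.to_protobuf().map, vec![0, 1]);
}
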
8 changes: 7 additions & 1 deletion src/frontend/src/test_utils.rs
@@ -28,6 +28,7 @@ use risingwave_common::catalog::{
};
use risingwave_common::error::Result;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
@@ -256,7 +257,12 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn replace_table(&self, table: ProstTable, _graph: StreamFragmentGraph) -> Result<()> {
async fn replace_table(
&self,
table: ProstTable,
_graph: StreamFragmentGraph,
_mapping: ColIndexMapping,
) -> Result<()> {
self.catalog.write().update_table(&table);
Ok(())
}
(The remaining changed files are not shown here.)
