From 85646ba62b6fc9d6b37e6fc86141b0db92cb69a1 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 28 Feb 2023 00:26:57 +0800 Subject: [PATCH] feat: adding column of table schema change (#8063) * remove hanging channels Signed-off-by: Bugen Zhao * minor refactor Signed-off-by: Bugen Zhao * add procedure Signed-off-by: Bugen Zhao * more procedure Signed-off-by: Bugen Zhao * more and more procedure Signed-off-by: Bugen Zhao * bug fixes Signed-off-by: Bugen Zhao * receiver with updated upstream Signed-off-by: Bugen Zhao * insert with version Signed-off-by: Bugen Zhao * pass dispatcher Signed-off-by: Bugen Zhao * make it run Signed-off-by: Bugen Zhao * add col index mapping proto Signed-off-by: Bugen Zhao * use col index mapping to rewrite Signed-off-by: Bugen Zhao * refine docs Signed-off-by: Bugen Zhao * update proto Signed-off-by: Bugen Zhao * update merges Signed-off-by: Bugen Zhao * pause then replace Signed-off-by: Bugen Zhao * add tests Signed-off-by: Bugen Zhao * refactor to ddl controller Signed-off-by: Bugen Zhao * refactor catalog Signed-off-by: Bugen Zhao --------- Signed-off-by: Bugen Zhao --- dashboard/proto/gen/catalog.ts | 42 +++++ dashboard/proto/gen/ddl_service.ts | 19 ++- e2e_test/ddl/alter_table_column.slt | 43 ++++- proto/catalog.proto | 9 ++ proto/ddl_service.proto | 2 + src/common/src/util/column_index_mapping.rs | 44 +++++ src/frontend/src/catalog/catalog_service.rs | 20 ++- src/frontend/src/handler/alter_table.rs | 24 ++- src/frontend/src/test_utils.rs | 8 +- src/meta/src/barrier/command.rs | 56 +++++++ src/meta/src/barrier/schedule.rs | 11 +- src/meta/src/manager/catalog/database.rs | 4 + src/meta/src/manager/catalog/fragment.rs | 161 +++++++++++++++---- src/meta/src/manager/catalog/mod.rs | 79 ++++++++- src/meta/src/model/stream.rs | 6 + src/meta/src/rpc/ddl_controller.rs | 149 +++++++++++++---- src/meta/src/rpc/service/ddl_service.rs | 18 ++- src/meta/src/stream/scale.rs | 11 +- src/meta/src/stream/stream_graph.rs | 2 +- src/meta/src/stream/stream_graph/fragment.rs | 29 +--- src/meta/src/stream/stream_graph/visit.rs | 16 +- src/meta/src/stream/stream_manager.rs | 110 +++++++++---- src/rpc_client/src/meta_client.rs | 3 + src/stream/src/executor/merge.rs | 47 +++--- src/stream/src/executor/receiver.rs | 23 ++- 25 files changed, 767 insertions(+), 169 deletions(-) diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index 09811bac319e..630397b1af30 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -68,6 +68,17 @@ export interface ColumnIndex { index: number; } +/** A mapping of column indices. */ +export interface ColIndexMapping { + /** The size of the target space. */ + targetSize: number; + /** + * Each subscript is mapped to the corresponding element. + * For those not mapped, the value will be negative. + */ + map: number[]; +} + export interface WatermarkDesc { /** The column idx the watermark is on */ watermarkIdx: number; @@ -345,6 +356,37 @@ export const ColumnIndex = { }, }; +function createBaseColIndexMapping(): ColIndexMapping { + return { targetSize: 0, map: [] }; +} + +export const ColIndexMapping = { + fromJSON(object: any): ColIndexMapping { + return { + targetSize: isSet(object.targetSize) ? Number(object.targetSize) : 0, + map: Array.isArray(object?.map) ? 
object.map.map((e: any) => Number(e)) : [], + }; + }, + + toJSON(message: ColIndexMapping): unknown { + const obj: any = {}; + message.targetSize !== undefined && (obj.targetSize = Math.round(message.targetSize)); + if (message.map) { + obj.map = message.map.map((e) => Math.round(e)); + } else { + obj.map = []; + } + return obj; + }, + + fromPartial, I>>(object: I): ColIndexMapping { + const message = createBaseColIndexMapping(); + message.targetSize = object.targetSize ?? 0; + message.map = object.map?.map((e) => e) || []; + return message; + }, +}; + function createBaseWatermarkDesc(): WatermarkDesc { return { watermarkIdx: 0, expr: undefined }; } diff --git a/dashboard/proto/gen/ddl_service.ts b/dashboard/proto/gen/ddl_service.ts index 7ddf38241f3f..9f55ed090edf 100644 --- a/dashboard/proto/gen/ddl_service.ts +++ b/dashboard/proto/gen/ddl_service.ts @@ -1,5 +1,5 @@ /* eslint-disable */ -import { Database, Function, Index, Schema, Sink, Source, Table, View } from "./catalog"; +import { ColIndexMapping, Database, Function, Index, Schema, Sink, Source, Table, View } from "./catalog"; import { Status } from "./common"; import { StreamFragmentGraph } from "./stream_plan"; @@ -206,7 +206,11 @@ export interface ReplaceTablePlanRequest { | Table | undefined; /** The new materialization plan, where all schema are updated. */ - fragmentGraph: StreamFragmentGraph | undefined; + fragmentGraph: + | StreamFragmentGraph + | undefined; + /** The mapping from the old columns to the new columns of the table. */ + tableColIndexMapping: ColIndexMapping | undefined; } export interface ReplaceTablePlanResponse { @@ -1299,7 +1303,7 @@ export const DropIndexResponse = { }; function createBaseReplaceTablePlanRequest(): ReplaceTablePlanRequest { - return { table: undefined, fragmentGraph: undefined }; + return { table: undefined, fragmentGraph: undefined, tableColIndexMapping: undefined }; } export const ReplaceTablePlanRequest = { @@ -1307,6 +1311,9 @@ export const ReplaceTablePlanRequest = { return { table: isSet(object.table) ? Table.fromJSON(object.table) : undefined, fragmentGraph: isSet(object.fragmentGraph) ? StreamFragmentGraph.fromJSON(object.fragmentGraph) : undefined, + tableColIndexMapping: isSet(object.tableColIndexMapping) + ? ColIndexMapping.fromJSON(object.tableColIndexMapping) + : undefined, }; }, @@ -1315,6 +1322,9 @@ export const ReplaceTablePlanRequest = { message.table !== undefined && (obj.table = message.table ? Table.toJSON(message.table) : undefined); message.fragmentGraph !== undefined && (obj.fragmentGraph = message.fragmentGraph ? StreamFragmentGraph.toJSON(message.fragmentGraph) : undefined); + message.tableColIndexMapping !== undefined && (obj.tableColIndexMapping = message.tableColIndexMapping + ? ColIndexMapping.toJSON(message.tableColIndexMapping) + : undefined); return obj; }, @@ -1324,6 +1334,9 @@ export const ReplaceTablePlanRequest = { message.fragmentGraph = (object.fragmentGraph !== undefined && object.fragmentGraph !== null) ? StreamFragmentGraph.fromPartial(object.fragmentGraph) : undefined; + message.tableColIndexMapping = (object.tableColIndexMapping !== undefined && object.tableColIndexMapping !== null) + ? 
ColIndexMapping.fromPartial(object.tableColIndexMapping) + : undefined; return message; }, }; diff --git a/e2e_test/ddl/alter_table_column.slt b/e2e_test/ddl/alter_table_column.slt index 15e7b20efbf6..0a97586769d3 100644 --- a/e2e_test/ddl/alter_table_column.slt +++ b/e2e_test/ddl/alter_table_column.slt @@ -14,13 +14,13 @@ alter table t add column v1 int primary key; statement error is not a table or cannot be altered alter table mv add column v1 int; -statement ok -drop materialized view mv; - # Add column statement ok alter table t add column r real; +statement ok +create materialized view mv2 as select * from t; + query IR select v, r from t; ---- @@ -33,6 +33,9 @@ public.t CREATE TABLE t (v INT, r REAL) statement ok alter table t add column s varchar; +statement ok +create materialized view mv3 as select * from t; + query IRT select v, r, s from t; ---- @@ -42,5 +45,39 @@ show create table t; ---- public.t CREATE TABLE t (v INT, r REAL, s CHARACTER VARYING) +# Insert data +# TODO(#7906): alter after insert. +statement ok +insert into t values (1, 1.1, 'a'); + +statement ok +flush; + +# All materialized views should keep the schema when it's created. +query I +select * from mv; +---- +1 + +query IR +select * from mv2; +---- +1 1.1 + +query IRT +select * from mv3; +---- +1 1.1 a + +# Drop columns +statement ok +drop materialized view mv; + +statement ok +drop materialized view mv2; + +statement ok +drop materialized view mv3; + statement ok drop table t; diff --git a/proto/catalog.proto b/proto/catalog.proto index 6b3602527232..42eae27e5d8d 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -16,6 +16,15 @@ message ColumnIndex { uint64 index = 1; } +// A mapping of column indices. +message ColIndexMapping { + // The size of the target space. + uint64 target_size = 1; + // Each subscript is mapped to the corresponding element. + // For those not mapped, the value will be negative. + repeated int64 map = 2; +} + message WatermarkDesc { // The column idx the watermark is on uint32 watermark_idx = 1; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 364166d6d111..4f9cf91271cd 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -206,6 +206,8 @@ message ReplaceTablePlanRequest { catalog.Table table = 1; // The new materialization plan, where all schema are updated. stream_plan.StreamFragmentGraph fragment_graph = 2; + // The mapping from the old columns to the new columns of the table. + catalog.ColIndexMapping table_col_index_mapping = 3; } message ReplaceTablePlanResponse { diff --git a/src/common/src/util/column_index_mapping.rs b/src/common/src/util/column_index_mapping.rs index 654fb0035aff..884f4043b69e 100644 --- a/src/common/src/util/column_index_mapping.rs +++ b/src/common/src/util/column_index_mapping.rs @@ -17,6 +17,8 @@ use std::fmt::Debug; use std::vec; use itertools::Itertools; +use risingwave_pb::catalog::ColIndexMapping as ProstColIndexMapping; +use risingwave_pb::stream_plan::DispatchStrategy; /// `ColIndexMapping` is a partial mapping from usize to usize. 
/// @@ -268,6 +270,48 @@ impl ColIndexMapping { } } +impl ColIndexMapping { + pub fn to_protobuf(&self) -> ProstColIndexMapping { + ProstColIndexMapping { + target_size: self.target_size as u64, + map: self + .map + .iter() + .map(|x| x.map_or(-1, |x| x as i64)) + .collect(), + } + } + + pub fn from_protobuf(prost: &ProstColIndexMapping) -> ColIndexMapping { + ColIndexMapping { + target_size: prost.target_size as usize, + map: prost.map.iter().map(|&x| x.try_into().ok()).collect(), + } + } +} + +impl ColIndexMapping { + /// Rewrite the dist-key indices and output indices in the given dispatch strategy. Returns + /// `None` if any of the indices is not mapped to the target. + pub fn rewrite_dispatch_strategy( + &self, + strategy: &DispatchStrategy, + ) -> Option { + let map = |index: &[u32]| -> Option> { + index + .iter() + .map(|i| self.try_map(*i as usize).map(|i| i as u32)) + .collect() + }; + + Some(DispatchStrategy { + r#type: strategy.r#type, + dist_key_indices: map(&strategy.dist_key_indices)?, + output_indices: map(&strategy.output_indices)?, + }) + } +} + impl Debug for ColIndexMapping { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 37cffcfa816e..736cb307c0b3 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -19,6 +19,7 @@ use parking_lot::{RawRwLock, RwLock}; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::{Result, RwError}; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::{ Database as ProstDatabase, Function as ProstFunction, Index as ProstIndex, Schema as ProstSchema, Sink as ProstSink, Source as ProstSource, Table as ProstTable, @@ -78,7 +79,12 @@ pub trait CatalogWriter: Send + Sync { graph: StreamFragmentGraph, ) -> Result<()>; - async fn replace_table(&self, table: ProstTable, graph: StreamFragmentGraph) -> Result<()>; + async fn replace_table( + &self, + table: ProstTable, + graph: StreamFragmentGraph, + mapping: ColIndexMapping, + ) -> Result<()>; async fn create_index( &self, @@ -188,8 +194,16 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn replace_table(&self, table: ProstTable, graph: StreamFragmentGraph) -> Result<()> { - let version = self.meta_client.replace_table(table, graph).await?; + async fn replace_table( + &self, + table: ProstTable, + graph: StreamFragmentGraph, + mapping: ColIndexMapping, + ) -> Result<()> { + let version = self + .meta_client + .replace_table(table, graph, mapping) + .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_table.rs b/src/frontend/src/handler/alter_table.rs index 2b38a6e61af9..6e2985e9fd44 100644 --- a/src/frontend/src/handler/alter_table.rs +++ b/src/frontend/src/handler/alter_table.rs @@ -15,6 +15,7 @@ use anyhow::Context; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::Table; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -127,22 +128,29 @@ pub async fn handle_add_column( (graph, table) }; - // TODO: for test purpose only, we drop the original table and create a 
new one. This is wrong - // and really dangerous in production. + // Calculate the mapping from the original columns to the new columns. + let col_index_mapping = ColIndexMapping::new( + original_catalog + .columns() + .iter() + .map(|old_c| { + table.columns.iter().position(|new_c| { + new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id() + }) + }) + .collect(), + ); + if cfg!(debug_assertions) { let catalog_writer = session.env().catalog_writer(); - // TODO: call replace_table RPC - // catalog_writer.replace_table(table, graph).await?; - catalog_writer - .drop_table(None, original_catalog.id()) + .replace_table(table, graph, col_index_mapping) .await?; - catalog_writer.create_table(None, table, graph).await?; Ok(PgResponse::empty_result_with_notice( StatementType::ALTER_TABLE, - "The `ALTER TABLE` feature is incomplete and NO DATA is preserved! This feature is not available in production.".to_owned(), + "The `ALTER TABLE` feature is incomplete and data will be corrupted! This feature is not available in production.".to_owned(), )) } else { Err(ErrorCode::NotImplemented( diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 8aa46fb9c820..26153d6cef66 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -28,6 +28,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::error::Result; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ @@ -256,7 +257,12 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } - async fn replace_table(&self, table: ProstTable, _graph: StreamFragmentGraph) -> Result<()> { + async fn replace_table( + &self, + table: ProstTable, + _graph: StreamFragmentGraph, + _mapping: ColIndexMapping, + ) -> Result<()> { self.catalog.write().update_table(&table); Ok(()) } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 24a049ee1018..0353aa074f68 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -115,6 +115,18 @@ pub enum Command { /// very similar to `Create` and `Drop` commands, for added and removed actors, respectively. RescheduleFragment(HashMap), + /// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is + /// essentially switching the downstream of the old table fragments to the new ones, and + /// dropping the old table fragments. Used for table schema change. + /// + /// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment + /// of the Merge executors are changed additionally. + ReplaceTable { + old_table_fragments: TableFragments, + new_table_fragments: TableFragments, + merge_updates: Vec, + }, + /// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or /// newly added splits. SourceSplitAssignment(SplitAssignment), @@ -155,6 +167,15 @@ impl Command { .collect(); CommandChanges::Actor { to_add, to_remove } } + Command::ReplaceTable { + old_table_fragments, + new_table_fragments, + .. 
+ } => { + let to_add = new_table_fragments.actor_ids().into_iter().collect(); + let to_remove = old_table_fragments.actor_ids().into_iter().collect(); + CommandChanges::Actor { to_add, to_remove } + } Command::SourceSplitAssignment(_) => CommandChanges::None, } } @@ -282,6 +303,20 @@ where Some(Mutation::Stop(StopMutation { actors })) } + Command::ReplaceTable { + old_table_fragments, + merge_updates, + .. + } => { + let dropped_actors = old_table_fragments.actor_ids(); + + Some(Mutation::Update(UpdateMutation { + merge_update: merge_updates.clone(), + dropped_actors, + ..Default::default() + })) + } + Command::RescheduleFragment(reschedules) => { let mut dispatcher_update = HashMap::new(); for (_fragment_id, reschedule) in reschedules.iter() { @@ -604,6 +639,27 @@ where .await; } } + + Command::ReplaceTable { + old_table_fragments, + new_table_fragments, + merge_updates, + } => { + let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id())); + + // Tell compute nodes to drop actors. + let node_actors = self.fragment_manager.table_node_actors(&table_ids).await?; + self.clean_up(node_actors).await?; + + // Drop fragment info in meta store. + self.fragment_manager + .post_replace_table( + old_table_fragments.table_id(), + new_table_fragments.table_id(), + merge_updates, + ) + .await?; + } } Ok(()) diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index ce5536251d74..4d5e9570319a 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -154,7 +154,9 @@ impl BarrierScheduler { } /// Run multiple commands and return when they're all completely finished. It's ensured that - /// multiple commands is executed continuously and atomically. + /// multiple commands are executed continuously. + /// + /// TODO: atomicity of multiple commands is not guaranteed. pub async fn run_multiple_commands(&self, commands: Vec) -> MetaResult<()> { struct Context { collect_rx: oneshot::Receiver>, @@ -205,6 +207,13 @@ impl BarrierScheduler { Ok(()) } + /// Run a command with a `Pause` command before and `Resume` command after it. Used for + /// configuration change. + pub async fn run_command_with_paused(&self, command: Command) -> MetaResult<()> { + self.run_multiple_commands(vec![Command::pause(), command, Command::resume()]) + .await + } + /// Run a command and return when it's completely finished. 
pub async fn run_command(&self, command: Command) -> MetaResult<()> { self.run_multiple_commands(vec![command]).await diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index f720d6f0dc1d..d678ba3445e5 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -200,6 +200,10 @@ impl DatabaseManager { self.tables.values().cloned().collect_vec() } + pub fn get_table(&self, table_id: TableId) -> Option<&Table> { + self.tables.get(&table_id) + } + pub fn get_all_table_options(&self) -> HashMap { self.tables .iter() diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index c29e54633a44..de1aa73f54f3 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -28,8 +28,9 @@ use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; use risingwave_pb::meta::FragmentParallelUnitMapping; use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::{ - Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode, + DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode, }; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -40,7 +41,7 @@ use crate::model::{ ActorId, BTreeMapTransaction, FragmentId, MetadataModel, TableFragments, ValTransaction, }; use crate::storage::{MetaStore, Transaction}; -use crate::stream::SplitAssignment; +use crate::stream::{visit_stream_node, SplitAssignment}; use crate::MetaResult; pub struct FragmentManagerCore { @@ -262,6 +263,94 @@ where Ok(()) } + /// Called after the barrier collection of `ReplaceTable` command, which replaces the fragments + /// of this table, and updates the downstream Merge to have the new upstream fragments. + pub async fn post_replace_table( + &self, + table_id: TableId, + dummy_table_id: TableId, + merge_updates: &[MergeUpdate], + ) -> MetaResult<()> { + let map = &mut self.core.write().await.table_fragments; + + let mut table_fragments = BTreeMapTransaction::new(map); + + // FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments + // with the real table ID, then replace the dummy table ID with the real table ID. This is a + // workaround for not having the version info in the fragment manager. + let old_table_fragment = table_fragments + .remove(table_id) + .with_context(|| format!("table_fragment not exist: id={}", table_id))?; + let mut table_fragment = table_fragments + .remove(dummy_table_id) + .with_context(|| format!("table_fragment not exist: id={}", dummy_table_id))?; + + assert_eq!(table_fragment.state(), State::Initial); + table_fragment.set_table_id(table_id); + + // Directly set to `Created` and `Running` state. + table_fragment.set_state(State::Created); + table_fragment.update_actors_state(ActorState::Running); + + table_fragments.insert(table_id, table_fragment.clone()); + + // Update downstream `Merge`s. 
+ let mut merge_updates: HashMap<_, _> = merge_updates + .iter() + .map(|update| (update.actor_id, update)) + .collect(); + + let to_update_table_ids = table_fragments + .tree_ref() + .iter() + .filter(|(_, v)| { + v.actor_ids() + .iter() + .any(|&actor_id| merge_updates.contains_key(&actor_id)) + }) + .map(|(k, _)| *k) + .collect::>(); + + for table_id in to_update_table_ids { + let mut table_fragment = table_fragments + .get_mut(table_id) + .with_context(|| format!("table_fragment not exist: id={}", table_id))?; + + for actor in table_fragment + .fragments + .values_mut() + .flat_map(|f| &mut f.actors) + { + if let Some(merge_update) = merge_updates.remove(&actor.actor_id) { + assert!(merge_update.removed_upstream_actor_id.is_empty()); + assert!(merge_update.new_upstream_fragment_id.is_some()); + + let stream_node = actor.nodes.as_mut().unwrap(); + visit_stream_node(stream_node, |body| { + if let NodeBody::Merge(m) = body + && m.upstream_fragment_id == merge_update.upstream_fragment_id + { + m.upstream_fragment_id = merge_update.new_upstream_fragment_id.unwrap(); + m.upstream_actor_id = merge_update.added_upstream_actor_id.clone(); + } + }); + } + } + } + + assert!(merge_updates.is_empty()); + + // Commit changes and notify about the changes. + commit_meta!(self, table_fragments)?; + + self.notify_fragment_mapping(&old_table_fragment, Operation::Delete) + .await; + self.notify_fragment_mapping(&table_fragment, Operation::Add) + .await; + + Ok(()) + } + /// Called after the finish of `CreateStreamingJob` command, i.e., streaming job is /// completely created, which updates the state from `Creating` to `Created`. pub async fn mark_table_fragments_created(&self, table_id: TableId) -> MetaResult<()> { @@ -571,26 +660,19 @@ where stream_node: &mut StreamNode, upstream_fragment_id: &FragmentId, upstream_actors_to_remove: &HashSet, - upstream_actors_to_create: &Vec, + upstream_actors_to_create: &[ActorId], ) { - if let Some(NodeBody::Merge(s)) = stream_node.node_body.as_mut() { - if s.upstream_fragment_id == *upstream_fragment_id { - update_actors( - s.upstream_actor_id.as_mut(), - upstream_actors_to_remove, - upstream_actors_to_create, - ); + visit_stream_node(stream_node, |body| { + if let NodeBody::Merge(s) = body { + if s.upstream_fragment_id == *upstream_fragment_id { + update_actors( + s.upstream_actor_id.as_mut(), + upstream_actors_to_remove, + upstream_actors_to_create, + ); + } } - } - - for child in &mut stream_node.input { - update_merge_node_upstream( - child, - upstream_fragment_id, - upstream_actors_to_remove, - upstream_actors_to_create, - ); - } + }); } let new_created_actors: HashSet<_> = reschedules @@ -842,7 +924,7 @@ where pub async fn get_downstream_chain_fragments( &self, table_id: TableId, - ) -> MetaResult> { + ) -> MetaResult> { let map = &self.core.read().await.table_fragments; let table_fragments = map @@ -850,10 +932,18 @@ where .with_context(|| format!("table_fragment not exist: id={}", table_id))?; let mview_fragment = table_fragments.mview_fragment().unwrap(); - let downstream_fragment_ids: HashSet<_> = mview_fragment.actors[0] + let downstream_dispatches: HashMap<_, _> = mview_fragment.actors[0] .dispatcher .iter() - .map(|d| d.dispatcher_id as FragmentId) + .map(|d| { + let fragment_id = d.dispatcher_id as FragmentId; + let strategy = DispatchStrategy { + r#type: d.r#type, + dist_key_indices: d.dist_key_indices.clone(), + output_indices: d.output_indices.clone(), + }; + (fragment_id, strategy) + }) .collect(); // Find the fragments based on the fragment ids. 
@@ -863,16 +953,33 @@ where table_fragments .fragments .values() - .filter(|fragment| downstream_fragment_ids.contains(&fragment.fragment_id)) - .inspect(|f| { + .filter_map(|fragment| { + downstream_dispatches + .get(&fragment.fragment_id) + .map(|d| (d.clone(), fragment.clone())) + }) + .inspect(|(_, f)| { assert!((f.fragment_type_mask & FragmentTypeFlag::ChainNode as u32) != 0) }) }) - .cloned() .collect_vec(); - assert_eq!(downstream_fragment_ids.len(), fragments.len()); + assert_eq!(downstream_dispatches.len(), fragments.len()); Ok(fragments) } + + /// Get the `Materialize` fragment of the specified table. + pub async fn get_mview_fragment(&self, table_id: TableId) -> MetaResult { + let map = &self.core.read().await.table_fragments; + + let table_fragments = map + .get(&table_id) + .with_context(|| format!("table_fragment not exist: id={}", table_id))?; + let mview_fragment = table_fragments + .mview_fragment() + .with_context(|| format!("mview fragment not exist: id={}", table_id))?; + + Ok(mview_fragment) + } } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index b8f8f3245dec..12096ae50b00 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -21,7 +21,7 @@ use std::iter; use std::option::Option::Some; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; pub use database::*; pub use fragment::*; use itertools::Itertools; @@ -1436,6 +1436,83 @@ where } } + /// This is used for `ALTER TABLE ADD/DROP COLUMN`. + pub async fn start_replace_table_procedure(&self, table: &Table) -> MetaResult<()> { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + database_core.ensure_database_id(table.database_id)?; + database_core.ensure_schema_id(table.schema_id)?; + + assert!(table.dependent_relations.is_empty()); + + let key = (table.database_id, table.schema_id, table.name.clone()); + let original_table = database_core + .get_table(table.id) + .context("table to alter must exist")?; + + // Check whether the frontend is operating on the latest version of the table. + if table.get_version()?.version != original_table.get_version()?.version + 1 { + bail!("table version is stale"); + } + + // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must + // occur after it's created. We may need to add a new tracker for `alter` procedure. + if database_core.has_in_progress_creation(&key) { + bail!("table is in altering procedure"); + } else { + database_core.mark_creating(&key); + Ok(()) + } + } + + /// This is used for `ALTER TABLE ADD/DROP COLUMN`. + pub async fn finish_replace_table_procedure( + &self, + table: &Table, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + let key = (table.database_id, table.schema_id, table.name.clone()); + assert!( + tables.contains_key(&table.id) + && database_core.in_progress_creation_tracker.contains(&key), + "table must exist and be in altering procedure" + ); + + // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must + database_core.in_progress_creation_tracker.remove(&key); + + tables.insert(table.id, table.clone()); + commit_meta!(self, tables)?; + + let version = self + .notify_frontend(Operation::Update, Info::Table(table.to_owned())) + .await; + + Ok(version) + } + + /// This is used for `ALTER TABLE ADD/DROP COLUMN`. 
+ pub async fn cancel_replace_table_procedure(&self, table: &Table) -> MetaResult<()> { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let key = (table.database_id, table.schema_id, table.name.clone()); + + assert!(table.dependent_relations.is_empty()); + + assert!( + database_core.tables.contains_key(&table.id) + && database_core.has_in_progress_creation(&key), + "table must exist and must be in altering procedure" + ); + + // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must + // occur after it's created. We may need to add a new tracker for `alter` procedure.s + database_core.unmark_creating(&key); + Ok(()) + } + pub async fn list_databases(&self) -> Vec { self.core.lock().await.database.list_databases() } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index ce385b207b81..9480a528fe61 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -191,6 +191,12 @@ impl TableFragments { self.state == State::Created } + /// Set the table ID. + // TODO: remove this workaround for replacing table. + pub fn set_table_id(&mut self, table_id: TableId) { + self.table_id = table_id; + } + /// Set the state of the table fragments. pub fn set_state(&mut self, state: State) { self.state = state; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index b9543110048f..36d12ca11fa4 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -12,15 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::Context; use itertools::Itertools; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::{Database, Function, Schema, Source, Table, View}; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use crate::barrier::BarrierManagerRef; use crate::manager::{ - CatalogManagerRef, ClusterManagerRef, DatabaseId, FragmentManagerRef, FunctionId, IndexId, - MetaSrvEnv, NotificationVersion, SchemaId, SinkId, SourceId, StreamingJob, TableId, ViewId, + CatalogManagerRef, ClusterManagerRef, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, + IndexId, MetaSrvEnv, NotificationVersion, SchemaId, SinkId, SourceId, StreamingJob, TableId, + ViewId, }; use crate::model::{StreamEnvironment, TableFragments}; use crate::storage::MetaStore; @@ -62,7 +65,7 @@ pub enum DdlCommand { DropView(ViewId), CreatingStreamingJob(StreamingJob, StreamFragmentGraphProto), DropStreamingJob(StreamingJobId), - ReplaceStreamingJob(StreamingJob, StreamFragmentGraphProto), + ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), } #[derive(Clone)] @@ -135,8 +138,9 @@ where ctrl.create_streaming_job(stream_job, fragment_graph).await } DdlCommand::DropStreamingJob(job_id) => ctrl.drop_streaming_job(job_id).await, - DdlCommand::ReplaceStreamingJob(stream_job, fragment_graph) => { - ctrl.replace_streaming_job(stream_job, fragment_graph).await + DdlCommand::ReplaceTable(stream_job, fragment_graph, table_col_index_mapping) => { + ctrl.replace_table(stream_job, fragment_graph, table_col_index_mapping) + .await } } }); @@ -519,52 +523,97 @@ where } } - async fn replace_streaming_job( + async fn replace_table( &self, mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, + table_col_index_mapping: ColIndexMapping, ) -> MetaResult { - let (_ctx, 
_table_fragments) = self + let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); + + let fragment_graph = self .prepare_replace_table(&mut stream_job, fragment_graph) .await?; - Ok(u64::MAX) + + let result = try { + let (ctx, table_fragments) = self + .build_replace_table(env, &stream_job, fragment_graph, table_col_index_mapping) + .await?; + + self.stream_manager + .replace_table(table_fragments, ctx) + .await?; + }; + + match result { + Ok(_) => self.finish_replace_table(&stream_job).await, + Err(err) => { + self.cancel_replace_table(&stream_job).await?; + Err(err) + } + } } - /// Prepares a table replacement and returns the context and table fragments. + /// `prepare_replace_table` prepares a table replacement and returns the new stream fragment + /// graph. This is basically the same as `prepare_stream_job`, except that it does more + /// assertions and uses a different method to mark in the catalog. async fn prepare_replace_table( &self, stream_job: &mut StreamingJob, fragment_graph: StreamFragmentGraphProto, - ) -> MetaResult<(ReplaceTableContext, TableFragments)> { - let id = stream_job.id(); - - // 1. Get the env for streaming jobs. - let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); - - // 2. Build fragment graph. + ) -> MetaResult { + // 1. Build fragment graph. let fragment_graph = StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), &*stream_job) .await?; - let default_parallelism = fragment_graph.default_parallelism(); assert!(fragment_graph.internal_tables().is_empty()); + assert!(fragment_graph.dependent_relations().is_empty()); - // 3. Set the graph-related fields and freeze the `stream_job`. + // 2. Set the graph-related fields and freeze the `stream_job`. stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); let stream_job = &*stream_job; - // TODO: 4. Mark current relation as "updating". + // 3. Mark current relation as "updating". + self.catalog_manager + .start_replace_table_procedure(stream_job.table().unwrap()) + .await?; - // 5. Resolve the downstream fragments, extend the fragment graph to a complete graph that - // contains all information needed for building the actor graph. + Ok(fragment_graph) + } + + /// `build_replace_table` builds a table replacement and returns the context and new table + /// fragments. + async fn build_replace_table( + &self, + env: StreamEnvironment, + stream_job: &StreamingJob, + fragment_graph: StreamFragmentGraph, + table_col_index_mapping: ColIndexMapping, + ) -> MetaResult<(ReplaceTableContext, TableFragments)> { + let id = stream_job.id(); + let default_parallelism = fragment_graph.default_parallelism(); + + // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete + // graph that contains all information needed for building the actor graph. + let original_table_fragment = self.fragment_manager.get_mview_fragment(id.into()).await?; + + // Map the column indices in the dispatchers with the given mapping. let downstream_fragments = self .fragment_manager .get_downstream_chain_fragments(id.into()) - .await?; + .await? 
+ .into_iter() + .map(|(d, f)| Some((table_col_index_mapping.rewrite_dispatch_strategy(&d)?, f))) + .collect::>() + .context("failed to map columns")?; - let complete_graph = - CompleteStreamFragmentGraph::with_downstreams(fragment_graph, downstream_fragments)?; + let complete_graph = CompleteStreamFragmentGraph::with_downstreams( + fragment_graph, + original_table_fragment.fragment_id, + downstream_fragments, + )?; - // 6. Build the actor graph. + // 2. Build the actor graph. let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; let actor_graph_builder = ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; @@ -580,13 +629,34 @@ where .await?; assert!(dispatchers.is_empty()); - // 7. Build the table fragments structure that will be persisted in the stream manager, and + // 3. Assign a new dummy ID for the new table fragments. + // + // FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments + // with the real table ID, then replace the dummy table ID with the real table ID. This is a + // workaround for not having the version info in the fragment manager. + let dummy_id = self + .env + .id_gen_manager() + .generate::<{ IdCategory::Table }>() + .await? as u32; + + // 4. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + let table_fragments = TableFragments::new( + dummy_id.into(), + graph, + &building_locations.actor_locations, + env, + ); + + let old_table_fragments = self + .fragment_manager + .select_table_fragments_by_table_id(&id.into()) + .await?; let ctx = ReplaceTableContext { + old_table_fragments, merge_updates, building_locations, existing_locations, @@ -595,4 +665,27 @@ where Ok((ctx, table_fragments)) } + + async fn finish_replace_table( + &self, + stream_job: &StreamingJob, + ) -> MetaResult { + let StreamingJob::Table(None, table) = stream_job else { + unreachable!("unexpected job: {stream_job:?}") + }; + + self.catalog_manager + .finish_replace_table_procedure(table) + .await + } + + async fn cancel_replace_table(&self, stream_job: &StreamingJob) -> MetaResult<()> { + let StreamingJob::Table(None, table) = stream_job else { + unreachable!("unexpected job: {stream_job:?}") + }; + + self.catalog_manager + .cancel_replace_table_procedure(table) + .await + } } diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index 0482bad4bf4e..7e1ff0b92caf 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::SourceId as ProstSourceId; @@ -484,15 +485,22 @@ where let stream_job = StreamingJob::Table(None, req.table.unwrap()); let fragment_graph = req.fragment_graph.unwrap(); + let table_col_index_mapping = + ColIndexMapping::from_protobuf(&req.table_col_index_mapping.unwrap()); - let _version = self + let version = self .ddl_controller - .run_command(DdlCommand::ReplaceStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::ReplaceTable( + stream_job, + fragment_graph, + table_col_index_mapping, + )) .await?; - Err(Status::unimplemented( - "replace table plan is not implemented yet", - )) + Ok(Response::new(ReplaceTablePlanResponse { + status: None, + version, + })) } async fn get_table( diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 89f03123e202..719dad39676a 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -29,11 +29,8 @@ use risingwave_pb::common::{worker_node, ActorInfo, ParallelUnit, WorkerNode, Wo use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment}; -use risingwave_pb::stream_plan::barrier::Mutation; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::{ - DispatcherType, FragmentTypeFlag, PauseMutation, ResumeMutation, StreamActor, StreamNode, -}; +use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, UpdateActorsRequest, }; @@ -1129,11 +1126,7 @@ where tracing::trace!("reschedule plan: {:#?}", reschedule_fragment); self.barrier_scheduler - .run_multiple_commands(vec![ - Command::Plain(Some(Mutation::Pause(PauseMutation {}))), - Command::RescheduleFragment(reschedule_fragment), - Command::Plain(Some(Mutation::Resume(ResumeMutation {}))), - ]) + .run_command_with_paused(Command::RescheduleFragment(reschedule_fragment)) .await?; Ok(()) diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index aff8fceacfb5..1969ed62acd3 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -21,4 +21,4 @@ mod visit; pub use actor::{ActorGraphBuildResult, ActorGraphBuilder}; pub use fragment::{CompleteStreamFragmentGraph, StreamFragmentGraph}; pub use schedule::Locations; -pub use visit::visit_fragment; +pub use visit::{visit_fragment, visit_stream_node}; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 71cdd3db10cd..d82b34ed12eb 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -17,7 +17,7 @@ use std::num::NonZeroUsize; use std::ops::Deref; use std::sync::LazyLock; -use anyhow::{anyhow, Context}; +use anyhow::Context; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bail; @@ -469,27 +469,18 @@ impl CompleteStreamFragmentGraph { /// downstream existing `Chain` fragments. 
pub fn with_downstreams( graph: StreamFragmentGraph, - downstream_fragments: Vec, + original_table_fragment_id: FragmentId, + downstream_fragments: Vec<(DispatchStrategy, Fragment)>, ) -> MetaResult { let mut extra_downstreams = HashMap::new(); let mut extra_upstreams = HashMap::new(); - let original_table_fragment_id = GlobalFragmentId::new( - downstream_fragments - .iter() - .flat_map(|f| f.upstream_fragment_ids.iter().copied()) - .unique() - .exactly_one() - .map_err(|_| { - anyhow!("downstream fragments must have exactly one upstream fragment") - })?, - ); - + let original_table_fragment_id = GlobalFragmentId::new(original_table_fragment_id); let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id()); // Build the extra edges between the `Materialize` and the downstream `Chain` of the // existing materialized views. - for fragment in &downstream_fragments { + for (dispatch_strategy, fragment) in &downstream_fragments { let id = GlobalFragmentId::new(fragment.fragment_id); let edge = StreamFragmentEdge { @@ -497,13 +488,7 @@ impl CompleteStreamFragmentGraph { original_upstream_fragment_id: original_table_fragment_id, downstream_fragment_id: id, }, - // We always use `NoShuffle` for the exchange between the upstream `Materialize` - // and the downstream `Chain` of the new materialized view. - dispatch_strategy: DispatchStrategy { - r#type: DispatcherType::NoShuffle as _, - dist_key_indices: vec![], // not used - output_indices: vec![], // FIXME: should erase the changes of the schema - }, + dispatch_strategy: dispatch_strategy.clone(), }; extra_downstreams @@ -520,7 +505,7 @@ impl CompleteStreamFragmentGraph { let existing_fragments = downstream_fragments .into_iter() - .map(|f| (GlobalFragmentId::new(f.fragment_id), f)) + .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)) .collect(); Ok(Self { diff --git a/src/meta/src/stream/stream_graph/visit.rs b/src/meta/src/stream/stream_graph/visit.rs index d9f399f5f700..a7592888e230 100644 --- a/src/meta/src/stream/stream_graph/visit.rs +++ b/src/meta/src/stream/stream_graph/visit.rs @@ -17,9 +17,8 @@ use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragment; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{agg_call_state, StreamNode}; -/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a -/// [`StreamFragment`] recursively. -pub fn visit_fragment(fragment: &mut StreamFragment, mut f: F) +/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s recursively. +pub fn visit_stream_node(stream_node: &mut StreamNode, mut f: F) where F: FnMut(&mut NodeBody), { @@ -33,7 +32,16 @@ where } } - visit_inner(fragment.node.as_mut().unwrap(), &mut f) + visit_inner(stream_node, &mut f) +} + +/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a +/// [`StreamFragment`] recursively. +pub fn visit_fragment(fragment: &mut StreamFragment, f: F) +where + F: FnMut(&mut NodeBody), +{ + visit_stream_node(fragment.node.as_mut().unwrap(), f) } /// Visit the internal tables of a [`StreamFragment`]. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 2d243699dec7..edc18894f8d8 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -60,7 +60,7 @@ pub struct CreateStreamingJobContext { pub existing_locations: Locations, /// The properties of the streaming job. - // TODO: directly store `StreamingJob here. 
+ // TODO: directly store `StreamingJob` here. pub table_properties: HashMap, /// DDL definition. @@ -127,6 +127,9 @@ type CreatingStreamingJobInfoRef = Arc; /// /// Note: for better readability, keep this struct complete and immutable once created. pub struct ReplaceTableContext { + /// The old table fragments to be replaced. + pub old_table_fragments: TableFragments, + /// The updates to be applied to the downstream chain actors. Used for schema change. pub merge_updates: Vec, @@ -307,19 +310,11 @@ where res } - async fn create_streaming_job_impl( + async fn build_actors( &self, - revert_funcs: &mut Vec>, - table_fragments: TableFragments, - CreateStreamingJobContext { - dispatchers, - upstream_mview_actors, - table_properties, - building_locations, - existing_locations, - definition, - .. - }: CreateStreamingJobContext, + table_fragments: &TableFragments, + building_locations: &Locations, + existing_locations: &Locations, ) -> MetaResult<()> { let actor_map = table_fragments.actor_map(); @@ -363,21 +358,6 @@ where .await?; } - // Register to compaction group beforehand. - let hummock_manager_ref = self.hummock_manager.clone(); - let registered_table_ids = hummock_manager_ref - .register_table_fragments(&table_fragments, &table_properties) - .await?; - debug_assert_eq!( - registered_table_ids.len(), - table_fragments.all_table_ids().count() - ); - revert_funcs.push(Box::pin(async move { - if let Err(e) = hummock_manager_ref.unregister_table_ids(®istered_table_ids).await { - tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e); - } - })); - // In the second stage, each [`WorkerNode`] builds local actors and connect them with // channels. for (worker_id, actors) in building_worker_actors { @@ -394,6 +374,41 @@ where .await?; } + Ok(()) + } + + async fn create_streaming_job_impl( + &self, + revert_funcs: &mut Vec>, + table_fragments: TableFragments, + CreateStreamingJobContext { + dispatchers, + upstream_mview_actors, + table_properties, + building_locations, + existing_locations, + definition, + .. + }: CreateStreamingJobContext, + ) -> MetaResult<()> { + // Register to compaction group beforehand. + let hummock_manager_ref = self.hummock_manager.clone(); + let registered_table_ids = hummock_manager_ref + .register_table_fragments(&table_fragments, &table_properties) + .await?; + debug_assert_eq!( + registered_table_ids.len(), + table_fragments.all_table_ids().count() + ); + revert_funcs.push(Box::pin(async move { + if let Err(e) = hummock_manager_ref.unregister_table_ids(®istered_table_ids).await { + tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e); + } + })); + + self.build_actors(&table_fragments, &building_locations, &existing_locations) + .await?; + // Add table fragments to meta store with state: `State::Initial`. self.fragment_manager .start_create_table_fragments(table_fragments.clone()) @@ -423,6 +438,45 @@ where Ok(()) } + pub async fn replace_table( + &self, + table_fragments: TableFragments, + ReplaceTableContext { + old_table_fragments, + merge_updates, + building_locations, + existing_locations, + table_properties: _, + }: ReplaceTableContext, + ) -> MetaResult<()> { + self.build_actors(&table_fragments, &building_locations, &existing_locations) + .await?; + + // Add table fragments to meta store with state: `State::Initial`. 
+ self.fragment_manager + .start_create_table_fragments(table_fragments.clone()) + .await?; + + let dummy_table_id = table_fragments.table_id(); + + if let Err(err) = self + .barrier_scheduler + .run_command_with_paused(Command::ReplaceTable { + old_table_fragments, + new_table_fragments: table_fragments, + merge_updates, + }) + .await + { + self.fragment_manager + .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(dummy_table_id))) + .await?; + return Err(err); + } + + Ok(()) + } + /// Drop streaming jobs by barrier manager, and clean up all related resources. The error will /// be ignored because the recovery process will take over it in cleaning part. Check /// [`Command::DropStreamingJobs`] for details. diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e05e47fab5a9..a65aba1c6f37 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::{ @@ -330,10 +331,12 @@ impl MetaClient { &self, table: ProstTable, graph: StreamFragmentGraph, + table_col_index_mapping: ColIndexMapping, ) -> Result { let request = ReplaceTablePlanRequest { table: Some(table), fragment_graph: Some(graph), + table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()), }; let resp = self.inner.replace_table_plan(request).await?; // TODO: handle error in `resp.status` here diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index be5a81f0738e..40fd5fe5c197 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -108,12 +108,12 @@ impl MergeExecutor { } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn execute_inner(self: Box) { + async fn execute_inner(mut self: Box) { // Futures of all active upstreams. let select_all = SelectReceivers::new(self.actor_context.id, self.upstreams); let actor_id = self.actor_context.id; let actor_id_str = actor_id.to_string(); - let upstream_fragment_id_str = self.upstream_fragment_id.to_string(); + let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string(); // Channels that're blocked by the barrier to align. let mut start_time = minstant::Instant::now(); @@ -147,7 +147,16 @@ impl MergeExecutor { if let Some(update) = barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id) { - assert!(update.new_upstream_fragment_id.is_none()); + let new_upstream_fragment_id = update + .new_upstream_fragment_id + .unwrap_or(self.upstream_fragment_id); + let added_upstream_actor_id = update.added_upstream_actor_id.clone(); + let removed_upstream_actor_id: HashSet<_> = + if update.new_upstream_fragment_id.is_some() { + select_all.upstream_actor_ids().iter().copied().collect() + } else { + update.removed_upstream_actor_id.iter().copied().collect() + }; // `Watermark` of upstream may become stale after upstream scaling. 
select_all @@ -155,10 +164,9 @@ impl MergeExecutor { .values_mut() .for_each(|buffers| buffers.clear()); - if !update.added_upstream_actor_id.is_empty() { + if !added_upstream_actor_id.is_empty() { // Create new upstreams receivers. - let new_upstreams: Vec<_> = update - .added_upstream_actor_id + let new_upstreams: Vec<_> = added_upstream_actor_id .iter() .map(|&upstream_actor_id| { new_input( @@ -167,7 +175,7 @@ impl MergeExecutor { self.actor_context.id, self.fragment_id, upstream_actor_id, - self.upstream_fragment_id, + new_upstream_fragment_id, ) }) .try_collect() @@ -188,30 +196,25 @@ impl MergeExecutor { .buffered_watermarks .values_mut() .for_each(|buffers| { - buffers.add_buffers(update.added_upstream_actor_id.clone()) + buffers.add_buffers(added_upstream_actor_id.clone()) }); } - if !update.get_removed_upstream_actor_id().is_empty() { + if !removed_upstream_actor_id.is_empty() { // Remove upstreams. - select_all.remove_upstreams( - &update.removed_upstream_actor_id.iter().copied().collect(), - ); + select_all.remove_upstreams(&removed_upstream_actor_id); for buffers in select_all.buffered_watermarks.values_mut() { // Call `check_heap` in case the only upstream(s) that does not have // watermark in heap is removed - buffers.remove_buffer( - update.removed_upstream_actor_id.iter().copied().collect(), - ); + buffers.remove_buffer(removed_upstream_actor_id.clone()); } } - if !update.added_upstream_actor_id.is_empty() - || !update.get_removed_upstream_actor_id().is_empty() - { - select_all.update_actor_ids(); - } + self.upstream_fragment_id = new_upstream_fragment_id; + upstream_fragment_id_str = new_upstream_fragment_id.to_string(); + + select_all.update_actor_ids(); } } } @@ -363,6 +366,10 @@ impl SelectReceivers { .extend(upstreams.into_iter().map(|s| s.into_future())); } + fn upstream_actor_ids(&self) -> &[ActorId] { + &self.upstream_actor_ids + } + fn update_actor_ids(&mut self) { self.upstream_actor_ids = self .blocked diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 87c6e4695f3b..5dd16310c739 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -115,7 +115,7 @@ impl Executor for ReceiverExecutor { fn execute(mut self: Box) -> BoxedMessageStream { let actor_id = self.actor_context.id; let actor_id_str = actor_id.to_string(); - let upstream_fragment_id_str = self.upstream_fragment_id.to_string(); + let mut upstream_fragment_id_str = self.upstream_fragment_id.to_string(); let stream = #[try_stream] async move { @@ -149,13 +149,23 @@ impl Executor for ReceiverExecutor { if let Some(update) = barrier .as_update_merge(self.actor_context.id, self.upstream_fragment_id) { + let new_upstream_fragment_id = update + .new_upstream_fragment_id + .unwrap_or(self.upstream_fragment_id); + let added_upstream_actor_id = update.added_upstream_actor_id.clone(); + let removed_upstream_actor_id: Vec<_> = + if update.new_upstream_fragment_id.is_some() { + vec![self.input.actor_id()] + } else { + update.removed_upstream_actor_id.clone() + }; + assert_eq!( - update.removed_upstream_actor_id, + removed_upstream_actor_id, vec![self.input.actor_id()], "the removed upstream actor should be the same as the current input" ); - let upstream_actor_id = *update - .added_upstream_actor_id + let upstream_actor_id = *added_upstream_actor_id .iter() .exactly_one() .expect("receiver should have exactly one upstream"); @@ -167,7 +177,7 @@ impl Executor for ReceiverExecutor { self.actor_context.id, self.fragment_id, upstream_actor_id, 
- self.upstream_fragment_id, + new_upstream_fragment_id, ) .context("failed to create upstream input")?; @@ -178,6 +188,9 @@ impl Executor for ReceiverExecutor { // Replace the input. self.input = new_upstream; + + self.upstream_fragment_id = new_upstream_fragment_id; + upstream_fragment_id_str = new_upstream_fragment_id.to_string(); } } };
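
The heart of this change is the `ColIndexMapping` carried in `ReplaceTablePlanRequest.table_col_index_mapping`: the frontend computes, for every column of the original table, its position in the new table (see `handle_add_column`), and the meta node uses `rewrite_dispatch_strategy` to remap the dist-key and output indices of the dispatchers feeding the downstream `Chain` fragments, bailing out if any referenced column no longer exists. The standalone sketch below only mirrors that semantics with plain Rust types; `Mapping` and `try_map_all` are illustrative stand-ins rather than the actual `risingwave_common` API, and the column layouts are simplified (the hidden `_row_id` column is omitted).

    // Illustration of the old-column -> new-column index mapping used by
    // `ReplaceTablePlanRequest.table_col_index_mapping`. Unmapped columns are
    // encoded as -1 in the protobuf `map` field; the Rust side uses `Option<usize>`.

    /// Minimal stand-in for `ColIndexMapping` (illustrative only).
    struct Mapping {
        /// Number of columns in the *new* table schema.
        target_size: usize,
        /// `map[i]` is the new index of old column `i`, or `None` if it was dropped.
        map: Vec<Option<usize>>,
    }

    impl Mapping {
        /// Mirrors the spirit of `rewrite_dispatch_strategy`: remap a list of column
        /// indices, returning `None` if any index has no counterpart in the new schema.
        fn try_map_all(&self, indices: &[u32]) -> Option<Vec<u32>> {
            indices
                .iter()
                .map(|&i| self.map.get(i as usize).copied().flatten().map(|j| j as u32))
                .collect()
        }
    }

    fn main() {
        // `ALTER TABLE t ADD COLUMN r REAL` on `t(v INT)`: the old column keeps index 0
        // and the new table has 2 columns, so the mapping is an identity prefix.
        let add_column = Mapping { target_size: 2, map: vec![Some(0)] };
        // Output indices of a downstream `Chain` dispatcher stay valid after the change.
        assert_eq!(add_column.try_map_all(&[0]), Some(vec![0]));

        // A hypothetical `DROP COLUMN` of old column 1 on `t(v, r)` (not implemented by
        // this patch): column 1 is unmapped, which the protobuf encodes as -1.
        let drop_column = Mapping { target_size: 1, map: vec![Some(0), None] };
        // A dispatcher that still references column 1 cannot be rewritten.
        assert_eq!(drop_column.try_map_all(&[0, 1]), None);
        let proto_map: Vec<i64> = drop_column
            .map
            .iter()
            .map(|x| x.map_or(-1, |i| i as i64))
            .collect();
        assert_eq!(proto_map, vec![0, -1]);
        println!("target_size = {}, map = {:?}", drop_column.target_size, proto_map);
    }

Encoding unmapped columns as -1 (as documented in proto/catalog.proto above) keeps the message a plain `repeated int64`, while the Rust `ColIndexMapping::from_protobuf` turns negative entries back into `None` via `try_into().ok()`.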