From a6f38d944df86b90923556320bd5222d9764eda1 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Tue, 13 Jun 2023 16:30:40 +0800
Subject: [PATCH] feat: Add revision for rescheduling process (#10199)

Signed-off-by: Shanicky Chen
---
 proto/meta.proto                          |  3 +
 src/ctl/src/cmd_impl/meta/cluster_info.rs |  3 +
 src/ctl/src/cmd_impl/meta/reschedule.rs   | 21 +++++-
 src/ctl/src/lib.rs                        | 11 ++-
 src/meta/src/barrier/command.rs           | 10 +--
 src/meta/src/manager/catalog/fragment.rs  | 83 +++++++++++++++++++----
 src/meta/src/manager/catalog/mod.rs       | 25 ++++++-
 src/meta/src/rpc/ddl_controller.rs        |  2 +
 src/meta/src/rpc/service/scale_service.rs | 23 ++++++-
 src/meta/src/stream/scale.rs              | 54 ++++++++++++++-
 src/meta/src/stream/stream_manager.rs     |  3 +
 src/rpc_client/src/meta_client.rs         | 13 +++-
 src/tests/simulation/src/ctl_ext.rs       | 14 ++++
 13 files changed, 234 insertions(+), 31 deletions(-)

diff --git a/proto/meta.proto b/proto/meta.proto
index 6d48e4994538..d70eb321f64c 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -325,6 +325,7 @@ message GetClusterInfoResponse {
   repeated TableFragments table_fragments = 2;
   map<uint32, source.ConnectorSplits> actor_splits = 3;
   map<uint32, catalog.Source> source_infos = 4;
+  uint64 revision = 5;
 }
 
 message RescheduleRequest {
@@ -334,10 +335,12 @@
   }
   // reschedule plan for each fragment
   map<uint32, Reschedule> reschedules = 1;
+  uint64 revision = 2;
 }
 
 message RescheduleResponse {
   bool success = 1;
+  uint64 revision = 2;
 }
 
 service ScaleService {
diff --git a/src/ctl/src/cmd_impl/meta/cluster_info.rs b/src/ctl/src/cmd_impl/meta/cluster_info.rs
index d2c730346753..ff50f37c5d84 100644
--- a/src/ctl/src/cmd_impl/meta/cluster_info.rs
+++ b/src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -37,6 +37,7 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
         source_infos: _,
         table_fragments,
         mut actor_splits,
+        revision: _,
     } = get_cluster_info(context).await?;
 
     for table_fragment in &table_fragments {
@@ -79,6 +80,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
         table_fragments,
         actor_splits: _,
         source_infos: _,
+        revision,
     } = get_cluster_info(context).await?;
 
     // Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
@@ -167,6 +169,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
     }
 
     println!("{table}");
+    println!("Revision: {}", revision);
 
     Ok(())
 }
diff --git a/src/ctl/src/cmd_impl/meta/reschedule.rs b/src/ctl/src/cmd_impl/meta/reschedule.rs
index 41117f7ff65e..a5aa4c099cf5 100644
--- a/src/ctl/src/cmd_impl/meta/reschedule.rs
+++ b/src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -52,7 +52,12 @@ const RESCHEDULE_ADDED_KEY: &str = "added";
 //         removed_parallel_units: [],
 //     },
 // }
-pub async fn reschedule(context: &CtlContext, mut plan: String, dry_run: bool) -> Result<()> {
+pub async fn reschedule(
+    context: &CtlContext,
+    mut plan: String,
+    dry_run: bool,
+    revision: u64,
+) -> Result<()> {
     let meta_client = context.meta_client().await?;
 
     let regex = Regex::new(RESCHEDULE_MATCH_REGEXP)?;
@@ -114,8 +119,18 @@ pub async fn reschedule(context: &CtlContext, mut plan: String, dry_run: bool) -
 
     if !dry_run {
         println!("---------------------------");
-        let resp = meta_client.reschedule(reschedules).await?;
-        println!("Response from meta {}", resp);
+        let (success, revision) = meta_client.reschedule(reschedules, revision).await?;
+
+        if !success {
+            println!(
+                "Reschedule failed, please check the plan or the revision, current revision is {}",
+                revision
+            );
+
+            return Err(anyhow!("reschedule failed"));
+        }
+
+        println!("Reschedule success, current revision is {}", revision);
     }
 
     Ok(())
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index 30b2a30d7556..952c90a6594d 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -218,6 +218,9 @@ enum MetaCommands {
         /// Show the plan only, no actual operation
         #[clap(long)]
         dry_run: bool,
+        /// Revision of the plan
+        #[clap(long)]
+        revision: u64,
     },
     /// backup meta by taking a meta snapshot
     BackupMeta,
@@ -348,9 +351,11 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
         Commands::Meta(MetaCommands::SourceSplitInfo) => {
             cmd_impl::meta::source_split_info(context).await?
         }
-        Commands::Meta(MetaCommands::Reschedule { plan, dry_run }) => {
-            cmd_impl::meta::reschedule(context, plan, dry_run).await?
-        }
+        Commands::Meta(MetaCommands::Reschedule {
+            plan,
+            dry_run,
+            revision,
+        }) => cmd_impl::meta::reschedule(context, plan, dry_run, revision).await?,
         Commands::Meta(MetaCommands::BackupMeta) => cmd_impl::meta::backup_meta(context).await?,
         Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 55e148560a9e..046f60b97bdf 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -116,7 +116,9 @@ pub enum Command {
     ///
     /// Barriers from which actors should be collected, and the post behavior of this command are
     /// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
-    RescheduleFragment(HashMap<FragmentId, Reschedule>),
+    RescheduleFragment {
+        reschedules: HashMap<FragmentId, Reschedule>,
+    },
 
     /// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
     /// essentially switching the downstream of the old table fragments to the new ones, and
@@ -159,7 +161,7 @@ impl Command {
             Command::CancelStreamingJob(table_fragments) => {
                 CommandChanges::DropTables(std::iter::once(table_fragments.table_id()).collect())
             }
-            Command::RescheduleFragment(reschedules) => {
+            Command::RescheduleFragment { reschedules, .. } => {
                 let to_add = reschedules
                     .values()
                     .flat_map(|r| r.added_actors.iter().copied())
@@ -319,7 +321,7 @@ where
                 }))
             }
 
-            Command::RescheduleFragment(reschedules) => {
+            Command::RescheduleFragment { reschedules, .. } => {
                 let mut dispatcher_update = HashMap::new();
                 for (_fragment_id, reschedule) in reschedules.iter() {
                     for &(upstream_fragment_id, dispatcher_id) in
@@ -613,7 +615,7 @@ where
                     .await;
             }
 
-            Command::RescheduleFragment(reschedules) => {
+            Command::RescheduleFragment { reschedules } => {
                 let mut node_dropped_actors = HashMap::new();
                 for table_fragments in self.fragment_manager.list_table_fragments().await? {
                     for fragment_id in table_fragments.fragments.keys() {
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 5a83a537b84d..8565b054ff89 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -36,17 +36,18 @@ use tokio::sync::{RwLock, RwLockReadGuard};
 
 use crate::barrier::Reschedule;
 use crate::manager::cluster::WorkerId;
-use crate::manager::{commit_meta, MetaSrvEnv};
+use crate::manager::{commit_meta, commit_meta_with_trx, MetaSrvEnv};
 use crate::model::{
     ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
     ValTransaction,
 };
 use crate::storage::{MetaStore, Transaction};
-use crate::stream::SplitAssignment;
+use crate::stream::{SplitAssignment, TableRevision};
 use crate::MetaResult;
 
 pub struct FragmentManagerCore {
     table_fragments: BTreeMap<TableId, TableFragments>,
+    table_revision: TableRevision,
 }
 
 impl FragmentManagerCore {
@@ -102,9 +103,14 @@ where
             .map(|tf| (tf.table_id(), tf))
             .collect();
 
+        let table_revision = TableRevision::get(env.meta_store()).await?;
+
         Ok(Self {
             env,
-            core: RwLock::new(FragmentManagerCore { table_fragments }),
+            core: RwLock::new(FragmentManagerCore {
+                table_fragments,
+                table_revision,
+            }),
         })
     }
 
@@ -118,6 +124,10 @@ where
         Ok(map.values().cloned().collect())
     }
 
+    pub async fn get_revision(&self) -> TableRevision {
+        self.core.read().await.table_revision
+    }
+
     pub async fn has_any_table_fragments(&self) -> bool {
         !self.core.read().await.table_fragments.is_empty()
     }
@@ -246,7 +256,9 @@ where
         dummy_table_id: TableId,
         merge_updates: &[MergeUpdate],
     ) -> MetaResult<()> {
-        let map = &mut self.core.write().await.table_fragments;
+        let mut guard = self.core.write().await;
+        let current_revision = guard.table_revision;
+        let map = &mut guard.table_fragments;
 
         let mut table_fragments = BTreeMapTransaction::new(map);
 
@@ -317,7 +329,17 @@
         assert!(merge_updates.is_empty());
 
         // Commit changes and notify about the changes.
-        commit_meta!(self, table_fragments)?;
+        let mut trx = Transaction::default();
+
+        // save next revision
+        let next_revision = current_revision.next();
+        next_revision.store(&mut trx);
+
+        // commit
+        commit_meta_with_trx!(self, trx, table_fragments)?;
+
+        // update revision in memory
+        guard.table_revision = next_revision;
 
         // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
         // catalog and need to access the old fragment. Let frontend nodes delete the old fragment
@@ -347,7 +369,10 @@ where
     /// Drop table fragments info and remove downstream actor infos in fragments from its dependent
     /// tables.
     pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
-        let map = &mut self.core.write().await.table_fragments;
+        let mut guard = self.core.write().await;
+        let current_revision = guard.table_revision;
+
+        let map = &mut guard.table_fragments;
         let to_delete_table_fragments = table_ids
             .iter()
             .filter_map(|table_id| map.get(table_id).cloned())
@@ -385,7 +410,16 @@ where
                 });
             }
         }
-        commit_meta!(self, table_fragments)?;
+
+        if table_ids.is_empty() {
+            commit_meta!(self, table_fragments)?;
+        } else {
+            let mut trx = Transaction::default();
+            let next_revision = current_revision.next();
+            next_revision.store(&mut trx);
+            commit_meta_with_trx!(self, trx, table_fragments)?;
+            guard.table_revision = next_revision;
+        }
 
         for table_fragments in to_delete_table_fragments {
             if table_fragments.state() != State::Initial {
@@ -452,11 +486,19 @@ where
         }
         if updated {
             table_fragment.update_vnode_mapping(&migration_plan.parallel_unit_plan);
-            let map = &mut self.core.write().await.table_fragments;
+            let mut guard = self.core.write().await;
+            let current_revision = guard.table_revision;
+            let map = &mut guard.table_fragments;
             if map.contains_key(&table_fragment.table_id()) {
-                let mut txn = BTreeMapTransaction::new(map);
-                txn.insert(table_fragment.table_id(), table_fragment.clone());
-                commit_meta!(self, txn)?;
+                let mut table_trx = BTreeMapTransaction::new(map);
+                table_trx.insert(table_fragment.table_id(), table_fragment.clone());
+
+                let next_revision = current_revision.next();
+                let mut trx = Transaction::default();
+                next_revision.store(&mut trx);
+                commit_meta_with_trx!(self, trx, table_trx)?;
+                guard.table_revision = next_revision;
+
                 self.notify_fragment_mapping(&table_fragment, Operation::Update)
                     .await;
             }
@@ -606,7 +648,10 @@ where
         &self,
         mut reschedules: HashMap<FragmentId, Reschedule>,
     ) -> MetaResult<()> {
-        let map = &mut self.core.write().await.table_fragments;
+        let mut guard = self.core.write().await;
+        let current_version = guard.table_revision;
+
+        let map = &mut guard.table_fragments;
 
         fn update_actors(
             actors: &mut Vec<ActorId>,
@@ -821,7 +866,19 @@ where
         }
 
         assert!(reschedules.is_empty(), "all reschedules must be applied");
-        commit_meta!(self, table_fragments)?;
+
+        // new empty transaction
+        let mut trx = Transaction::default();
+
+        // save next revision
+        let next_revision = current_version.next();
+        next_revision.store(&mut trx);
+
+        // commit
+        commit_meta_with_trx!(self, trx, table_fragments)?;
+
+        // update revision in memory
+        guard.table_revision = next_revision;
 
         for mapping in fragment_mapping_to_notify {
             self.env
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index a20488528102..da8f2aa812ff 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -86,10 +86,33 @@ macro_rules! commit_meta {
         }
     };
 }
-pub(crate) use commit_meta;
+
+/// `commit_meta_with_trx` is similar to `commit_meta`, but it accepts an external trx (transaction)
+/// and commits it.
+macro_rules! commit_meta_with_trx {
+    ($manager:expr, $trx:ident, $($val_txn:expr),*) => {
+        {
+            async {
+                // Apply the change in `ValTransaction` to trx
+                $(
+                    $val_txn.apply_to_txn(&mut $trx)?;
+                )*
+                // Commit to meta store
+                $manager.env.meta_store().txn($trx).await?;
+                // Upon successful commit, commit the change to in-mem meta
+                $(
+                    $val_txn.commit();
+                )*
+                MetaResult::Ok(())
+            }.await
+        }
+    };
+}
+
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_pb::meta::relation::RelationInfo;
 use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup};
+pub(crate) use {commit_meta, commit_meta_with_trx};
 
 use crate::manager::catalog::utils::{
     alter_relation_rename, alter_relation_rename_refs, refcnt_dec_connection,
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index c2fb0ca5dc3f..a63c81b78bd6 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -288,6 +288,7 @@ where
     }
 
     async fn drop_streaming_job(&self, job_id: StreamingJobId) -> MetaResult<NotificationVersion> {
+        let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
         let table_fragments = self
             .fragment_manager
             .select_table_fragments_by_table_id(&job_id.id().into())
@@ -561,6 +562,7 @@ where
         fragment_graph: StreamFragmentGraphProto,
         table_col_index_mapping: ColIndexMapping,
     ) -> MetaResult<NotificationVersion> {
+        let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
         let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());
 
         let fragment_graph = self
diff --git a/src/meta/src/rpc/service/scale_service.rs b/src/meta/src/rpc/service/scale_service.rs
index 27d5a24d835b..d69a9f69ac34 100644
--- a/src/meta/src/rpc/service/scale_service.rs
+++ b/src/meta/src/rpc/service/scale_service.rs
@@ -85,6 +85,8 @@ where
         &self,
         _: Request<GetClusterInfoRequest>,
     ) -> Result<Response<GetClusterInfoResponse>, Status> {
+        let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
+
         let table_fragments = self
             .fragment_manager
             .list_table_fragments()
@@ -117,11 +119,14 @@ where
 
         let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();
 
+        let revision = self.fragment_manager.get_revision().await.inner();
+
         Ok(Response::new(GetClusterInfoResponse {
             worker_nodes,
             table_fragments,
             actor_splits,
             source_infos,
+            revision,
         }))
     }
 
@@ -132,6 +137,17 @@ where
     ) -> Result<Response<RescheduleResponse>, Status> {
         let req = request.into_inner();
 
+        let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
+
+        let current_revision = self.fragment_manager.get_revision().await;
+
+        if req.revision != current_revision.inner() {
+            return Ok(Response::new(RescheduleResponse {
+                success: false,
+                revision: current_revision.inner(),
+            }));
+        }
+
         self.stream_manager
             .reschedule_actors(
                 req.reschedules
@@ -162,6 +178,11 @@ where
             )
             .await?;
 
-        Ok(Response::new(RescheduleResponse { success: true }))
+        let next_revision = self.fragment_manager.get_revision().await;
+
+        Ok(Response::new(RescheduleResponse {
+            success: true,
+            revision: next_revision.into(),
+        }))
     }
 }
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 874b7070bf90..5f274d438bb3 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -39,9 +39,54 @@ use uuid::Uuid;
 use crate::barrier::{Command, Reschedule};
 use crate::manager::{IdCategory, WorkerId};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
-use crate::storage::MetaStore;
+use crate::storage::{MetaStore, MetaStoreError, Transaction, DEFAULT_COLUMN_FAMILY};
 use crate::stream::GlobalStreamManager;
-use crate::MetaResult;
+use crate::{MetaError, MetaResult};
+
+#[derive(Copy, Clone, Debug)]
+pub struct TableRevision(u64);
+
+const TABLE_REVISION_KEY: &[u8] = b"table_revision";
+
+impl From<TableRevision> for u64 {
+    fn from(value: TableRevision) -> Self {
+        value.0
+    }
+}
+
+impl TableRevision {
+    pub async fn get<S>(store: &S) -> MetaResult<Self>
+    where
+        S: MetaStore,
+    {
+        let version = match store
+            .get_cf(DEFAULT_COLUMN_FAMILY, TABLE_REVISION_KEY)
+            .await
+        {
+            Ok(byte_vec) => memcomparable::from_slice(&byte_vec).unwrap(),
+            Err(MetaStoreError::ItemNotFound(_)) => 0,
+            Err(e) => return Err(MetaError::from(e)),
+        };
+
+        Ok(Self(version))
+    }
+
+    pub fn next(&self) -> Self {
+        TableRevision(self.0 + 1)
+    }
+
+    pub fn store(&self, txn: &mut Transaction) {
+        txn.put(
+            DEFAULT_COLUMN_FAMILY.to_string(),
+            TABLE_REVISION_KEY.to_vec(),
+            memcomparable::to_vec(&self.0).unwrap(),
+        );
+    }
+
+    pub fn inner(&self) -> u64 {
+        self.0
+    }
+}
 
 #[derive(Debug)]
 pub struct ParallelUnitReschedule {
@@ -557,6 +602,7 @@ where
             }
             return Err(e);
         }
+
         Ok(())
     }
 
@@ -1127,7 +1173,9 @@ where
         tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);
 
         self.barrier_scheduler
-            .run_command_with_paused(Command::RescheduleFragment(reschedule_fragment))
+            .run_command_with_paused(Command::RescheduleFragment {
+                reschedules: reschedule_fragment,
+            })
            .await?;
 
         Ok(())
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 281a6a937672..d4a48aebcffb 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -171,6 +171,8 @@ pub struct GlobalStreamManager<S: MetaStore> {
     creating_job_info: CreatingStreamingJobInfoRef,
 
     hummock_manager: HummockManagerRef<S>,
+
+    pub(crate) streaming_job_lock: Mutex<()>,
 }
 
 impl<S> GlobalStreamManager<S>
@@ -193,6 +195,7 @@ where
             source_manager,
             hummock_manager,
             creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
+            streaming_job_lock: Mutex::new(()),
         })
     }
 
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 39651136b5bb..a30ae91d0904 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -667,10 +667,17 @@ impl MetaClient {
         Ok(resp)
     }
 
-    pub async fn reschedule(&self, reschedules: HashMap<u32, PbReschedule>) -> Result<bool> {
-        let request = RescheduleRequest { reschedules };
+    pub async fn reschedule(
+        &self,
+        reschedules: HashMap<u32, PbReschedule>,
+        revision: u64,
+    ) -> Result<(bool, u64)> {
+        let request = RescheduleRequest {
+            reschedules,
+            revision,
+        };
         let resp = self.inner.reschedule(request).await?;
-        Ok(resp.success)
+        Ok((resp.success, resp.revision))
     }
 
     pub async fn risectl_get_pinned_versions_summary(
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 432d0ceb04a2..cb285a09de3e 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -291,6 +291,18 @@ impl Cluster {
     pub async fn reschedule(&mut self, plan: impl Into<String>) -> Result<()> {
         let plan = plan.into();
 
+        let revision = self
+            .ctl
+            .spawn(async move {
+                let r = risingwave_ctl::cmd_impl::meta::get_cluster_info(
+                    &risingwave_ctl::common::CtlContext::default(),
+                )
+                .await?;
+
+                Ok::<_, anyhow::Error>(r.revision)
+            })
+            .await??;
+
         self.ctl
             .spawn(async move {
                 let opts = risingwave_ctl::CliOpts::parse_from([
@@ -299,6 +311,8 @@ impl Cluster {
                     "reschedule",
                     "--plan",
                     plan.as_ref(),
+                    "--revision",
+                    &format!("{}", revision),
                 ]);
                 risingwave_ctl::start(opts).await
             })
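
Note (not part of the patch): the revision behaves as an optimistic-concurrency token. A client reads the current value from `GetClusterInfoResponse` (printed by `risingwave ctl meta cluster-info`), passes it in `RescheduleRequest`, and the meta node rejects the request and reports its own current revision when the two differ, which is what `risingwave ctl meta reschedule --plan ... --revision ...` relies on above. Below is a rough caller-side sketch against the updated `MetaClient::reschedule`; the helper name, the retry policy, and the `PbReschedule` alias are illustrative assumptions, while the `revision` argument and the `(success, revision)` return value come from the diff.

use std::collections::HashMap;

use anyhow::bail;
use risingwave_pb::meta::reschedule_request::Reschedule as PbReschedule; // assumed alias
use risingwave_rpc_client::MetaClient;

// Hypothetical helper: submit a reschedule plan, retrying on revision mismatch.
async fn reschedule_with_retry(
    meta_client: &MetaClient,
    reschedules: HashMap<u32, PbReschedule>,
    mut revision: u64, // e.g. taken from GetClusterInfoResponse::revision
) -> anyhow::Result<u64> {
    loop {
        // With this patch, `reschedule` returns (success, current_revision).
        let (success, current) = meta_client.reschedule(reschedules.clone(), revision).await?;
        if success {
            // The meta node bumped the revision while applying the plan.
            return Ok(current);
        }
        if current == revision {
            // Rejected even though the revision matched: give up instead of looping.
            bail!("reschedule rejected by meta node");
        }
        // Stale revision: a real caller should re-inspect the cluster and possibly
        // rebuild the plan; for illustration we simply retry with the reported value.
        revision = current;
    }
}

The same revision is bumped by drops, table replacements, and reschedules (all of which now also take `streaming_job_lock`), so a plan computed against an outdated view of the cluster cannot be applied by accident.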