feat: Add revision for rescheduling process (#10199)
Signed-off-by: Shanicky Chen <peng@risingwave-labs.com>
shanicky authored Jun 13, 2023
1 parent 6347fa9 commit a6f38d9
Showing 13 changed files with 234 additions and 31 deletions.
3 changes: 3 additions & 0 deletions proto/meta.proto
@@ -325,6 +325,7 @@ message GetClusterInfoResponse {
repeated TableFragments table_fragments = 2;
map<uint32, source.ConnectorSplits> actor_splits = 3;
map<uint32, catalog.Source> source_infos = 4;
uint64 revision = 5;
}

message RescheduleRequest {
@@ -334,10 +335,12 @@ message RescheduleRequest {
}
// reschedule plan for each fragment
map<uint32, Reschedule> reschedules = 1;
uint64 revision = 2;
}

message RescheduleResponse {
bool success = 1;
uint64 revision = 2;
}

service ScaleService {
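The new `revision` field turns rescheduling into an optimistic-concurrency operation: a client reads the current revision from `GetClusterInfoResponse`, echoes it back in `RescheduleRequest`, and the meta node applies the plan only if the revision still matches, bumping it on success and reporting the latest value in `RescheduleResponse`. The sketch below is a minimal, self-contained model of that contract; `MetaState` and its methods are illustrative stand-ins, not RisingWave's actual types.

```rust
// Toy model of the revision check implied by the proto change above.
use std::collections::HashMap;

#[derive(Default)]
struct MetaState {
    revision: u64,
    // fragment_id -> opaque reschedule plan
    reschedules: HashMap<u32, String>,
}

impl MetaState {
    /// Mirrors GetClusterInfoResponse: report the current revision to the client.
    fn cluster_info_revision(&self) -> u64 {
        self.revision
    }

    /// Mirrors RescheduleRequest/Response: apply the plan only if the client's
    /// revision matches, then bump it. Returns (success, latest revision).
    fn reschedule(&mut self, plan: HashMap<u32, String>, revision: u64) -> (bool, u64) {
        if revision != self.revision {
            // Stale revision: reject and tell the client the current one.
            return (false, self.revision);
        }
        self.reschedules.extend(plan);
        self.revision += 1;
        (true, self.revision)
    }
}

fn main() {
    let mut meta = MetaState::default();
    let rev = meta.cluster_info_revision();
    let (ok, new_rev) = meta.reschedule(HashMap::from([(1, "plan".to_string())]), rev);
    assert!(ok);
    // A second caller still holding the old revision is rejected.
    let (ok, _) = meta.reschedule(HashMap::new(), rev);
    assert!(!ok);
    println!("reschedule committed at revision {new_rev}");
}
```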
3 changes: 3 additions & 0 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -37,6 +37,7 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
source_infos: _,
table_fragments,
mut actor_splits,
revision: _,
} = get_cluster_info(context).await?;

for table_fragment in &table_fragments {
@@ -79,6 +80,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
table_fragments,
actor_splits: _,
source_infos: _,
revision,
} = get_cluster_info(context).await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
@@ -167,6 +169,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
}

println!("{table}");
println!("Revision: {}", revision);

Ok(())
}
21 changes: 18 additions & 3 deletions src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -52,7 +52,12 @@ const RESCHEDULE_ADDED_KEY: &str = "added";
// removed_parallel_units: [],
// },
// }
pub async fn reschedule(context: &CtlContext, mut plan: String, dry_run: bool) -> Result<()> {
pub async fn reschedule(
context: &CtlContext,
mut plan: String,
dry_run: bool,
revision: u64,
) -> Result<()> {
let meta_client = context.meta_client().await?;

let regex = Regex::new(RESCHEDULE_MATCH_REGEXP)?;
@@ -114,8 +119,18 @@ pub async fn reschedule(context: &CtlContext, mut plan: String, dry_run: bool) -

if !dry_run {
println!("---------------------------");
let resp = meta_client.reschedule(reschedules).await?;
println!("Response from meta {}", resp);
let (success, revision) = meta_client.reschedule(reschedules, revision).await?;

if !success {
println!(
"Reschedule failed, please check the plan or the revision, current revision is {}",
revision
);

return Err(anyhow!("reschedule failed"));
}

println!("Reschedule success, current revision is {}", revision);
}

Ok(())
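On a stale revision the command now fails instead of silently applying an outdated plan, and it prints the revision the meta node currently holds. A caller that wants to retry can adopt that value and resubmit. Below is a minimal, compile-only sketch of that retry pattern; `RescheduleApi` and `reschedule_with_retry` are hypothetical names, not part of `risingwave_ctl` or the meta client.

```rust
/// Stand-in for the subset of the meta client used here; not RisingWave's API.
trait RescheduleApi {
    /// Latest revision, as reported alongside the cluster info.
    fn current_revision(&self) -> u64;
    /// Mirrors RescheduleResponse: (success, latest revision).
    fn reschedule(&mut self, plan: &str, revision: u64) -> (bool, u64);
}

/// Retry a reschedule until it wins the revision race or gives up.
fn reschedule_with_retry(
    api: &mut dyn RescheduleApi,
    plan: &str,
    max_attempts: u32,
) -> Option<u64> {
    let mut revision = api.current_revision();
    for _ in 0..max_attempts {
        let (success, latest) = api.reschedule(plan, revision);
        if success {
            return Some(latest);
        }
        // Someone else changed the fragments first; adopt the reported
        // revision and try again with the same plan.
        revision = latest;
    }
    None
}
```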
11 changes: 8 additions & 3 deletions src/ctl/src/lib.rs
@@ -218,6 +218,9 @@ enum MetaCommands {
/// Show the plan only, no actual operation
#[clap(long)]
dry_run: bool,
/// Revision of the plan
#[clap(long)]
revision: u64,
},
/// backup meta by taking a meta snapshot
BackupMeta,
@@ -348,9 +351,11 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Meta(MetaCommands::SourceSplitInfo) => {
cmd_impl::meta::source_split_info(context).await?
}
Commands::Meta(MetaCommands::Reschedule { plan, dry_run }) => {
cmd_impl::meta::reschedule(context, plan, dry_run).await?
}
Commands::Meta(MetaCommands::Reschedule {
plan,
dry_run,
revision,
}) => cmd_impl::meta::reschedule(context, plan, dry_run, revision).await?,
Commands::Meta(MetaCommands::BackupMeta) => cmd_impl::meta::backup_meta(context).await?,
Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
10 changes: 6 additions & 4 deletions src/meta/src/barrier/command.rs
@@ -116,7 +116,9 @@ pub enum Command {
///
/// Barriers from which actors should be collected, and the post behavior of this command are
/// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
RescheduleFragment(HashMap<FragmentId, Reschedule>),
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
},

/// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
/// essentially switching the downstream of the old table fragments to the new ones, and
@@ -159,7 +161,7 @@ impl Command {
Command::CancelStreamingJob(table_fragments) => {
CommandChanges::DropTables(std::iter::once(table_fragments.table_id()).collect())
}
Command::RescheduleFragment(reschedules) => {
Command::RescheduleFragment { reschedules, .. } => {
let to_add = reschedules
.values()
.flat_map(|r| r.added_actors.iter().copied())
@@ -319,7 +321,7 @@ where
}))
}

Command::RescheduleFragment(reschedules) => {
Command::RescheduleFragment { reschedules, .. } => {
let mut dispatcher_update = HashMap::new();
for (_fragment_id, reschedule) in reschedules.iter() {
for &(upstream_fragment_id, dispatcher_id) in
@@ -613,7 +615,7 @@ where
.await;
}

Command::RescheduleFragment(reschedules) => {
Command::RescheduleFragment { reschedules } => {
let mut node_dropped_actors = HashMap::new();
for table_fragments in self.fragment_manager.list_table_fragments().await? {
for fragment_id in table_fragments.fragments.keys() {
83 changes: 70 additions & 13 deletions src/meta/src/manager/catalog/fragment.rs
@@ -36,17 +36,18 @@ use tokio::sync::{RwLock, RwLockReadGuard};

use crate::barrier::Reschedule;
use crate::manager::cluster::WorkerId;
use crate::manager::{commit_meta, MetaSrvEnv};
use crate::manager::{commit_meta, commit_meta_with_trx, MetaSrvEnv};
use crate::model::{
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
ValTransaction,
};
use crate::storage::{MetaStore, Transaction};
use crate::stream::SplitAssignment;
use crate::stream::{SplitAssignment, TableRevision};
use crate::MetaResult;

pub struct FragmentManagerCore {
table_fragments: BTreeMap<TableId, TableFragments>,
table_revision: TableRevision,
}

impl FragmentManagerCore {
@@ -102,9 +103,14 @@ where
.map(|tf| (tf.table_id(), tf))
.collect();

let table_revision = TableRevision::get(env.meta_store()).await?;

Ok(Self {
env,
core: RwLock::new(FragmentManagerCore { table_fragments }),
core: RwLock::new(FragmentManagerCore {
table_fragments,
table_revision,
}),
})
}

@@ -118,6 +124,10 @@
Ok(map.values().cloned().collect())
}

pub async fn get_revision(&self) -> TableRevision {
self.core.read().await.table_revision
}

pub async fn has_any_table_fragments(&self) -> bool {
!self.core.read().await.table_fragments.is_empty()
}
@@ -246,7 +256,9 @@ where
dummy_table_id: TableId,
merge_updates: &[MergeUpdate],
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;
let map = &mut guard.table_fragments;

let mut table_fragments = BTreeMapTransaction::new(map);

@@ -317,7 +329,17 @@ where
assert!(merge_updates.is_empty());

// Commit changes and notify about the changes.
commit_meta!(self, table_fragments)?;
let mut trx = Transaction::default();

// save next revision
let next_revision = current_revision.next();
next_revision.store(&mut trx);

// commit
commit_meta_with_trx!(self, trx, table_fragments)?;

// update revision in memory
guard.table_revision = next_revision;

// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
@@ -347,7 +369,10 @@ where
/// Drop table fragments info and remove downstream actor infos in fragments from its dependent
/// tables.
pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;

let map = &mut guard.table_fragments;
let to_delete_table_fragments = table_ids
.iter()
.filter_map(|table_id| map.get(table_id).cloned())
@@ -385,7 +410,16 @@ where
});
}
}
commit_meta!(self, table_fragments)?;

if table_ids.is_empty() {
commit_meta!(self, table_fragments)?;
} else {
let mut trx = Transaction::default();
let next_revision = current_revision.next();
next_revision.store(&mut trx);
commit_meta_with_trx!(self, trx, table_fragments)?;
guard.table_revision = next_revision;
}

for table_fragments in to_delete_table_fragments {
if table_fragments.state() != State::Initial {
@@ -452,11 +486,19 @@ where
}
if updated {
table_fragment.update_vnode_mapping(&migration_plan.parallel_unit_plan);
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;
let map = &mut guard.table_fragments;
if map.contains_key(&table_fragment.table_id()) {
let mut txn = BTreeMapTransaction::new(map);
txn.insert(table_fragment.table_id(), table_fragment.clone());
commit_meta!(self, txn)?;
let mut table_trx = BTreeMapTransaction::new(map);
table_trx.insert(table_fragment.table_id(), table_fragment.clone());

let next_revision = current_revision.next();
let mut trx = Transaction::default();
next_revision.store(&mut trx);
commit_meta_with_trx!(self, trx, table_trx)?;
guard.table_revision = next_revision;

self.notify_fragment_mapping(&table_fragment, Operation::Update)
.await;
}
@@ -606,7 +648,10 @@ where
&self,
mut reschedules: HashMap<FragmentId, Reschedule>,
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_version = guard.table_revision;

let map = &mut guard.table_fragments;

fn update_actors(
actors: &mut Vec<ActorId>,
@@ -821,7 +866,19 @@ where
}

assert!(reschedules.is_empty(), "all reschedules must be applied");
commit_meta!(self, table_fragments)?;

// new empty transaction
let mut trx = Transaction::default();

// save next revision
let next_revision = current_version.next();
next_revision.store(&mut trx);

// commit
commit_meta_with_trx!(self, trx, table_fragments)?;

// update revision in memory
guard.table_revision = next_revision;

for mapping in fragment_mapping_to_notify {
self.env
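The diff relies on a `TableRevision` imported from `crate::stream` but defined outside the hunks shown in this commit; judging from the calls used here (`TableRevision::get`, `next()`, and `store(&mut trx)`), it is presumably a small copyable counter persisted in the meta store. The sketch below is an inferred, simplified shape with a toy transaction type; the key name, encoding, and the omitted `get` loader are assumptions, not the real definition.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for the meta-store transaction the diff passes around.
#[derive(Default)]
struct Transaction {
    puts: BTreeMap<String, Vec<u8>>,
}

impl Transaction {
    fn put(&mut self, key: String, value: Vec<u8>) {
        self.puts.insert(key, value);
    }
}

/// Inferred shape of `TableRevision`: a copyable monotonic counter.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
struct TableRevision(u64);

impl TableRevision {
    /// The next revision. Because the type is `Copy`, callers keep the current
    /// value until the store transaction commits, then overwrite the in-memory
    /// copy with this one, as the reschedule-apply path above does.
    fn next(self) -> Self {
        TableRevision(self.0 + 1)
    }

    /// Stage the new value into the pending transaction under a fixed key
    /// (key name and byte encoding here are placeholders).
    fn store(&self, trx: &mut Transaction) {
        trx.put("table_revision".to_owned(), self.0.to_be_bytes().to_vec());
    }
}
```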
25 changes: 24 additions & 1 deletion src/meta/src/manager/catalog/mod.rs
@@ -86,10 +86,33 @@ macro_rules! commit_meta {
}
};
}
pub(crate) use commit_meta;

/// `commit_meta_with_trx` is similar to `commit_meta`, but it accepts an external trx (transaction)
/// and commits it.
macro_rules! commit_meta_with_trx {
($manager:expr, $trx:ident, $($val_txn:expr),*) => {
{
async {
// Apply the change in `ValTransaction` to trx
$(
$val_txn.apply_to_txn(&mut $trx)?;
)*
// Commit to meta store
$manager.env.meta_store().txn($trx).await?;
// Upon successful commit, commit the change to in-mem meta
$(
$val_txn.commit();
)*
MetaResult::Ok(())
}.await
}
};
}

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup};
pub(crate) use {commit_meta, commit_meta_with_trx};

use crate::manager::catalog::utils::{
alter_relation_rename, alter_relation_rename_refs, refcnt_dec_connection,
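`commit_meta_with_trx` lets callers stage extra writes (here, the bumped revision) into the same meta-store transaction as the fragment changes, so the revision and the fragment map can only advance together. A desugared sketch of the pattern the macro encodes is shown below, using toy types; `MetaStore`, `ValTxn`, and `commit_with_trx` are illustrative, not the crate's real types.

```rust
/// Toy batch of key-value writes standing in for the real store transaction.
#[derive(Default)]
struct Transaction {
    ops: Vec<(String, String)>,
}

/// Toy meta store that commits a batch atomically (here it just succeeds).
struct MetaStore;

impl MetaStore {
    fn txn(&self, _trx: Transaction) -> Result<(), String> {
        Ok(())
    }
}

/// Toy value transaction: a staged change to one in-memory entry.
struct ValTxn {
    key: String,
    new_value: String,
}

impl ValTxn {
    /// Step 1: mirror the staged change into the shared store transaction.
    fn apply_to_txn(&self, trx: &mut Transaction) -> Result<(), String> {
        trx.ops.push((self.key.clone(), self.new_value.clone()));
        Ok(())
    }

    /// Step 3: make the change visible in memory, only after the store commit.
    fn commit(self) {}
}

/// Roughly what the macro expands to: stage, commit durably, then commit in memory.
fn commit_with_trx(
    store: &MetaStore,
    mut trx: Transaction,
    val_txns: Vec<ValTxn>,
) -> Result<(), String> {
    for v in &val_txns {
        v.apply_to_txn(&mut trx)?;
    }
    store.txn(trx)?; // single atomic write to the meta store
    for v in val_txns {
        v.commit();
    }
    Ok(())
}
```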
2 changes: 2 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -288,6 +288,7 @@ where
}

async fn drop_streaming_job(&self, job_id: StreamingJobId) -> MetaResult<NotificationVersion> {
let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
let table_fragments = self
.fragment_manager
.select_table_fragments_by_table_id(&job_id.id().into())
@@ -561,6 +562,7 @@ where
fragment_graph: StreamFragmentGraphProto,
table_col_index_mapping: ColIndexMapping,
) -> MetaResult<NotificationVersion> {
let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());

let fragment_graph = self