feat: Add revision for rescheduling process #10199

Merged (7 commits) on Jun 13, 2023

3 changes: 3 additions & 0 deletions proto/meta.proto
@@ -325,6 +325,7 @@ message GetClusterInfoResponse {
repeated TableFragments table_fragments = 2;
map<uint32, source.ConnectorSplits> actor_splits = 3;
map<uint32, catalog.Source> source_infos = 4;
uint64 revision = 5;
}

message RescheduleRequest {
@@ -334,10 +335,12 @@ message RescheduleRequest {
}
// reschedule plan for each fragment
map<uint32, Reschedule> reschedules = 1;
uint64 revision = 2;
}

message RescheduleResponse {
bool success = 1;
uint64 revision = 2;
}

service ScaleService {
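
Taken together, these proto changes give rescheduling an optimistic-concurrency handshake: `GetClusterInfoResponse` now reports the meta node's current revision, `RescheduleRequest` echoes it back, and `RescheduleResponse` returns the (possibly advanced) revision, presumably so the meta node can reject a plan built against a stale view of the cluster. A minimal sketch of the check this enables on the meta side; `MetaState` and this `reschedule` signature are illustrative stand-ins, not code from this PR:

```rust
/// Illustrative stand-in for the revision-guarded state on the meta node.
struct MetaState {
    revision: u64,
}

impl MetaState {
    /// Accept a reschedule only if the caller saw the latest revision,
    /// then bump it so concurrent callers holding the old value are rejected.
    fn reschedule(&mut self, client_revision: u64) -> (bool, u64) {
        if client_revision != self.revision {
            // Stale view of the cluster: reject and report the latest revision.
            return (false, self.revision);
        }
        // ... apply the reschedule plan here ...
        self.revision += 1;
        (true, self.revision)
    }
}

fn main() {
    let mut meta = MetaState { revision: 5 };
    assert_eq!(meta.reschedule(4), (false, 5)); // stale revision rejected
    assert_eq!(meta.reschedule(5), (true, 6));  // matching revision applied and bumped
}
```
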
3 changes: 3 additions & 0 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -37,6 +37,7 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
source_infos: _,
table_fragments,
mut actor_splits,
revision: _,
} = get_cluster_info(context).await?;

for table_fragment in &table_fragments {
@@ -79,6 +80,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
table_fragments,
actor_splits: _,
source_infos: _,
revision,
} = get_cluster_info(context).await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
@@ -167,6 +169,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
}

println!("{table}");
println!("Revision: {}", revision);

Ok(())
}
21 changes: 18 additions & 3 deletions src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -52,7 +52,12 @@ const RESCHEDULE_ADDED_KEY: &str = "added";
// removed_parallel_units: [],
// },
// }
pub async fn reschedule(context: &CtlContext, mut plan: String, dry_run: bool) -> Result<()> {
pub async fn reschedule(
context: &CtlContext,
mut plan: String,
dry_run: bool,
revision: u64,
) -> Result<()> {
let meta_client = context.meta_client().await?;

let regex = Regex::new(RESCHEDULE_MATCH_REGEXP)?;
@@ -114,8 +119,18 @@ pub async fn reschedule(context: &CtlContext, mut plan: String, dry_run: bool) -> Result<()> {

if !dry_run {
println!("---------------------------");
let resp = meta_client.reschedule(reschedules).await?;
println!("Response from meta {}", resp);
let (success, revision) = meta_client.reschedule(reschedules, revision).await?;

if !success {
println!(
"Reschedule failed, please check the plan or the revision, current revision is {}",
revision
);

return Err(anyhow!("reschedule failed"));
}

println!("Reschedule success, current revision is {}", revision);
}

Ok(())
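
With this change, `risectl` must be told which revision the plan was built against (via the new `--revision` flag added below in `src/ctl/src/lib.rs`), and a rejected request reports the revision the meta node is actually at. A hedged sketch of the resulting operator loop; `fetch_revision` and `try_reschedule` are hypothetical stand-ins for the `get_cluster_info` and `meta_client.reschedule` calls used above:

```rust
/// Hypothetical stand-in for reading `revision` from GetClusterInfoResponse.
fn fetch_revision() -> u64 {
    5
}

/// Hypothetical stand-in for the RescheduleRequest round trip: returns
/// (success, latest revision), mirroring the tuple handled by `reschedule` above.
fn try_reschedule(revision: u64) -> (bool, u64) {
    (revision == 5, 6)
}

fn main() {
    const MAX_ATTEMPTS: u32 = 3;
    let mut revision = fetch_revision();

    for attempt in 1..=MAX_ATTEMPTS {
        let (success, latest) = try_reschedule(revision);
        if success {
            println!("reschedule applied, revision is now {latest}");
            return;
        }
        // The cluster changed underneath us; re-validate the plan against the
        // new topology before retrying with the latest revision.
        println!("attempt {attempt} rejected, latest revision is {latest}");
        revision = latest;
    }
    eprintln!("giving up after {MAX_ATTEMPTS} attempts");
}
```
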
11 changes: 8 additions & 3 deletions src/ctl/src/lib.rs
@@ -218,6 +218,9 @@ enum MetaCommands {
/// Show the plan only, no actual operation
#[clap(long)]
dry_run: bool,
/// Revision of the plan
#[clap(long)]
revision: u64,
},
/// backup meta by taking a meta snapshot
BackupMeta,
@@ -348,9 +351,11 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Meta(MetaCommands::SourceSplitInfo) => {
cmd_impl::meta::source_split_info(context).await?
}
Commands::Meta(MetaCommands::Reschedule { plan, dry_run }) => {
cmd_impl::meta::reschedule(context, plan, dry_run).await?
}
Commands::Meta(MetaCommands::Reschedule {
plan,
dry_run,
revision,
}) => cmd_impl::meta::reschedule(context, plan, dry_run, revision).await?,
Commands::Meta(MetaCommands::BackupMeta) => cmd_impl::meta::backup_meta(context).await?,
Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
10 changes: 6 additions & 4 deletions src/meta/src/barrier/command.rs
@@ -116,7 +116,9 @@ pub enum Command {
///
/// Barriers from which actors should be collected, and the post behavior of this command are
/// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
RescheduleFragment(HashMap<FragmentId, Reschedule>),
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
},

/// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is
/// essentially switching the downstream of the old table fragments to the new ones, and
@@ -159,7 +161,7 @@ impl Command {
Command::CancelStreamingJob(table_fragments) => {
CommandChanges::DropTables(std::iter::once(table_fragments.table_id()).collect())
}
Command::RescheduleFragment(reschedules) => {
Command::RescheduleFragment { reschedules, .. } => {
let to_add = reschedules
.values()
.flat_map(|r| r.added_actors.iter().copied())
@@ -319,7 +321,7 @@
}))
}

Command::RescheduleFragment(reschedules) => {
Command::RescheduleFragment { reschedules, .. } => {
let mut dispatcher_update = HashMap::new();
for (_fragment_id, reschedule) in reschedules.iter() {
for &(upstream_fragment_id, dispatcher_id) in
@@ -613,7 +615,7 @@
.await;
}

Command::RescheduleFragment(reschedules) => {
Command::RescheduleFragment { reschedules } => {
let mut node_dropped_actors = HashMap::new();
for table_fragments in self.fragment_manager.list_table_fragments().await? {
for fragment_id in table_fragments.fragments.keys() {
Expand Down
83 changes: 70 additions & 13 deletions src/meta/src/manager/catalog/fragment.rs
@@ -36,17 +36,18 @@ use tokio::sync::{RwLock, RwLockReadGuard};

use crate::barrier::Reschedule;
use crate::manager::cluster::WorkerId;
use crate::manager::{commit_meta, MetaSrvEnv};
use crate::manager::{commit_meta, commit_meta_with_trx, MetaSrvEnv};
use crate::model::{
ActorId, BTreeMapTransaction, FragmentId, MetadataModel, MigrationPlan, TableFragments,
ValTransaction,
};
use crate::storage::{MetaStore, Transaction};
use crate::stream::SplitAssignment;
use crate::stream::{SplitAssignment, TableRevision};
use crate::MetaResult;

pub struct FragmentManagerCore {
table_fragments: BTreeMap<TableId, TableFragments>,
table_revision: TableRevision,
}

impl FragmentManagerCore {
@@ -102,9 +103,14 @@
.map(|tf| (tf.table_id(), tf))
.collect();

let table_revision = TableRevision::get(env.meta_store()).await?;

Ok(Self {
env,
core: RwLock::new(FragmentManagerCore { table_fragments }),
core: RwLock::new(FragmentManagerCore {
table_fragments,
table_revision,
}),
})
}

@@ -118,6 +124,10 @@
Ok(map.values().cloned().collect())
}

pub async fn get_revision(&self) -> TableRevision {
self.core.read().await.table_revision
}

pub async fn has_any_table_fragments(&self) -> bool {
!self.core.read().await.table_fragments.is_empty()
}
@@ -246,7 +256,9 @@
dummy_table_id: TableId,
merge_updates: &[MergeUpdate],
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;
let map = &mut guard.table_fragments;

let mut table_fragments = BTreeMapTransaction::new(map);

@@ -317,7 +329,17 @@
assert!(merge_updates.is_empty());

// Commit changes and notify about the changes.
commit_meta!(self, table_fragments)?;
let mut trx = Transaction::default();

// save next revision
let next_revision = current_revision.next();
next_revision.store(&mut trx);

// commit
commit_meta_with_trx!(self, trx, table_fragments)?;

// update revision in memory
guard.table_revision = next_revision;

// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
@@ -347,7 +369,10 @@
/// Drop table fragments info and remove downstream actor infos in fragments from its dependent
/// tables.
pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet<TableId>) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;

let map = &mut guard.table_fragments;
let to_delete_table_fragments = table_ids
.iter()
.filter_map(|table_id| map.get(table_id).cloned())
@@ -385,7 +410,16 @@
});
}
}
commit_meta!(self, table_fragments)?;

if table_ids.is_empty() {
commit_meta!(self, table_fragments)?;
} else {
let mut trx = Transaction::default();
let next_revision = current_revision.next();
next_revision.store(&mut trx);
commit_meta_with_trx!(self, trx, table_fragments)?;
guard.table_revision = next_revision;
}

for table_fragments in to_delete_table_fragments {
if table_fragments.state() != State::Initial {
@@ -452,11 +486,19 @@
}
if updated {
table_fragment.update_vnode_mapping(&migration_plan.parallel_unit_plan);
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_revision = guard.table_revision;
let map = &mut guard.table_fragments;
if map.contains_key(&table_fragment.table_id()) {
let mut txn = BTreeMapTransaction::new(map);
txn.insert(table_fragment.table_id(), table_fragment.clone());
commit_meta!(self, txn)?;
let mut table_trx = BTreeMapTransaction::new(map);
table_trx.insert(table_fragment.table_id(), table_fragment.clone());

let next_revision = current_revision.next();
let mut trx = Transaction::default();
next_revision.store(&mut trx);
commit_meta_with_trx!(self, trx, table_trx)?;
guard.table_revision = next_revision;

self.notify_fragment_mapping(&table_fragment, Operation::Update)
.await;
}
@@ -606,7 +648,10 @@
&self,
mut reschedules: HashMap<FragmentId, Reschedule>,
) -> MetaResult<()> {
let map = &mut self.core.write().await.table_fragments;
let mut guard = self.core.write().await;
let current_version = guard.table_revision;

let map = &mut guard.table_fragments;

fn update_actors(
actors: &mut Vec<ActorId>,
@@ -821,7 +866,19 @@
}

assert!(reschedules.is_empty(), "all reschedules must be applied");
commit_meta!(self, table_fragments)?;

// new empty transaction
let mut trx = Transaction::default();

// save next revision
let next_revision = current_version.next();
next_revision.store(&mut trx);

// commit
commit_meta_with_trx!(self, trx, table_fragments)?;

// update revision in memory
guard.table_revision = next_revision;

for mapping in fragment_mapping_to_notify {
self.env
Expand Down
25 changes: 24 additions & 1 deletion src/meta/src/manager/catalog/mod.rs
@@ -86,10 +86,33 @@ macro_rules! commit_meta {
}
};
}
pub(crate) use commit_meta;

/// `commit_meta_with_trx` is similar to `commit_meta`, but it accepts an external trx (transaction)
/// and commits it.
macro_rules! commit_meta_with_trx {
($manager:expr, $trx:ident, $($val_txn:expr),*) => {
{
async {
// Apply the change in `ValTransaction` to trx
$(
$val_txn.apply_to_txn(&mut $trx)?;
)*
// Commit to meta store
$manager.env.meta_store().txn($trx).await?;
// Upon successful commit, commit the change to in-mem meta
$(
$val_txn.commit();
)*
MetaResult::Ok(())
}.await
}
};
}

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup};
pub(crate) use {commit_meta, commit_meta_with_trx};

use crate::manager::catalog::utils::{
alter_relation_rename, alter_relation_rename_refs, refcnt_dec_connection,
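
`commit_meta_with_trx` differs from `commit_meta` only in accepting a caller-built `Transaction`, so extra writes (here, the bumped table revision) land atomically with the `ValTransaction` changes, and the in-memory `commit()` runs strictly after the store accepts the transaction. A minimal sketch of that ordering with stand-in types; the real `ValTransaction` and meta-store APIs are richer than this:

```rust
/// Stand-in transaction accumulating writes to be committed atomically.
#[derive(Default)]
struct Transaction {
    ops: Vec<String>,
}

/// Stand-in meta store: either all staged ops commit or none do.
#[derive(Default)]
struct MetaStore {
    committed: Vec<String>,
}

impl MetaStore {
    fn txn(&mut self, trx: Transaction) -> Result<(), String> {
        self.committed.extend(trx.ops);
        Ok(())
    }
}

/// Stand-in for a `ValTransaction`: a staged in-memory change that only
/// becomes visible once the backing store commit has succeeded.
struct ValTxn {
    staged: Option<String>,
    applied: Vec<String>,
}

impl ValTxn {
    fn apply_to_txn(&self, trx: &mut Transaction) {
        if let Some(change) = &self.staged {
            trx.ops.push(change.clone());
        }
    }

    fn commit(&mut self) {
        if let Some(change) = self.staged.take() {
            self.applied.push(change);
        }
    }
}

fn main() -> Result<(), String> {
    let mut store = MetaStore::default();
    let mut table_fragments = ValTxn {
        staged: Some("update fragment actors".to_owned()),
        applied: vec![],
    };

    // 1. The caller pre-loads the transaction (in this PR: the next revision).
    let mut trx = Transaction::default();
    trx.ops.push("store next revision".to_owned());

    // 2. The macro folds every ValTransaction into the same transaction ...
    table_fragments.apply_to_txn(&mut trx);

    // 3. ... commits it to the store, and only then commits the in-memory state.
    store.txn(trx)?;
    table_fragments.commit();

    assert_eq!(store.committed.len(), 2);
    assert_eq!(table_fragments.applied, vec!["update fragment actors".to_owned()]);
    Ok(())
}
```
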
2 changes: 2 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -288,6 +288,7 @@
}

async fn drop_streaming_job(&self, job_id: StreamingJobId) -> MetaResult<NotificationVersion> {
let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
let table_fragments = self
.fragment_manager
.select_table_fragments_by_table_id(&job_id.id().into())
@@ -561,6 +562,7 @@
fragment_graph: StreamFragmentGraphProto,
table_col_index_mapping: ColIndexMapping,
) -> MetaResult<NotificationVersion> {
let _streaming_job_lock = self.stream_manager.streaming_job_lock.lock().await;
let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());

let fragment_graph = self
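
Both `drop_streaming_job` and the table-replacement path now take `streaming_job_lock` up front, so they cannot interleave with other operations guarded by the same lock while fragment state (and its revision) is being rewritten; which other paths take this lock is not shown in this diff. A rough sketch of the pattern, with a plain `std::sync::Mutex` standing in for the async lock used in the PR:

```rust
use std::sync::Mutex;

/// Stand-in for the stream manager holding the shared job lock.
struct StreamManager {
    streaming_job_lock: Mutex<()>,
    revision: Mutex<u64>,
}

impl StreamManager {
    fn drop_streaming_job(&self, job: &str) {
        // Hold the lock for the whole operation so no competing DDL that takes
        // the same lock can run concurrently.
        let _guard = self.streaming_job_lock.lock().unwrap();
        let mut revision = self.revision.lock().unwrap();
        *revision += 1;
        println!("dropped {job}, revision bumped to {}", *revision);
    }
}

fn main() {
    let manager = StreamManager {
        streaming_job_lock: Mutex::new(()),
        revision: Mutex::new(5),
    };
    manager.drop_streaming_job("mv_example");
}
```
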