Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: disallow online scaling for shared source #18355

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ pub struct ScaleController {

pub env: MetaSrvEnv,

/// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
/// e.g., a MV cannot be rescheduled during foreground backfill.
pub reschedule_lock: RwLock<()>,
}

Expand Down
23 changes: 12 additions & 11 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
// FIXME: consider SourceCatchingUp here?
yield Message::Chunk(chunk);
}
Message::Watermark(watermark) => {
Expand All @@ -721,19 +722,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {

/// All splits finished backfilling.
///
/// We check all splits for the source, including other actors' splits here, before going to the forward stage.
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// this actor, we still need to backfill it.
/// Note: we don't need to consider split migration (online scaling) here, so we can just check the splits assigned to this actor.
/// - For foreground DDL, scaling is not allowed during backfilling.
/// - For background DDL, scaling is skipped when backfilling is not finished, and can be triggered by recreating actors during recovery.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
Ok(states
.values()
.all(|state| matches!(state, BackfillState::Finished))
&& self
.backfill_state_store
.scan()
.await?
.into_iter()
.all(|state| matches!(state, BackfillState::Finished)))
.all(|state| matches!(state, BackfillState::Finished)))
}

/// For newly added splits, we do not need to backfill and can directly forward from upstream.
Expand Down Expand Up @@ -792,7 +789,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
// TODO: disallow online scaling during backfilling.
debug_assert!(
false,
"split migration during backfill stage should not happen"
);
target_state.insert(split_id, backfill_state);
}
}
Expand Down Expand Up @@ -883,6 +883,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. It should also be finished since we are in forwarding stage.
// FIXME: it's still possible to have SourceCatchingUp here if we want to consider it as finished..?
match backfill_state {
BackfillState::Finished => {}
_ => {
Expand Down
24 changes: 0 additions & 24 deletions src/stream/src/executor/source/source_backfill_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,6 @@ impl<S: StateStore> BackfillStateTableHandler<S> {
.map_err(StreamExecutorError::from)
}

/// XXX: we might get stale data for other actors' writes, but it's fine?
pub async fn scan(&self) -> StreamExecutorResult<Vec<BackfillState>> {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);

let state_table_iter = self
.state_store
.iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
.await?;
pin_mut!(state_table_iter);

let mut ret = vec![];
while let Some(item) = state_table_iter.next().await {
let row = item?.into_owned_row();
let state = match row.datum_at(1) {
Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())?
}
_ => unreachable!(),
};
ret.push(state);
}
Ok(ret)
}

async fn set(&mut self, key: SplitId, state: BackfillState) -> StreamExecutorResult<()> {
let row = [
Some(Self::string_to_scalar(key.as_ref())),
Expand Down
Loading