Skip to content

Commit

Permalink
fix executor bug
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Aug 30, 2024
1 parent a7622f4 commit 15d8287
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
25 changes: 18 additions & 7 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}

let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();

tracing::info!(
actor_id = self.actor_ctx.id,
"source backfill finished. Enter forward stage"
);
// All splits finished backfilling. Now we only forward the source data.
#[for_await]
for msg in input {
Expand Down Expand Up @@ -630,13 +633,19 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// this actor, we still need to backfill it.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
let persisted_states = self.backfill_state_store.scan().await?;
let actor_id = self.actor_ctx.id;
tracing::debug!(
actor_id,
"checking whether source backfill is finished, persisted_states: {:?}, states: {:?}",
persisted_states,
states
);
Ok(states
.values()
.all(|state| matches!(state, BackfillState::Finished))
&& self
.backfill_state_store
.scan()
.await?
&& !persisted_states.is_empty()
&& persisted_states
.into_iter()
.all(|state| matches!(state, BackfillState::Finished)))
}
Expand Down Expand Up @@ -780,8 +789,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
BackfillState::Finished => {}
_ => {
return Err(anyhow::anyhow!(
"Unexpected backfill state: {:?}",
backfill_state
"Unexpected backfill state in update_state_if_changed_forward_stage: {:?}, target_splits: {:?}, current_splits: {:?}",
backfill_state,
target_splits,
current_splits
)
.into());
}
Expand Down
4 changes: 3 additions & 1 deletion src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ impl<S: StateStore> SourceExecutor<S> {
// For shared source, we start from latest and let the downstream SourceBackfillExecutors read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// Note that this is only a performance optimization.
// For correctness, the shared SourceExecutor can start from anywhere.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
if ele.is_cdc_split() {
// shared CDC source already starts from latest.
Expand All @@ -487,7 +489,7 @@ impl<S: StateStore> SourceExecutor<S> {
SplitImpl::Kafka(split) => {
split.seek_to_latest_offset();
}
_ => unreachable!("only kafka source can be shared, got {:?}", ele),
_ => unreachable!("got a non-shareable connector: {:?}", ele),
}
}
}
Expand Down

0 comments on commit 15d8287

Please sign in to comment.