-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(snapshot-backfill): introduce state to snapshot backfill #19720
base: main
Are you sure you want to change the base?
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
641fa65
to
a3f0881
Compare
c2868dd
to
d5c4a10
Compare
d5c4a10
to
5567d22
Compare
5567d22
to
ba717b7
Compare
if self.streams.is_empty() { | ||
break Poll::Ready(Ok(None)); | ||
} | ||
for (vnode, stream) in &mut self.streams { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will polling the streams in vnode order cause starvation or data skew in downstream? Should we use FuturesUnordered
instead?
let uncommitted = backfill_state.uncommitted_state(); | ||
// TODO: apply to progress state table | ||
drop(uncommitted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain why we need to get the reference of the uncommitted state and drop it immediately?
@@ -105,6 +117,8 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> { | |||
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; | |||
debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier"); | |||
let should_backfill = first_barrier.epoch != first_recv_barrier.epoch; | |||
let mut backfill_state = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see we persist the state. Will it be done in a separated PR?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.