-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(source): throttling source based on storage stats #8049
Conversation
Codecov Report
@@ Coverage Diff @@
## main #8049 +/- ##
==========================================
- Coverage 71.63% 71.60% -0.03%
==========================================
Files 1133 1134 +1
Lines 182211 182302 +91
==========================================
+ Hits 130530 130544 +14
- Misses 51681 51758 +77
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This comment was marked as outdated.
This comment was marked as outdated.
IIUC, the semantics here indicates that we're not accepting any data into the stream. However, apart from the source executor, the DML executor can also yield data chunks into the stream. |
BTW we should not use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
basically LGTM.
please add some log about why to throttle when throttler pauses a stream and resumes it
for throttler in &mut self.throttlers { | ||
throttler.on_barrier(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need multiple throttler in source exec
SourceThrottlerImpl::MaxWaitBarrier(inner) => inner.should_pause(), | ||
SourceThrottlerImpl::StateStore(inner) => { | ||
if let Some(hummock) = inner.as_hummock() { | ||
return hummock.need_write_throttling(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any hint on when hummock needs throttle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any hint on when hummock needs throttle?
when ingest_batch
finds that there are too many L0 SSTs
@@ -271,6 +274,18 @@ impl HummockStorage { | |||
pub fn get_pinned_version(&self) -> PinnedVersion { | |||
self.pinned_version.load().deref().deref().clone() | |||
} | |||
|
|||
pub fn need_write_throttling(&self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to distinguish between different groups of l0 ? , I think their write paths are independent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat! LGTM!
.values() | ||
.any(|levels| { | ||
levels.l0.as_ref().unwrap().sub_levels.len() | ||
> self.options.throttle_l0_sub_level_number as usize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we explain here the problem of deadlock when throttle_l0_sub_level_number < level0_tier_compact_file_number?
// we can guarantee the source is not paused since it received stream | ||
// chunks. | ||
self_paused = true; | ||
if !stream.paused() && self.throttlers.iter().any(|t| t.should_pause()) { | ||
stream.pause_source(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add warn!
logging to inform us/users that streaming is paused due to accumulated L0 SST files.
The DML executor is able to pause now via #8110. It pauses and resumes in a similar way as the source executor, so the new throttle can be implemented on the DML executor without much pain. |
# Conflicts: # src/common/src/config.rs # src/meta/src/lib.rs # src/meta/src/manager/env.rs # src/rpc_client/src/meta_client.rs # src/stream/src/executor/source/fs_source_executor.rs # src/stream/src/executor/source/source_executor.rs # src/stream/src/executor/stream_reader.rs
We need this throttle on the source executor to control the data stream from external connectors, based on which we could gain control over the whole graph. BTW, as another form of data source, maybe the DML executor requires a similar throttling mechanism on user data. cc. @BugenZhao @tabVersion |
Let's assume that DML won't have a too large throughput now. Basic back-pressure should work well for this case. |
We have decided to stall locally (ingest_batch), instead of pausing source. |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Implement #7997. There will be two types of source throttlers:
As before
Note that
barrier_interval_ms
will be a mutable system param (not implemented yet). So MaxWaitBarrier throttler should be updated as well at that time.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Click here for Documentation
Types of user-facing changes
Please keep the types that apply to your changes, and remove the others.
Release note