-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(meta): refactor how upstream fragment is handled when creating stream job #14510
refactor(meta): refactor how upstream fragment is handled when creating stream job #14510
Conversation
Current dependencies on/for this PR:
This stack of pull requests is managed by Graphite. |
950e10e
to
494ead9
Compare
#[strum_discriminants(name(DdlType))] | ||
#[strum_discriminants(vis(pub))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A step back... 🤣
494ead9
to
b5c248d
Compare
// handle MV on MV | ||
|
||
// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` | ||
// of the new materialized view. | ||
let mview_fragment = upstream_root_fragments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can also be source_fragment
for MV on backfill-able source later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, isn't this also the same for CDC table..?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, the only difference is how to get output_indices
, we have as_materialize
in this branch.
b5c248d
to
9985531
Compare
src/meta/src/controller/fragment.rs
Outdated
f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 | ||
} | ||
fragments.retain(|f| { | ||
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A normal table also contains a Source
fragment which should not filtered as the root fragment. That's why e2e fails I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me think what to do for MV on reuseable source..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I came up with a solution: search for Source fragment only if there's no Mview.
It doesn't work for MV with an upstream MV and an upstream source.
It should be: For each upstream table id, if it doesn't have a MV fragment, then look for its source fragment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we are looking for the bottom-most node in the graph. 🤔
Oh, that's why it's called "Root fragment" 🤣
@@ -668,6 +675,9 @@ impl CompleteStreamFragmentGraph { | |||
|
|||
(mview_id, edge) | |||
} | |||
DdlType::Source | DdlType::Table(_) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this branch reachable for schema change on a sink-into table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get it. Can you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#14417 Oh, it's a TODO now. Never mind.
9985531
to
2aaa7f5
Compare
// look for Source fragment if there's no MView fragment | ||
fragments.insert(table_id, fragment); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is quite counter-intu😄tive. May add some comments to show that this is for Source
job?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, it's already on pub async fn get_upstream_root_fragments
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
// look for Source fragment if there's no MView fragment | ||
_ = fragments.try_insert(fragment.job_id, fragment); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought I made a mistake, then I finally noticed it is try_insert
😅
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Mainly because when I trying to add source backfill, I found it's not easy to understand how to handle
TableJobType
correctly.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.