-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(source): fix panic for ALTER SOURCE
with schema registry
#17293
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
|
if cfg!(debug_assertions) { | ||
// validate column ids | ||
// Note: this just documents how it works currently. It doesn't mean whether it's reasonable. | ||
if let Some(ref columns) = columns { | ||
let mut i = 1; | ||
fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec<ColumnCatalog>) { | ||
for nested_col in &col.field_descs { | ||
// What's the usage of struct fields' column IDs? | ||
check_col(nested_col, i, columns); | ||
} | ||
assert!( | ||
col.column_id.get_id() == *i as i32, | ||
"unexpected column id\ncol: {col:?}\ni: {i}\ncolumns: {columns:#?}" | ||
); | ||
*i += 1; | ||
} | ||
for col in columns { | ||
check_col(&col.column_desc, &mut i, columns); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should use ColumnId::placeholder()
to assign IDs, because later we will re-assign column ids with col_id_gen
, or use col_id_gen
here directly, but it would be very intrusive.
@@ -894,6 +895,7 @@ pub(super) async fn handle_create_table_plan( | |||
with_version_column: Option<String>, | |||
include_column_options: IncludeOption, | |||
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> { | |||
let col_id_gen = ColumnIdGenerator::new_initial(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move arg to eliminate unnecessary param
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. I believe we reused this function for ALTER TABLE
prior to some refactoring and that's why we took an argument of column id generator. 🤡
295121d
to
74040b6
Compare
let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); | ||
let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); | ||
// The newly resolved columns' column IDs also starts from 1. They cannot be used directly. | ||
let mut next_col_id = max_column_id(&original_source.columns).next(); | ||
for col in &mut added_columns { | ||
col.column_desc.column_id = next_col_id; | ||
next_col_id = next_col_id.next(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the real fix of the issue. Other changes are just refactoring/debugging
ALTER SOURCE
with schema registry
let mut bound_column = bind_sql_columns(&[column_def])?.remove(0); | ||
bound_column.column_desc.column_id = columns | ||
.iter() | ||
.fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) | ||
.next(); | ||
bound_column.column_desc.column_id = max_column_id(columns).next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can see ALTER SOURCE ADD COLUMN
use this solution. Actually bind_sql_columns
previously takes col_id_gen
as a param, but it's changed and use ColumnId::placeholder()
after the refactor :( #10307
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -894,6 +895,7 @@ pub(super) async fn handle_create_table_plan( | |||
with_version_column: Option<String>, | |||
include_column_options: IncludeOption, | |||
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> { | |||
let col_id_gen = ColumnIdGenerator::new_initial(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. I believe we reused this function for ALTER TABLE
prior to some refactoring and that's why we took an argument of column id generator. 🤡
@@ -70,6 +70,9 @@ impl LogicalSource { | |||
ctx: OptimizerContextRef, | |||
as_of: Option<AsOf>, | |||
) -> Result<Self> { | |||
// XXX: should we reorder the columns? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the order does not matter much. The columns field is essentially a map indexed by the column id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, it's just for what users will see in SELECT *
.
But I'm not sure if we rely on the position of hidden column like _row_id
somewhere.. For projected_row_id
we do so...
.iter() | ||
.fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) | ||
.next(); | ||
bound_column.column_desc.column_id = max_column_id(columns).next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct me if I'm wrong: we actually can directly go through the path of planning a completely new source catalog without keeping the consistency for column ids between the old and the new one. The current approach is just to be more compatible with ALTER TABLE
, in case of future extension.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the same, but without confidence.
#17336 change a position to unify the additional columns related logic |
I finally found the reason of CI failure, and why I can't reproduce it locally: When using Unfortunately
|
Signed-off-by: xxchan <xxchan22f@gmail.com>
Signed-off-by: xxchan <xxchan22f@gmail.com>
Signed-off-by: xxchan <xxchan22f@gmail.com>
Signed-off-by: xxchan <xxchan22f@gmail.com>
… (#17353) Signed-off-by: xxchan <xxchan22f@gmail.com> Co-authored-by: xxchan <xxchan22f@gmail.com>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
fix #16486
Note that it doesn't panic only when columns are added at the end, but actually only few cases of
ALTER SOURCE
won't panic.The root cause is quite clear: when the schema registry is refreshed, the newly resolved columns still use column ids from 1. So the updated SourceCatalog contains duplicated column IDs.
Why protobuf test works previously? Or when it won't panic?
Test: https://github.com/risingwavelabs/risingwave/blob/dec1c4f0d8e9400b98888e923f941e9b54d40c3e/e2e_test/schema_registry/alter_sr.slt
It just happen to work, and since the test proto file has a
struct
field, whose field occupies a column ID. And it works because of 2 mistakes combined together.e.g.,
When
CREATE SOURCE
In
bind_columns_from_source
: we get[a:#1, bar:#3 {bar.baz:#2}]
according to the schemaThen in
bind_create_source
: we usecol_id_gen
to "compact" the ids, and also added additional cols. We get[a:#1, bar:#2 {bar.baz:#2}, _rw_kafka_timestamp:#3, _row_id:#0]
(notebar.baz
is unchanged, although it doesn't matter, but is strange :)risingwave/src/frontend/src/handler/create_source.rs
Lines 1390 to 1392 in fbb597f
Then when adding field
b=3
toFoo
, andALTER SOURCE
, we will get:In
refresh_sr_and_get_columns_diff
, we only callbind_columns_from_source
, and get[a:#1, bar:#3 {bar.baz:#2}, b:#4]
.Note that we don't use
col_id_gen
here!We calculated
added_columns
, and gotb:#4
. And simplyextend
it to the original columns.Note that we don't compare hidden columns here (
_rw_kafka_timestamp
)!So there are a lot of edge cases to make it fail or crash:
INCLUDE timestamp
to make_rw_kafka_timestamp
not hidden,ALTER SOURCE src_user REFRESH SCHEMA
will fail with:this altering statement will drop columns, which is not supported yet: (_rw_kafka_timestamp: timestamp with time zone)
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.