Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(postgres-cdc): refactor postgres_row_to_owned_row #16714

Merged
merged 26 commits into from
May 28, 2024

Conversation

KeXiangWang
Copy link
Contributor

@KeXiangWang KeXiangWang commented May 13, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Try refactoring to resolve #16416 (comment)

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Sorry, something went wrong.

@KeXiangWang
Copy link
Contributor Author

KeXiangWang commented May 13, 2024

Before this PR, NULL::TYPE[] is not well handled, it will be parsed as {} instead of NULL. The bug is due to the following style of codes.

let mut builder = dtype.create_array_builder(0);
// do something
if let Some(v) = val {
    v.into_iter().for_each(|val| {
        builder.append(
            val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
        )
    });
}
// do something
Some(ScalarImpl::from(ListValue::new(builder.finish())))

The correct way to handle NULL::TYPE[] is to check if the value is None before creating the array builder.

if let Some(v) = val {
    let mut builder = dtype.create_array_builder(0);
    v.into_iter().for_each(|val| {
        builder.append(
            val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
        )
    });
    Some(ScalarImpl::from(ListValue::new(builder.finish())))
} else {
    None
}

@KeXiangWang
Copy link
Contributor Author

KeXiangWang commented May 13, 2024

An open question: Do you think we should also add UuidList(Vec<Option<uuid::Uuid>>) and EnumList(Vec<Option<EnumString>>) in ScalarAdapter?

@KeXiangWang
Copy link
Contributor Author

After refactoring, multi-dismension list should trivial to support. Will add it in next PR.

During refactoring, I find an interesting thing: while testing, I setup a postgres as data source and I run the test multiple times. The second round of test for the postgres will usually fail but the first and following tests will succeed🤣. Will continue to investigate.

@xiangjinwu
Copy link
Contributor

After refactoring, multi-dismension list should trivial to support. Will add it in next PR.

https://docs.rs/postgres-types/latest/postgres_types/trait.FromSql.html#arrays
https://docs.rs/postgres-types/latest/postgres_types/trait.ToSql.html#arrays

FromSql is implemented for Vec<T>, Box<[T]> and [T; N] where T implements FromSql, and corresponds to one-dimensional Postgres arrays.
ToSql is implemented for [u8; N], Vec<T>, &[T], Box<[T]> and [T; N] where T implements ToSql and N is const usize, and corresponds to one-dimensional Postgres arrays with an index offset of 1.

Related: #3811 (comment) #15614

@KeXiangWang
Copy link
Contributor Author

KeXiangWang commented May 23, 2024

During this refactoring, I found three another bugs of Debezium CDC.

#16895
Early timestamp is not handled correctly, so I have to use a timestamp close to now to avoid this error.
#16880
When enum in PG is recreated (delete and create with the same name), the type registry won't keep up with upstream so that Debezium cannot handle the enum correctly, so that lead to a NULL for this enum in debezium message. I have no solution to resolve, so temporarily didn't add test for this.
#16882
INTERVAL_ARRAY is not supported. Resolved by not allowing INTERVAL_ARRAY's into_scalar

BTW, besides these three, I suspect Debezium may not support "inf/-inf/nan" for some types. Will check later.

@KeXiangWang KeXiangWang requested a review from StrikeW May 23, 2024 04:14
(ScalarRefImpl::List(list), &Type::NUMERIC_ARRAY, _) => {
let mut vec = vec![];
for scalar in list.iter() {
vec.push(match scalar {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scalar should rename to datum, because Datum is Option<ScalarImpl>. And I am curious why not vec.push( datum.map(|scalar| ScalarAdapter::from_scalar(scalar, &Type::NUMERIC) ) ) to call the from_scalar recursively? Just add one more branch to handle (ScalarRefImpl::Decimal, &Type::NUMERIC) as base case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scalar should rename to datum, because Datum is Option

updated.

And I am curious why not vec.push( datum.map(|scalar| ScalarAdapter::from_scalar(scalar, &Type::NUMERIC) ) ) to call the from_scalar recursively? Just add one more branch to handle (ScalarRefImpl::Decimal, &Type::NUMERIC) as base case.

For two reason:

  1. from_scalar return ScalarAdapter directly, but ScalarAdapter::NumericList requires Vec<Option<PgNumeric>> actually.
  2. If we don't handle (ScalarRefImpl::Decimal, &Type::NUMERIC), ScalarRefImpl::Decimal will be builtin. We have implement ToSql for rw's Decimal, so it acceptale by PG. If we handle (ScalarRefImpl::Decimal, &Type::NUMERIC), we have to first convert it to string and then parse as PgNumeric, seems unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we handle (ScalarRefImpl::Decimal, &Type::NUMERIC), we have to first convert it to string and then parse as PgNumeric, seems unnecessary.

Your are right, I got it.

src/connector/src/parser/scalar_adapter.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@StrikeW StrikeW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

src/connector/src/parser/scalar_adapter.rs Show resolved Hide resolved
src/connector/src/parser/postgres.rs Show resolved Hide resolved
e2e_test/source/cdc/cdc.check_new_rows.slt Outdated Show resolved Hide resolved
Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};

impl<'a> FromSql<'a> for ScalarImpl {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These FromSql implementations LGTM, @xiangjinwu please also take a look

Copy link
Contributor

@xiangjinwu xiangjinwu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the efforts to test all these tricky cases and help to get them organized!

Just a reminder to all of us: rather than going too far away to be bug-compatible with debezium, we may:

  • Disallow ingesting such data type at all, for example interval[]
  • Spend more time on fixing debezium. IIRC there is dark magic in java class loader to replace a certain library class without forking the project

src/common/src/types/jsonb.rs Outdated Show resolved Hide resolved
src/common/src/types/jsonb.rs Outdated Show resolved Hide resolved

impl ToSql for ScalarRefImpl<'_> {
impl ToSql for ScalarImpl {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we change from ScalarRefImpl to ScalarImpl? The former is always more general and there is as_scalar_ref. (That is, to for a ref type and from for an owned type.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of our types that implement FromSql are scalar types instead of scalar_ref. So it's easier to implement FromSql for ScalarImpl then ScalarRefImpl. Besides, as the postgres_cell_to_scalar_impl's return type is ScalarImpl, ScalarImpl would be more natural.

src/common/src/types/from_sql.rs Outdated Show resolved Hide resolved
src/common/src/types/from_sql.rs Outdated Show resolved Hide resolved
src/common/src/types/from_sql.rs Show resolved Hide resolved
src/connector/src/parser/scalar_adapter.rs Outdated Show resolved Hide resolved
ScalarAdapter::List(vec)
}
},
_ => ScalarAdapter::Builtin(scalar.into_scalar_impl()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.into_scalar_impl() contains a clone and is the original reason ScalarAdapter contains ScalarRefImpl. But given it is only used as primary key during backfill at this time, I guess cloning is okay here.

src/connector/src/parser/postgres.rs Show resolved Hide resolved
@KeXiangWang KeXiangWang force-pushed the wkx/refactor-postgres-row-to-owned-row branch from efabf6b to 360a6af Compare May 27, 2024 19:19
@KeXiangWang
Copy link
Contributor Author

Spend more time on fixing debezium.

Agree. I've noted down the bugs found as issues. But they may not be urgent, will fix when requested.

IIRC there is dark magic in java class loader to replace a certain library class without forking the project

Should be ClassLoader.

src/common/src/types/from_sql.rs Outdated Show resolved Hide resolved
@KeXiangWang KeXiangWang enabled auto-merge May 28, 2024 14:47
@KeXiangWang KeXiangWang added this pull request to the merge queue May 28, 2024
Merged via the queue into main with commit 12f5c0d May 28, 2024
29 of 30 checks passed
@KeXiangWang KeXiangWang deleted the wkx/refactor-postgres-row-to-owned-row branch May 28, 2024 21:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants