[Spark] Allow missing fields with implicit casting during streaming write #3822

Open · wants to merge 4 commits into master

Conversation

@johanl-db (Collaborator) commented Oct 29, 2024

Description

Follow-up to #3443, which introduced implicit casting during streaming writes to Delta tables.

The feature was shipped disabled due to a regression found in testing: writing data with missing struct fields started being rejected. Streaming writes are one of the few insert types that allow missing struct fields.

This change makes the casting behavior used in MERGE, UPDATE, and streaming writes configurable with respect to missing struct fields.
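
For illustration, a minimal Scala sketch of the regression scenario (table and column names are assumptions, not taken from the PR): the sink column 's' has type STRUCT<a: INT, b: INT> while the stream provides only nested field 'a', which streaming writes historically accept, filling 'b' with NULL.

import org.apache.spark.sql.functions.{col, struct}

spark.sql("CREATE TABLE delta_sink (s STRUCT<a: INT, b: INT>) USING delta")

spark.readStream
    .table("delta_source")
    // Nested field 'b' is missing; streaming writes fill it with NULL.
    .select(struct(col("s.a").alias("a")).alias("s"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<location>")
    .toTable("delta_sink")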

How was this patch tested?

Extensive tests were added in #3762 in preparation for this change, covering all insert types (SQL, dataframe, append/overwrite, ...):

  • Missing top-level columns and nested struct fields.
  • Extra top-level columns and nested struct fields with schema evolution.
  • Position vs. name based resolution for top-level columns and nested struct fields.

In particular, the goal is to ensure that enabling implicit casting in streaming writes here doesn't cause any other unwanted behavior change.

This PR introduces the following user-facing changes:

From the initial PR: #3443

Previously, writing to a Delta sink using a type that doesn't match the column type in the Delta table failed with DELTA_FAILED_TO_MERGE_FIELDS:

spark.readStream
    .table("delta_source")
    # Column 'a' has type INT in 'delta_sink'.
    .select(col("a").cast("long").alias("a"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<location>")
    .toTable("delta_sink")

DeltaAnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'a' and 'a'

With this change, writing to the sink now succeeds and data is cast from LONG to INT. If any value overflows, the stream fails (assuming the default storeAssignmentPolicy=ANSI) with:

SparkArithmeticException: [CAST_OVERFLOW_IN_TABLE_INSERT] Fail to assign a value of 'LONG' type to the 'INT' type column or variable 'a' due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead.
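
As the error message hints, a stream that needs to tolerate overflow can pre-cast the input itself. A sketch against the same hypothetical source and sink, using try_cast (available as a Spark SQL function since 3.2); overflowing values become NULL instead of failing the query:

import org.apache.spark.sql.functions.expr

spark.readStream
    .table("delta_source")
    // Overflowing values become NULL instead of failing the stream.
    .select(expr("try_cast(a AS INT)").alias("a"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<location>")
    .toTable("delta_sink")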

@johanl-db changed the title from "[Spark][WIP] Allow missing fields with implicit casting during streaming write" to "[Spark] Allow missing fields with implicit casting during streaming write" on Oct 30, 2024
@tomvanbussel (Collaborator) left a comment

LGTM, but left a few nits.

The casting and missing columns behavior is getting really complex overall, as it depends so strongly on the operation performed. It would be good if we could clean this up, but this will likely require some breaking changes.

case class CastingBehavior(
    allowMissingStructField: Boolean,
    resolveStructsByName: Boolean,
    isMergeOrUpdate: Boolean)
Collaborator

Nit: This is mixing policy flags (isMergeOrUpdate) with mechanism flags (allowMissingStructField, resolveStructsByName).

Collaborator Author

I pulled isMergeOrUpdate out; it's now a trait that gets mixed in instead of a flag.
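
A minimal sketch of what that refactor can look like; the trait name and signatures below are assumptions, not the PR's exact code:

// Marker trait replacing the isMergeOrUpdate flag (name is an assumption).
trait MergeOrUpdateCastingBehavior

case class CastingBehavior(
    allowMissingStructField: Boolean,
    resolveStructsByName: Boolean)

// MERGE/UPDATE call sites mix the trait in...
val mergeBehavior =
  new CastingBehavior(allowMissingStructField = true, resolveStructsByName = true)
    with MergeOrUpdateCastingBehavior

// ...and code that needs the distinction tests for the trait instead of a flag.
def isMergeOrUpdate(behavior: CastingBehavior): Boolean =
  behavior.isInstanceOf[MergeOrUpdateCastingBehavior]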

 * in error messages and to provide backward compatible behavior.
 */
case class CastingBehavior(
    allowMissingStructField: Boolean,
Collaborator

It seems like this config is completely ignored when resolveStructsByName is false. This may lead to some surprising behavior in the future.

Collaborator Author

I'm now using the type system to enforce the constraint that by-position resolution cannot specify allowMissingStructField.
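
A sketch of how the types can encode that constraint (names assumed):

sealed trait CastingBehavior

// Only by-name resolution exposes the allowMissingStructField knob.
case class CastByName(allowMissingStructField: Boolean) extends CastingBehavior

// By-position resolution has no such parameter: the config cannot apply to it.
case object CastByPosition extends CastingBehavior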

includeInserts = inserts -- insertsByName.intersect(insertsDataframe)
// Exclude dataframe inserts by name (except streaming) which don't support implicit cast.
// See negative test below.
includeInserts = inserts -- (insertsByName.intersect(insertsDataframe) - StreamingInsert)
Collaborator

Nit: This is getting a little bit complex. It's hard for me to understand which cases are actually covered here. It's okay to have some duplication in tests.

Collaborator Author

Minor improvement: I introduced a variable for "inserts that don't support implicit casting" to make it a bit easier to reason about (sketched below).
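
Roughly, with the new variable name assumed:

val insertsWithoutImplicitCast = insertsDataframe.intersect(insertsByName) - StreamingInsert
includeInserts = inserts -- insertsWithoutImplicitCast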

includeInserts = inserts -- insertsDataframe.intersect(insertsByName)
// Exclude dataframe inserts by name (except streaming) which don't support implicit cast.
// See negative test below.
includeInserts = inserts -- (insertsByName.intersect(insertsDataframe) - StreamingInsert)
Collaborator

Why not just:

Suggested change
- includeInserts = inserts -- (insertsByName.intersect(insertsDataframe) - StreamingInsert)
+ includeInserts = inserts -- insertsByName.intersect(insertsDataframe) + StreamingInsert

Collaborator Author

inserts is one of:

  • insertsAppend - StreamingInsert
  • insertsOverwrite - SQLInsertOverwritePartitionByPosition
  • Set(StreamingInsert)
  • Set(SQLInsertOverwritePartitionByPosition)

It doesn't always contain StreamingInsert, so we don't want to add it unconditionally; otherwise we'd get duplicate tests (see the toy example below).
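
A self-contained toy illustrating the difference: '--' and '+' have equal precedence and associate left in Scala, so the suggested form always re-adds StreamingInsert.

object SetDemo extends App {
  // Stand-ins for the real insert kinds; element names are illustrative.
  val dataframeInsertsByName = Set("DataframeAppendByName", "StreamingInsert")
  val inserts = Set("SQLInsertOverwritePartitionByPosition") // no StreamingInsert here

  // Current form: only keeps StreamingInsert if it was already in 'inserts'.
  val current = inserts -- (dataframeInsertsByName - "StreamingInsert")
  println(current) // Set(SQLInsertOverwritePartitionByPosition)

  // Suggested form: re-adds StreamingInsert even though 'inserts' never had it,
  // which would produce a duplicate test.
  val suggested = inserts -- dataframeInsertsByName + "StreamingInsert"
  println(suggested) // Set(SQLInsertOverwritePartitionByPosition, StreamingInsert)
}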
