Task: Add Unit Test for Flink-Spark Equality Delete Write #1

Open
wants to merge 2 commits into main
Conversation

adelly13

Created a new flink-spark-bundle module with the unit test testEqualityDeleteWritesOnSpark() in TestFlinkSpark.java.

Procedure for testEqualityDeleteWritesOnSpark() (a sketch of the delete-stream step follows the list):

  1. Create the table in Flink
  2. Write initial data
  3. Create a delete stream
  4. Apply equality deletes
  5. Initialize a Spark session
  6. Read the table using Spark
  7. Compare the results
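
For steps 3 and 4, here is a minimal sketch of what the delete stream could look like, assuming Iceberg's FlinkSink builder, a TableLoader named tableLoader, and an (id, data) table schema held in SCHEMA (all placeholder names, not the exact code in this PR):

    // Sketch only: rows tagged with RowKind.DELETE become equality deletes
    // once equalityFieldColumns(...) is set on the sink.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Row> deleteStream =
        env.fromElements(
            Row.ofKind(RowKind.DELETE, 1, "a"),
            Row.ofKind(RowKind.DELETE, 2, "b"));

    FlinkSink.forRow(deleteStream, FlinkSchemaUtil.toSchema(SCHEMA))
        .tableLoader(tableLoader)                               // placeholder TableLoader
        .equalityFieldColumns(Collections.singletonList("id"))  // match delete rows on id
        .append();

    env.execute("write-equality-deletes");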

Current issues: running into import errors, as well as "Invalid write distribution mode: range. Need to define sort order or partition spec." (a possible workaround is sketched below).
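
On the distribution mode error: the message itself points at the cause, i.e. the sink resolves to range distribution while the table has neither a sort order nor a partition spec. Two possible ways out are sketched below, using the same placeholder names as above; pick whichever matches how the table is set up:

    // Option 1 (sketch): pin the distribution mode on the FlinkSink builder.
    FlinkSink.forRow(deleteStream, FlinkSchemaUtil.toSchema(SCHEMA))
        .tableLoader(tableLoader)
        .equalityFieldColumns(Collections.singletonList("id"))
        .distributionMode(DistributionMode.NONE)
        .append();

    // Option 2 (sketch): set the table property when creating the table in Flink SQL.
    sql("CREATE TABLE test_table (id INT, data STRING, PRIMARY KEY(id) NOT ENFORCED) "
        + "WITH ('format-version'='2', 'write.distribution-mode'='none')");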

@adelly13 adelly13 changed the title Added Unit Test for Flink-Spark Equality Delete Write Task: Add Unit Test for Flink-Spark Equality Delete Write Oct 18, 2024
}

@TestTemplate
public void testCheckAndGetEqualityFieldIds() {

We can get rid of all of the tests except your Spark + Flink test.


// Assert that only row with id=3 remains in the table
assertThat(actualData).containsExactlyInAnyOrderElementsOf(expectedData);


@geruh geruh Oct 20, 2024


This is a good test for writing the equality delete using the stream execution environment! I'd also suggest creating a test for an UPSERT case by leveraging the FlinkTableEnvironment. This will help you see how the UPSERT leverages an equality delete to replace the value.

You can add a test to TestFlinkCatalogTable.java like this:

    sql("CREATE TABLE test_table (id INT, data STRING, PRIMARY KEY(id) NOT ENFORCED) WITH ('format-version'='2', 'write.upsert.enabled'='true')");

    sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c')");

    // Perform upsert operation
    sql("INSERT INTO test_table VALUES (2, 'updated_b'), (4, 'd')");


@geruh geruh Oct 20, 2024


To expand upon this, I'd suggest adding some DeleteFile assertions (a rough sketch follows the list). For example, what do you expect the DeleteFile to look like when we want to delete based on:

  • one column: id = 1
  • all columns: id = 1 and data = 'a'
  • range: id > 3 (also, how does Flink get the values greater than 3?)
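
For instance, a rough sketch of what such an assertion could look like for the single-column case, assuming a Table handle named table (the other two cases would vary the expected equalityFieldIds and row counts):

    // Sketch: inspect the delete files added by the latest snapshot.
    table.refresh();
    Snapshot snapshot = table.currentSnapshot();
    for (DeleteFile deleteFile : snapshot.addedDeleteFiles(table.io())) {
      // For `id = 1` we'd expect an equality delete keyed on the id field only.
      assertThat(deleteFile.content()).isEqualTo(FileContent.EQUALITY_DELETES);
      assertThat(deleteFile.equalityFieldIds())
          .containsExactly(table.schema().findField("id").fieldId());
    }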

.master("local[*]")
.config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
.config("spark.sql.catalog.myCatalog.warehouse", "file:///path/to/warehouse")

We should set the same warehouse as the Flink catalog to ensure we are using the same tables.

spark = SparkSession.builder()
        .appName("iceberg-spark")
        .master("local[*]")
        .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
        .config("spark.sql.catalog.hadoop_catalog.warehouse", CATALOG_EXTENSION.warehouse())
        .getOrCreate();
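
With the warehouse shared, the Spark-side read for step 6 can then go through that catalog; the namespace and table name below are placeholders:

    // Placeholder identifiers: adjust to the namespace/table the Flink side created.
    List<Row> actualData =
        spark.sql("SELECT * FROM hadoop_catalog.default.test_table ORDER BY id").collectAsList();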
