From c1c552f3305fda74952e3b2cb5fb0c2b6a328d6b Mon Sep 17 00:00:00 2001
From: Konrad Dziedzic
Date: Fri, 18 Oct 2024 16:47:24 +0200
Subject: [PATCH] Fix updating delta lake table that has deletion vectors and
 cdf enabled

When both change data feed and deletion vectors are enabled, an update
marks the old rows as deleted through a deletion vector instead of
rewriting the data file, and writeMergeResult never read the data file
on that path, so no CDF entries were produced for the deleted and
updated rows. Extract the page-scanning loop from rewriteParquetFile
into readConnectorPageSource and reuse it in writeMergeResult to emit
the CDF entries.
---
 .../plugin/deltalake/DeltaLakeMergeSink.java  | 121 +++++++++++-------
 .../deltalake/TestDeltaLakeConnectorTest.java |  21 +++
 .../TestDeltaLakeDeleteCompatibility.java     |  29 +++++
 3 files changed, 127 insertions(+), 44 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
index 5fe764a727560..0ad393f8644e6 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
@@ -62,6 +62,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
@@ -356,10 +357,25 @@ public CompletableFuture<Collection<Slice>> finish()
 
     private Slice writeMergeResult(Slice path, FileDeletion deletion)
     {
+        RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
+        RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
         RoaringBitmapArray deletedRows = loadDeletionVector(Location.of(path.toStringUtf8()));
-        deletedRows.or(deletion.rowsDeletedByDelete());
-        deletedRows.or(deletion.rowsDeletedByUpdate());
-
+        deletedRows.or(rowsDeletedByDelete);
+        deletedRows.or(rowsDeletedByUpdate);
+
+        if (cdfEnabled) {
+            try (ConnectorPageSource connectorPageSource = createParquetPageSource(Location.of(path.toStringUtf8())).get()) {
+                readConnectorPageSource(
+                        connectorPageSource,
+                        rowsDeletedByDelete,
+                        rowsDeletedByUpdate,
+                        deletion,
+                        _ -> {});
+            }
+            catch (IOException e) {
+                throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading Parquet file: " + path, e);
+            }
+        }
         TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8()));
         try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) {
             ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
@@ -523,47 +539,16 @@ private Optional<DataFileInfo> rewriteParquetFile(Location path, FileDeletion de
         RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
         RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
         try (ConnectorPageSource connectorPageSource = createParquetPageSource(path).get()) {
-            long filePosition = 0;
-            while (!connectorPageSource.isFinished()) {
-                Page page = connectorPageSource.getNextPage();
-                if (page == null) {
-                    continue;
-                }
-
-                int positionCount = page.getPositionCount();
-                int[] retained = new int[positionCount];
-                int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
-                int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
-                int retainedCount = 0;
-                int deletedByUpdateCount = 0;
-                int deletedByDeleteCount = 0;
-                for (int position = 0; position < positionCount; position++) {
-                    if (rowsDeletedByDelete.contains(filePosition)) {
-                        deletedByDelete[deletedByDeleteCount] = position;
-                        deletedByDeleteCount++;
-                    }
-                    else if (rowsDeletedByUpdate.contains(filePosition)) {
-                        deletedByUpdate[deletedByUpdateCount] = position;
-                        deletedByUpdateCount++;
-                    }
-                    else {
-                        retained[retainedCount] = position;
-                        retainedCount++;
-                    }
-                    filePosition++;
-                }
-
-                storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
-                storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);
-
-                if (retainedCount != positionCount) {
-                    page = page.getPositions(retained, 0, retainedCount);
-                }
-
-                if (page.getPositionCount() > 0) {
-                    fileWriter.appendRows(page);
-                }
-            }
+            readConnectorPageSource(
+                    connectorPageSource,
+                    rowsDeletedByDelete,
+                    rowsDeletedByUpdate,
+                    deletion,
+                    page -> {
+                        if (page.getPositionCount() > 0) {
+                            fileWriter.appendRows(page);
+                        }
+                    });
             if (fileWriter.getRowCount() == 0) {
                 fileWriter.rollback();
                 return Optional.empty();
@@ -585,6 +570,54 @@ else if (rowsDeletedByUpdate.contains(filePosition)) {
         return Optional.of(fileWriter.getDataFileInfo());
     }
 
+    private void readConnectorPageSource(
+            ConnectorPageSource connectorPageSource,
+            RoaringBitmapArray rowsDeletedByDelete,
+            RoaringBitmapArray rowsDeletedByUpdate,
+            FileDeletion deletion,
+            Consumer<Page> pageConsumer)
+    {
+        long filePosition = 0;
+        while (!connectorPageSource.isFinished()) {
+            Page page = connectorPageSource.getNextPage();
+            if (page == null) {
+                continue;
+            }
+
+            int positionCount = page.getPositionCount();
+            int[] retained = new int[positionCount];
+            int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
+            int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
+            int retainedCount = 0;
+            int deletedByUpdateCount = 0;
+            int deletedByDeleteCount = 0;
+            for (int position = 0; position < positionCount; position++) {
+                if (rowsDeletedByDelete.contains(filePosition)) {
+                    deletedByDelete[deletedByDeleteCount] = position;
+                    deletedByDeleteCount++;
+                }
+                else if (rowsDeletedByUpdate.contains(filePosition)) {
+                    deletedByUpdate[deletedByUpdateCount] = position;
+                    deletedByUpdateCount++;
+                }
+                else {
+                    retained[retainedCount] = position;
+                    retainedCount++;
+                }
+                filePosition++;
+            }
+
+            storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
+            storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);
+
+            if (retainedCount != positionCount) {
+                page = page.getPositions(retained, 0, retainedCount);
+            }
+
+            pageConsumer.accept(page);
+        }
+    }
+
     private void storeCdfEntries(Page page, int[] deleted, int deletedCount, FileDeletion deletion, String operation)
     {
         if (cdfEnabled && page.getPositionCount() > 0) {
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
index d7e4f7008ee53..ebe087b1b7036 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
@@ -1353,6 +1353,27 @@ public void testCreateTableWithChangeDataFeed()
         }
     }
 
+    @Test
+    public void testChangeDataFeedWithDeletionVectors()
+    {
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_cdf",
+                "(x VARCHAR, y INT) WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES('test1', 1)", 1);
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES('test2', 2)", 1);
+            assertUpdate("UPDATE " + table.getName() + " SET y = 20 WHERE x = 'test2'", 1);
+            assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + table.getName() + "'))",
+                    """
+                    VALUES
+                        ('test1', 1, 'insert', BIGINT '1'),
+                        ('test2', 2, 'insert', BIGINT '2'),
+                        ('test2', 2, 'update_preimage', BIGINT '3'),
+                        ('test2', 20, 'update_postimage', BIGINT '3')
+                    """);
+        }
+    }
+
     @Test
     public void testUnsupportedCreateTableWithChangeDataFeed()
     {
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java
index b90d3a31339a4..930ee3cbe7355 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java
@@ -431,6 +431,35 @@ public void testDeletionVectorsLargeNumbers()
         }
     }
 
+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testChangeDataFeedWithDeletionVectors()
+    {
+        String tableName = "test_change_data_feed_with_deletion_vectors_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(col1 STRING, updated_column INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableChangeDataFeed' = true, 'delta.enableDeletionVectors' = true)");
+        onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
+        onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'");
+
+        assertThat(onDelta().executeQuery("SELECT col1, updated_column, _change_type FROM table_changes('default." + tableName + "', 0)"))
+                .containsOnly(
+                        row("testValue1", 1, "insert"),
+                        row("testValue2", 2, "insert"),
+                        row("testValue3", 3, "insert"),
+                        row("testValue3", 3, "update_preimage"),
+                        row("testValue3", 30, "update_postimage"));
+        assertThat(onTrino().executeQuery("SELECT col1, updated_column, _change_type FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))"))
+                .containsOnly(
+                        row("testValue1", 1, "insert"),
+                        row("testValue2", 2, "insert"),
+                        row("testValue3", 3, "insert"),
+                        row("testValue3", 3, "update_preimage"),
+                        row("testValue3", 30, "update_postimage"));
+    }
+
     @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
     public void testDeletionVectorsAcrossAddFile(boolean partitioned)
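
The scenario covered by the new tests can also be checked by hand from a
Trino session. A minimal sketch follows, assuming a Delta Lake catalog
named delta and a schema named default (both names are assumptions, not
part of the patch):

    CREATE TABLE delta.default.test_cdf_dv (x VARCHAR, y INT)
    WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true);

    INSERT INTO delta.default.test_cdf_dv VALUES ('test1', 1), ('test2', 2);
    UPDATE delta.default.test_cdf_dv SET y = 20 WHERE x = 'test2';

    -- With the fix, the update yields both update_preimage and
    -- update_postimage entries even though the old row was removed
    -- through a deletion vector rather than a file rewrite.
    SELECT * FROM TABLE(delta.system.table_changes('default', 'test_cdf_dv'));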