Fix change data feed with deletion vectors #23827

Merged
@@ -62,6 +62,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;

@@ -356,10 +357,25 @@ public CompletableFuture<Collection<Slice>> finish()

     private Slice writeMergeResult(Slice path, FileDeletion deletion)
     {
+        RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
+        RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
         RoaringBitmapArray deletedRows = loadDeletionVector(Location.of(path.toStringUtf8()));
-        deletedRows.or(deletion.rowsDeletedByDelete());
-        deletedRows.or(deletion.rowsDeletedByUpdate());

+        deletedRows.or(rowsDeletedByDelete);
+        deletedRows.or(rowsDeletedByUpdate);
+
+        if (cdfEnabled) {
+            try (ConnectorPageSource connectorPageSource = createParquetPageSource(Location.of(path.toStringUtf8())).get()) {
+                readConnectorPageSource(
+                        connectorPageSource,
+                        rowsDeletedByDelete,
+                        rowsDeletedByUpdate,
+                        deletion,
+                        _ -> {});
+            }
+            catch (IOException e) {
+                throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading Parquet file: " + path, e);
+            }
+        }
         TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8()));
         try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) {
             ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
@@ -523,47 +539,16 @@ private Optional<DataFileInfo> rewriteParquetFile(Location path, FileDeletion deletion)
         RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
         RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
         try (ConnectorPageSource connectorPageSource = createParquetPageSource(path).get()) {
-            long filePosition = 0;
-            while (!connectorPageSource.isFinished()) {
-                Page page = connectorPageSource.getNextPage();
-                if (page == null) {
-                    continue;
-                }
-
-                int positionCount = page.getPositionCount();
-                int[] retained = new int[positionCount];
-                int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
-                int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
-                int retainedCount = 0;
-                int deletedByUpdateCount = 0;
-                int deletedByDeleteCount = 0;
-                for (int position = 0; position < positionCount; position++) {
-                    if (rowsDeletedByDelete.contains(filePosition)) {
-                        deletedByDelete[deletedByDeleteCount] = position;
-                        deletedByDeleteCount++;
-                    }
-                    else if (rowsDeletedByUpdate.contains(filePosition)) {
-                        deletedByUpdate[deletedByUpdateCount] = position;
-                        deletedByUpdateCount++;
-                    }
-                    else {
-                        retained[retainedCount] = position;
-                        retainedCount++;
-                    }
-                    filePosition++;
-                }
-
-                storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
-                storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);
-
-                if (retainedCount != positionCount) {
-                    page = page.getPositions(retained, 0, retainedCount);
-                }
-
-                if (page.getPositionCount() > 0) {
-                    fileWriter.appendRows(page);
-                }
-            }
+            readConnectorPageSource(
+                    connectorPageSource,
+                    rowsDeletedByDelete,
+                    rowsDeletedByUpdate,
+                    deletion,
+                    page -> {
+                        if (page.getPositionCount() > 0) {
+                            fileWriter.appendRows(page);
+                        }
+                    });
             if (fileWriter.getRowCount() == 0) {
                 fileWriter.rollback();
                 return Optional.empty();
@@ -585,6 +570,54 @@
         return Optional.of(fileWriter.getDataFileInfo());
     }

+    private void readConnectorPageSource(
+            ConnectorPageSource connectorPageSource,
+            RoaringBitmapArray rowsDeletedByDelete,
+            RoaringBitmapArray rowsDeletedByUpdate,
+            FileDeletion deletion,
+            Consumer<Page> pageConsumer)
+    {
+        long filePosition = 0;
+        while (!connectorPageSource.isFinished()) {
+            Page page = connectorPageSource.getNextPage();
+            if (page == null) {
+                continue;
+            }
+
+            int positionCount = page.getPositionCount();
+            int[] retained = new int[positionCount];
+            int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
+            int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
+            int retainedCount = 0;
+            int deletedByUpdateCount = 0;
+            int deletedByDeleteCount = 0;
+            for (int position = 0; position < positionCount; position++) {
+                if (rowsDeletedByDelete.contains(filePosition)) {
+                    deletedByDelete[deletedByDeleteCount] = position;
+                    deletedByDeleteCount++;
+                }
+                else if (rowsDeletedByUpdate.contains(filePosition)) {
+                    deletedByUpdate[deletedByUpdateCount] = position;
+                    deletedByUpdateCount++;
+                }
+                else {
+                    retained[retainedCount] = position;
+                    retainedCount++;
+                }
+                filePosition++;
+            }
+
+            storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
+            storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);
+
+            if (retainedCount != positionCount) {
+                page = page.getPositions(retained, 0, retainedCount);
+            }
+
+            pageConsumer.accept(page);
+        }
+    }
+
     private void storeCdfEntries(Page page, int[] deleted, int deletedCount, FileDeletion deletion, String operation)
     {
         if (cdfEnabled && page.getPositionCount() > 0) {
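The extracted readConnectorPageSource above scans every page of the Parquet file, partitions each page's positions into rows deleted by DELETE, rows deleted by UPDATE, and retained rows, records CDF entries for the two deleted groups via storeCdfEntries, and hands the (possibly trimmed) page to a caller-supplied Consumer<Page>: writeMergeResult passes a no-op _ -> {} because it only needs the CDF side effects, while rewriteParquetFile appends the retained rows to the rewritten file. Below is a minimal standalone sketch of just the partitioning step; it substitutes a plain java.util.Set<Long> for Trino's internal RoaringBitmapArray, and the class and method names are illustrative assumptions, not Trino API.

import java.util.Set;

final class PositionPartitioner
{
    // Result of splitting one page: which position indexes to keep or drop,
    // and how many entries of each array are valid.
    record Partition(int[] retained, int retainedCount, int[] deleted, int deletedCount) {}

    // Mirrors the partitioning loop in readConnectorPageSource. Set<Long>
    // stands in for RoaringBitmapArray (an assumption for illustration).
    static Partition partition(long firstFilePosition, int positionCount, Set<Long> deletedFilePositions)
    {
        int[] retained = new int[positionCount];
        int[] deleted = new int[positionCount];
        int retainedCount = 0;
        int deletedCount = 0;
        for (int position = 0; position < positionCount; position++) {
            long filePosition = firstFilePosition + position; // absolute row index within the file
            if (deletedFilePositions.contains(filePosition)) {
                deleted[deletedCount++] = position; // would feed storeCdfEntries
            }
            else {
                retained[retainedCount++] = position; // would feed Page.getPositions(...)
            }
        }
        return new Partition(retained, retainedCount, deleted, deletedCount);
    }

    public static void main(String[] args)
    {
        // A 5-row page starting at file position 100; file rows 101 and 103 are deleted.
        Partition result = partition(100, 5, Set.of(101L, 103L));
        System.out.println("retained=" + result.retainedCount() + ", deleted=" + result.deletedCount());
        // prints: retained=3, deleted=2
    }
}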
@@ -2852,12 +2852,6 @@ private void checkWriteSupported(DeltaLakeTableHandle handle)
         }
         checkUnsupportedUniversalFormat(handle.getMetadataEntry());
         checkUnsupportedWriterFeatures(handle.getProtocolEntry());
-
-        boolean changeDataFeedEnabled = changeDataFeedEnabled(handle.getMetadataEntry(), handle.getProtocolEntry()).orElse(false);
-        boolean deletionVectorEnabled = isDeletionVectorEnabled(handle.getMetadataEntry(), handle.getProtocolEntry());
-        if (changeDataFeedEnabled && deletionVectorEnabled) {
-            throw new TrinoException(NOT_SUPPORTED, "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-        }
     }

     public static void checkUnsupportedUniversalFormat(MetadataEntry metadataEntry)
@@ -1354,22 +1354,27 @@ public void testCreateTableWithChangeDataFeed()
     }

     @Test
-    public void testUnsupportedChangeDataFeedAndDeletionVector()
+    public void testChangeDataFeedWithDeletionVectors()
     {
-        // TODO https://github.com/trinodb/trino/issues/23620 Fix incorrect CDF entry when deletion vector is enabled
         try (TestTable table = new TestTable(
                 getQueryRunner()::execute,
-                "test_cdf_dv",
-                "(x int) WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true)")) {
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES 1", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("UPDATE " + table.getName() + " SET x = 1", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("DELETE FROM " + table.getName(), "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("MERGE INTO " + table.getName() + " USING (VALUES 42) t(dummy) ON false WHEN NOT MATCHED THEN INSERT VALUES (1)", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("TRUNCATE TABLE " + table.getName(), "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE optimize", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-
-            // TODO https://github.com/trinodb/trino/issues/22809 Add support for vacuuming tables with deletion vectors
-            assertQueryFails("CALL system.vacuum(current_schema, '" + table.getName() + "', '7d')", "Cannot execute vacuum procedure with deletionVectors writer features");
+                "test_cdf",
+                "(x VARCHAR, y INT) WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES('test1', 1)", 1);
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES('test2', 2)", 1);
+            assertUpdate("UPDATE " + table.getName() + " SET y = 20 WHERE x = 'test2'", 1);
+
+            assertThat(query("SELECT * FROM " + table.getName()))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('test1', 1), ('test2', 20)");
+            assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + table.getName() + "'))",
+                    """
+                    VALUES
+                        ('test1', 1, 'insert', BIGINT '1'),
+                        ('test2', 2, 'insert', BIGINT '2'),
+                        ('test2', 2, 'update_preimage', BIGINT '3'),
+                        ('test2', 20, 'update_postimage', BIGINT '3')
+                    """);
         }
     }

@@ -431,6 +431,35 @@ public void testDeletionVectorsLargeNumbers()
         }
     }

+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testChangeDataFeedWithDeletionVectors()
+    {
+        String tableName = "test_change_data_feed_with_deletion_vectors_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(col1 STRING, updated_column INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableChangeDataFeed' = true, 'delta.enableDeletionVectors' = true)");
+        onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
+        onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'");
+
+        assertThat(onDelta().executeQuery("SELECT col1, updated_column, _change_type FROM table_changes('default." + tableName + "', 0)"))
+                .containsOnly(
+                        row("testValue1", 1, "insert"),
+                        row("testValue2", 2, "insert"),
+                        row("testValue3", 3, "insert"),
+                        row("testValue3", 3, "update_preimage"),
+                        row("testValue3", 30, "update_postimage"));
+        assertThat(onTrino().executeQuery("SELECT col1, updated_column, _change_type FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))"))
+                .containsOnly(
+                        row("testValue1", 1, "insert"),
+                        row("testValue2", 2, "insert"),
+                        row("testValue3", 3, "insert"),
+                        row("testValue3", 3, "update_preimage"),
+                        row("testValue3", 30, "update_postimage"));
+    }
+
     @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS},
             dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
     public void testDeletionVectorsAcrossAddFile(boolean partitioned)