Add support for CDF and deletion vectors in Delta Lake
homar authored and ebyhr committed Oct 22, 2024
1 parent 1aebb4a commit 757f892
Showing 4 changed files with 124 additions and 63 deletions.
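Before this change, writing to a Delta Lake table with both features enabled failed with "Writing to tables with both change data feed and deletion vectors enabled is not supported". The statements below are a minimal SQL sketch of what the new tests in this commit exercise once that restriction is lifted; the "delta" catalog name and the example table name are placeholders, not part of the commit:

    CREATE TABLE delta.default.cdf_dv_example (x VARCHAR, y INT)
    WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true);

    INSERT INTO delta.default.cdf_dv_example VALUES ('test1', 1), ('test2', 2);
    UPDATE delta.default.cdf_dv_example SET y = 20 WHERE x = 'test2';

    -- The UPDATE is persisted via a deletion vector, and the change data feed still
    -- records insert, update_preimage, and update_postimage rows for it.
    SELECT * FROM TABLE(delta.system.table_changes(CURRENT_SCHEMA, 'cdf_dv_example'));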
@@ -62,6 +62,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;

@@ -356,10 +357,25 @@ public CompletableFuture<Collection<Slice>> finish()

     private Slice writeMergeResult(Slice path, FileDeletion deletion)
     {
+        RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
+        RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
         RoaringBitmapArray deletedRows = loadDeletionVector(Location.of(path.toStringUtf8()));
-        deletedRows.or(deletion.rowsDeletedByDelete());
-        deletedRows.or(deletion.rowsDeletedByUpdate());
+        deletedRows.or(rowsDeletedByDelete);
+        deletedRows.or(rowsDeletedByUpdate);

+        if (cdfEnabled) {
+            try (ConnectorPageSource connectorPageSource = createParquetPageSource(Location.of(path.toStringUtf8())).get()) {
+                readConnectorPageSource(
+                        connectorPageSource,
+                        rowsDeletedByDelete,
+                        rowsDeletedByUpdate,
+                        deletion,
+                        _ -> {});
+            }
+            catch (IOException e) {
+                throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading Parquet file: " + path, e);
+            }
+        }
         TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8()));
         try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) {
             ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
@@ -523,47 +539,16 @@ private Optional<DataFileInfo> rewriteParquetFile(Location path, FileDeletion de
         RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
         RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
         try (ConnectorPageSource connectorPageSource = createParquetPageSource(path).get()) {
-            long filePosition = 0;
-            while (!connectorPageSource.isFinished()) {
-                Page page = connectorPageSource.getNextPage();
-                if (page == null) {
-                    continue;
-                }
-
-                int positionCount = page.getPositionCount();
-                int[] retained = new int[positionCount];
-                int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
-                int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
-                int retainedCount = 0;
-                int deletedByUpdateCount = 0;
-                int deletedByDeleteCount = 0;
-                for (int position = 0; position < positionCount; position++) {
-                    if (rowsDeletedByDelete.contains(filePosition)) {
-                        deletedByDelete[deletedByDeleteCount] = position;
-                        deletedByDeleteCount++;
-                    }
-                    else if (rowsDeletedByUpdate.contains(filePosition)) {
-                        deletedByUpdate[deletedByUpdateCount] = position;
-                        deletedByUpdateCount++;
-                    }
-                    else {
-                        retained[retainedCount] = position;
-                        retainedCount++;
-                    }
-                    filePosition++;
-                }
-
-                storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
-                storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);
-
-                if (retainedCount != positionCount) {
-                    page = page.getPositions(retained, 0, retainedCount);
-                }
-
-                if (page.getPositionCount() > 0) {
-                    fileWriter.appendRows(page);
-                }
-            }
+            readConnectorPageSource(
+                    connectorPageSource,
+                    rowsDeletedByDelete,
+                    rowsDeletedByUpdate,
+                    deletion,
+                    page -> {
+                        if (page.getPositionCount() > 0) {
+                            fileWriter.appendRows(page);
+                        }
+                    });
             if (fileWriter.getRowCount() == 0) {
                 fileWriter.rollback();
                 return Optional.empty();
@@ -585,6 +570,54 @@ else if (rowsDeletedByUpdate.contains(filePosition)) {
         return Optional.of(fileWriter.getDataFileInfo());
     }

+    private void readConnectorPageSource(
+            ConnectorPageSource connectorPageSource,
+            RoaringBitmapArray rowsDeletedByDelete,
+            RoaringBitmapArray rowsDeletedByUpdate,
+            FileDeletion deletion,
+            Consumer<Page> pageConsumer)
+    {
+        long filePosition = 0;
+        while (!connectorPageSource.isFinished()) {
+            Page page = connectorPageSource.getNextPage();
+            if (page == null) {
+                continue;
+            }
+
+            int positionCount = page.getPositionCount();
+            int[] retained = new int[positionCount];
+            int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
+            int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
+            int retainedCount = 0;
+            int deletedByUpdateCount = 0;
+            int deletedByDeleteCount = 0;
+            for (int position = 0; position < positionCount; position++) {
+                if (rowsDeletedByDelete.contains(filePosition)) {
+                    deletedByDelete[deletedByDeleteCount] = position;
+                    deletedByDeleteCount++;
+                }
+                else if (rowsDeletedByUpdate.contains(filePosition)) {
+                    deletedByUpdate[deletedByUpdateCount] = position;
+                    deletedByUpdateCount++;
+                }
+                else {
+                    retained[retainedCount] = position;
+                    retainedCount++;
+                }
+                filePosition++;
+            }
+
+            storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
+            storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);
+
+            if (retainedCount != positionCount) {
+                page = page.getPositions(retained, 0, retainedCount);
+            }
+
+            pageConsumer.accept(page);
+        }
+    }
+
     private void storeCdfEntries(Page page, int[] deleted, int deletedCount, FileDeletion deletion, String operation)
     {
         if (cdfEnabled && page.getPositionCount() > 0) {
@@ -2852,12 +2852,6 @@ private void checkWriteSupported(DeltaLakeTableHandle handle)
         }
         checkUnsupportedUniversalFormat(handle.getMetadataEntry());
         checkUnsupportedWriterFeatures(handle.getProtocolEntry());
-
-        boolean changeDataFeedEnabled = changeDataFeedEnabled(handle.getMetadataEntry(), handle.getProtocolEntry()).orElse(false);
-        boolean deletionVectorEnabled = isDeletionVectorEnabled(handle.getMetadataEntry(), handle.getProtocolEntry());
-        if (changeDataFeedEnabled && deletionVectorEnabled) {
-            throw new TrinoException(NOT_SUPPORTED, "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-        }
     }

     public static void checkUnsupportedUniversalFormat(MetadataEntry metadataEntry)
@@ -1354,22 +1354,27 @@ public void testCreateTableWithChangeDataFeed()
     }

     @Test
-    public void testUnsupportedChangeDataFeedAndDeletionVector()
+    public void testChangeDataFeedWithDeletionVectors()
     {
-        // TODO https://github.com/trinodb/trino/issues/23620 Fix incorrect CDF entry when deletion vector is enabled
         try (TestTable table = new TestTable(
                 getQueryRunner()::execute,
-                "test_cdf_dv",
-                "(x int) WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true)")) {
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES 1", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("UPDATE " + table.getName() + " SET x = 1", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("DELETE FROM " + table.getName(), "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("MERGE INTO " + table.getName() + " USING (VALUES 42) t(dummy) ON false WHEN NOT MATCHED THEN INSERT VALUES (1)", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("TRUNCATE TABLE " + table.getName(), "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-            assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE optimize", "Writing to tables with both change data feed and deletion vectors enabled is not supported");
-
-            // TODO https://github.com/trinodb/trino/issues/22809 Add support for vacuuming tables with deletion vectors
-            assertQueryFails("CALL system.vacuum(current_schema, '" + table.getName() + "', '7d')", "Cannot execute vacuum procedure with deletionVectors writer features");
+                "test_cdf",
+                "(x VARCHAR, y INT) WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES('test1', 1)", 1);
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES('test2', 2)", 1);
+            assertUpdate("UPDATE " + table.getName() + " SET y = 20 WHERE x = 'test2'", 1);
+
+            assertThat(query("SELECT * FROM " + table.getName()))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('test1', 1), ('test2', 20)");
+            assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + table.getName() + "'))",
+                    """
+                    VALUES
+                        ('test1', 1, 'insert', BIGINT '1'),
+                        ('test2', 2, 'insert', BIGINT '2'),
+                        ('test2', 2, 'update_preimage', BIGINT '3'),
+                        ('test2', 20, 'update_postimage', BIGINT '3')
+                    """);
         }
     }

@@ -431,6 +431,35 @@ public void testDeletionVectorsLargeNumbers()
         }
     }

+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testChangeDataFeedWithDeletionVectors()
+    {
+        String tableName = "test_change_data_feed_with_deletion_vectors_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(col1 STRING, updated_column INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableChangeDataFeed' = true, 'delta.enableDeletionVectors' = true)");
+        onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
+        onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'");
+
+        assertThat(onDelta().executeQuery("SELECT col1, updated_column, _change_type FROM table_changes('default." + tableName + "', 0)"))
+                .containsOnly(
+                        row("testValue1", 1, "insert"),
+                        row("testValue2", 2, "insert"),
+                        row("testValue3", 3, "insert"),
+                        row("testValue3", 3, "update_preimage"),
+                        row("testValue3", 30, "update_postimage"));
+        assertThat(onTrino().executeQuery("SELECT col1, updated_column, _change_type FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))"))
+                .containsOnly(
+                        row("testValue1", 1, "insert"),
+                        row("testValue2", 2, "insert"),
+                        row("testValue3", 3, "insert"),
+                        row("testValue3", 3, "update_preimage"),
+                        row("testValue3", 30, "update_postimage"));
+    }
+
     @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS},
             dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
     public void testDeletionVectorsAcrossAddFile(boolean partitioned)
