Fix updating delta lake table that has deletion vectors and cdf enabled
homar committed Oct 18, 2024
1 parent 6637b9a commit 387f1e5
Showing 3 changed files with 127 additions and 44 deletions.
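
For context, a minimal sketch of the scenario this commit addresses, distilled from the new tests added below. The table name and catalog prefix (delta) are illustrative assumptions, not part of the commit:

-- Hypothetical reproduction: a Delta table with both change data feed and deletion vectors enabled.
CREATE TABLE delta.default.cdf_dv_example (x VARCHAR, y INT)
WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true);

INSERT INTO delta.default.cdf_dv_example VALUES ('test1', 1);
INSERT INTO delta.default.cdf_dv_example VALUES ('test2', 2);

-- Updating a table that has both features enabled is the case this commit fixes.
UPDATE delta.default.cdf_dv_example SET y = 20 WHERE x = 'test2';

-- The change data feed should report insert, update_preimage, and update_postimage rows.
SELECT * FROM TABLE(delta.system.table_changes('default', 'cdf_dv_example'));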
@@ -62,6 +62,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

@@ -356,10 +357,25 @@ public CompletableFuture<Collection<Slice>> finish()

private Slice writeMergeResult(Slice path, FileDeletion deletion)
{
RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
RoaringBitmapArray deletedRows = loadDeletionVector(Location.of(path.toStringUtf8()));
deletedRows.or(deletion.rowsDeletedByDelete());
deletedRows.or(deletion.rowsDeletedByUpdate());

deletedRows.or(rowsDeletedByDelete);
deletedRows.or(rowsDeletedByUpdate);

if (cdfEnabled) {
try (ConnectorPageSource connectorPageSource = createParquetPageSource(Location.of(path.toStringUtf8())).get()) {
readConnectorPageSource(
connectorPageSource,
rowsDeletedByDelete,
rowsDeletedByUpdate,
deletion,
_ -> {});
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Error reading Parquet file: " + path, e);
}
}
TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8()));
try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) {
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
@@ -523,47 +539,16 @@ private Optional<DataFileInfo> rewriteParquetFile(Location path, FileDeletion de
RoaringBitmapArray rowsDeletedByDelete = deletion.rowsDeletedByDelete();
RoaringBitmapArray rowsDeletedByUpdate = deletion.rowsDeletedByUpdate();
try (ConnectorPageSource connectorPageSource = createParquetPageSource(path).get()) {
long filePosition = 0;
while (!connectorPageSource.isFinished()) {
Page page = connectorPageSource.getNextPage();
if (page == null) {
continue;
}

int positionCount = page.getPositionCount();
int[] retained = new int[positionCount];
int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
int retainedCount = 0;
int deletedByUpdateCount = 0;
int deletedByDeleteCount = 0;
for (int position = 0; position < positionCount; position++) {
if (rowsDeletedByDelete.contains(filePosition)) {
deletedByDelete[deletedByDeleteCount] = position;
deletedByDeleteCount++;
}
else if (rowsDeletedByUpdate.contains(filePosition)) {
deletedByUpdate[deletedByUpdateCount] = position;
deletedByUpdateCount++;
}
else {
retained[retainedCount] = position;
retainedCount++;
}
filePosition++;
}

storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);

if (retainedCount != positionCount) {
page = page.getPositions(retained, 0, retainedCount);
}

if (page.getPositionCount() > 0) {
fileWriter.appendRows(page);
}
}
readConnectorPageSource(
connectorPageSource,
rowsDeletedByDelete,
rowsDeletedByUpdate,
deletion,
page -> {
if (page.getPositionCount() > 0) {
fileWriter.appendRows(page);
}
});
if (fileWriter.getRowCount() == 0) {
fileWriter.rollback();
return Optional.empty();
@@ -585,6 +570,54 @@ else if (rowsDeletedByUpdate.contains(filePosition)) {
return Optional.of(fileWriter.getDataFileInfo());
}

private void readConnectorPageSource(
ConnectorPageSource connectorPageSource,
RoaringBitmapArray rowsDeletedByDelete,
RoaringBitmapArray rowsDeletedByUpdate,
FileDeletion deletion,
Consumer<Page> pageConsumer)
{
long filePosition = 0;
while (!connectorPageSource.isFinished()) {
Page page = connectorPageSource.getNextPage();
if (page == null) {
continue;
}

int positionCount = page.getPositionCount();
int[] retained = new int[positionCount];
int[] deletedByDelete = new int[(int) rowsDeletedByDelete.cardinality()];
int[] deletedByUpdate = new int[(int) rowsDeletedByUpdate.cardinality()];
int retainedCount = 0;
int deletedByUpdateCount = 0;
int deletedByDeleteCount = 0;
for (int position = 0; position < positionCount; position++) {
if (rowsDeletedByDelete.contains(filePosition)) {
deletedByDelete[deletedByDeleteCount] = position;
deletedByDeleteCount++;
}
else if (rowsDeletedByUpdate.contains(filePosition)) {
deletedByUpdate[deletedByUpdateCount] = position;
deletedByUpdateCount++;
}
else {
retained[retainedCount] = position;
retainedCount++;
}
filePosition++;
}

storeCdfEntries(page, deletedByDelete, deletedByDeleteCount, deletion, DELETE_CDF_LABEL);
storeCdfEntries(page, deletedByUpdate, deletedByUpdateCount, deletion, UPDATE_PREIMAGE_CDF_LABEL);

if (retainedCount != positionCount) {
page = page.getPositions(retained, 0, retainedCount);
}

pageConsumer.accept(page);
}
}

private void storeCdfEntries(Page page, int[] deleted, int deletedCount, FileDeletion deletion, String operation)
{
if (cdfEnabled && page.getPositionCount() > 0) {
@@ -1353,6 +1353,27 @@ public void testCreateTableWithChangeDataFeed()
}
}

@Test
public void testChangeDataFeedWithDeletionVectors()
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_cdf",
"(x VARCHAR, y INT) WITH (change_data_feed_enabled = true, deletion_vectors_enabled = true)")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES('test1', 1)", 1);
assertUpdate("INSERT INTO " + table.getName() + " VALUES('test2', 2)", 1);
assertUpdate("UPDATE " + table.getName() + " SET y = 20 WHERE x = 'test2'", 1);
assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + table.getName() + "'))",
"""
VALUES
('test1', 1, 'insert', BIGINT '1'),
('test2', 2, 'insert', BIGINT '2'),
('test2', 2, 'update_preimage', BIGINT '3'),
('test2', 20, 'update_postimage', BIGINT '3')
""");
}
}

@Test
public void testUnsupportedCreateTableWithChangeDataFeed()
{
@@ -431,6 +431,35 @@ public void testDeletionVectorsLargeNumbers()
}
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
public void testChangeDataFeedWithDeletionVectors()
{
String tableName = "test_change_data_feed_with_deletion_vectors_" + randomNameSuffix();
onDelta().executeQuery("" +
"CREATE TABLE default." + tableName +
"(col1 STRING, updated_column INT)" +
"USING delta " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
"TBLPROPERTIES ('delta.enableChangeDataFeed' = true, 'delta.enableDeletionVectors' = true)");
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'");

assertThat(onDelta().executeQuery("SELECT col1, updated_column, _change_type FROM table_changes('default." + tableName + "', 0)"))
.containsOnly(
row("testValue1", 1, "insert"),
row("testValue2", 2, "insert"),
row("testValue3", 3, "insert"),
row("testValue3", 3, "update_preimage"),
row("testValue3", 30, "update_postimage"));
assertThat(onTrino().executeQuery("SELECT col1, updated_column, _change_type FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))"))
.containsOnly(
row("testValue1", 1, "insert"),
row("testValue2", 2, "insert"),
row("testValue3", 3, "insert"),
row("testValue3", 3, "update_preimage"),
row("testValue3", 30, "update_postimage"));
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS},
dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
public void testDeletionVectorsAcrossAddFile(boolean partitioned)