Skip to content

Commit

Permalink
Support FIRST and AFTER clause when adding a new column in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 24, 2024
1 parent 5e3f48a commit 4966026
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_UPDATE -> true;
case SUPPORTS_CREATE_MATERIALIZED_VIEW,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_CREATE_VIEW,
SUPPORTS_MERGE,
SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_REPORTING_WRITTEN_BYTES -> true;
case SUPPORTS_ADD_FIELD,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_ADD_FIELD,
SUPPORTS_AGGREGATION_PUSHDOWN,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_DROP_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return switch (connectorBehavior) {
case SUPPORTS_MULTI_STATEMENT_WRITES,
SUPPORTS_REPORTING_WRITTEN_BYTES -> true; // FIXME: Fails because only allowed with transactional tables
case SUPPORTS_ADD_FIELD,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_ADD_FIELD,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_DROP_FIELD,
SUPPORTS_MERGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnPosition;
import io.trino.spi.connector.ConnectorAnalyzeMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
Expand Down Expand Up @@ -1878,7 +1879,7 @@ private static Term toIcebergTerm(Schema schema, PartitionField partitionField)
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column, ColumnPosition position, Optional<String> afterColumnName)
{
// Spark doesn't support adding a NOT NULL column to Iceberg tables
// Also, Spark throws an exception when reading the table if we add such columns and execute a rollback procedure
Expand All @@ -1892,17 +1893,22 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
// added - instead of relying on addColumn in iceberg library to assign Ids
AtomicInteger nextFieldId = new AtomicInteger(icebergTable.schema().highestFieldId() + 2);
try {
icebergTable.updateSchema()
.addColumn(column.getName(), toIcebergTypeForNewColumn(column.getType(), nextFieldId), column.getComment())
.commit();
UpdateSchema updateSchema = icebergTable.updateSchema();
updateSchema.addColumn(column.getName(), toIcebergTypeForNewColumn(column.getType(), nextFieldId), column.getComment());
switch (position) {
case FIRST -> updateSchema.moveFirst(column.getName());
case AFTER -> updateSchema.moveAfter(column.getName(), afterColumnName.orElseThrow());
case LAST -> {}
}
updateSchema.commit();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add column: " + firstNonNull(e.getMessage(), e), e);
}
}

@Override
public void addField(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> parentPath, String fieldName, io.trino.spi.type.Type type, boolean ignoreExisting)
public void addField(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> parentPath, String fieldName, io.trino.spi.type.Type type, ColumnPosition position, Optional<String> afterFieldName, boolean ignoreExisting)
{
// Iceberg disallows ambiguous field names in a table. e.g. (a row(b int), "a.b" int)
String parentName = String.join(".", parentPath);
Expand All @@ -1920,9 +1926,14 @@ public void addField(ConnectorSession session, ConnectorTableHandle tableHandle,
}

try {
icebergTable.updateSchema()
.addColumn(caseSensitiveParentName, fieldName, toIcebergTypeForNewColumn(type, new AtomicInteger())) // Iceberg library assigns fresh id internally
.commit();
UpdateSchema updateSchema = icebergTable.updateSchema();
updateSchema.addColumn(caseSensitiveParentName, fieldName, toIcebergTypeForNewColumn(type, new AtomicInteger())); // Iceberg library assigns fresh id internally;
switch (position) {
case FIRST -> updateSchema.moveFirst(caseSensitiveParentName + "." + fieldName);
case AFTER -> updateSchema.moveAfter(caseSensitiveParentName + "." + fieldName, caseSensitiveParentName + "." + afterFieldName.orElseThrow());
case LAST -> {}
}
updateSchema.commit();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add field: " + firstNonNull(e.getMessage(), e), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ARRAY,
SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_COMMENT_ON_COLUMN,
SUPPORTS_COMMENT_ON_TABLE,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public final void destroy()
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ADD_FIELD,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_ADD_FIELD,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_CREATE_VIEW,
SUPPORTS_DROP_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ADD_COLUMN_WITH_COMMENT,
SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_COMMENT_ON_COLUMN,
SUPPORTS_COMMENT_ON_TABLE,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN_WITH_COMMENT;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN_WITH_POSITION;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_FIELD;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_FIELD_WITH_POSITION;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ARRAY;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN;
Expand Down Expand Up @@ -2422,6 +2424,40 @@ protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable
throw new AssertionError("Unexpected failure when adding not null column", e);
}

/**
 * Exercises {@code ALTER TABLE ... ADD COLUMN} with the {@code FIRST} and {@code AFTER} clauses.
 * <p>
 * When the connector does not declare {@code SUPPORTS_ADD_COLUMN_WITH_POSITION}, both clauses
 * are expected to fail with the messages asserted below. Otherwise the new columns must show up
 * at the requested positions, existing rows must read {@code NULL} for them, and a subsequent
 * insert must follow the new column order.
 */
@Test
public void testAddColumnWithPosition()
{
    skipTestUnless(hasBehavior(SUPPORTS_ADD_COLUMN)); // covered by testAddColumn

    // Resolve the capability before the table is created, as both code paths use the same table definition
    boolean positionSupported = hasBehavior(SUPPORTS_ADD_COLUMN_WITH_POSITION);

    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_", "AS SELECT 2 second, 4 fourth")) {
        String tableName = table.getName();

        if (!positionSupported) {
            // Unsupported connectors must reject each positional clause
            assertQueryFails(
                    "ALTER TABLE " + tableName + " ADD COLUMN first integer FIRST",
                    "This connector does not support adding columns with FIRST clause");
            assertQueryFails(
                    "ALTER TABLE " + tableName + " ADD COLUMN third integer AFTER second",
                    "This connector does not support adding columns with AFTER clause");
            return;
        }

        String selectAll = "SELECT * FROM " + tableName;

        // Initial layout
        assertTableColumnNames(tableName, "second", "fourth");
        assertQuery(selectAll, "VALUES (2, 4)");

        // FIRST places the new column ahead of all existing columns
        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN first integer FIRST");
        assertTableColumnNames(tableName, "first", "second", "fourth");
        assertQuery(selectAll, "VALUES (null, 2, 4)");

        // AFTER places the new column right behind the named column
        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN third integer AFTER second");
        assertTableColumnNames(tableName, "first", "second", "third", "fourth");
        assertQuery(selectAll, "VALUES (null, 2, null, 4)");

        // Positional insert must follow the reordered schema
        assertUpdate("INSERT INTO " + tableName + " VALUES (10, 20, 30, 40)", 1);
        assertQuery(selectAll, "VALUES (null, 2, null, 4), (10, 20, 30, 40)");
    }
}

@Test
public void testAddRowField()
{
Expand Down Expand Up @@ -2460,6 +2496,59 @@ public void testAddRowField()
}
}

/**
 * Exercises {@code ALTER TABLE ... ADD COLUMN <row>.<field>} with the {@code FIRST} and
 * {@code AFTER} clauses, both for a top-level row column and for a row nested inside it.
 * <p>
 * When the connector does not declare {@code SUPPORTS_ADD_FIELD_WITH_POSITION}, both clauses
 * are expected to fail with the messages asserted below.
 */
@Test
public void testAddRowFieldWithPosition()
{
    skipTestUnless(hasBehavior(SUPPORTS_ADD_FIELD)); // covered by testAddRowField

    if (hasBehavior(SUPPORTS_ADD_FIELD_WITH_POSITION)) {
        try (TestTable table = new TestTable(
                getQueryRunner()::execute,
                "test_add_field_",
                "AS SELECT CAST(row(2, row(22, 44)) AS row(b integer, d row(f2 integer, f4 integer))) AS col")) {
            String tableName = table.getName();
            String selectAll = "SELECT * FROM " + tableName;

            // Initial layout
            assertThat(getColumnType(tableName, "col"))
                    .isEqualTo("row(b integer, d row(f2 integer, f4 integer))");

            // FIRST within the top-level row
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col.a integer FIRST");
            assertThat(getColumnType(tableName, "col"))
                    .isEqualTo("row(a integer, b integer, d row(f2 integer, f4 integer))");
            assertThat(query(selectAll))
                    .matches("SELECT CAST(row(NULL, 2, row(22, 44)) AS row(a integer, b integer, d row(f2 integer, f4 integer)))");

            // AFTER within the top-level row
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col.c integer AFTER b");
            assertThat(getColumnType(tableName, "col"))
                    .isEqualTo("row(a integer, b integer, c integer, d row(f2 integer, f4 integer))");
            assertThat(query(selectAll))
                    .matches("SELECT CAST(row(NULL, 2, NULL, row(22, 44)) AS row(a integer, b integer, c integer, d row(f2 integer, f4 integer)))");

            // FIRST within the nested row
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col.d.f1 integer FIRST");
            assertThat(getColumnType(tableName, "col"))
                    .isEqualTo("row(a integer, b integer, c integer, d row(f1 integer, f2 integer, f4 integer))");
            assertThat(query(selectAll))
                    .matches("SELECT CAST(row(NULL, 2, NULL, row(NULL, 22, 44)) AS row(a integer, b integer, c integer, d row(f1 integer, f2 integer, f4 integer)))");

            // AFTER within the nested row
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col.d.f3 integer AFTER f2");
            assertThat(getColumnType(tableName, "col"))
                    .isEqualTo("row(a integer, b integer, c integer, d row(f1 integer, f2 integer, f3 integer, f4 integer))");
            assertThat(query(selectAll))
                    .matches("SELECT CAST(row(NULL, 2, NULL, row(NULL, 22, NULL, 44)) AS row(a integer, b integer, c integer, d row(f1 integer, f2 integer, f3 integer, f4 integer)))");

            // A new row written after the changes must follow the reordered field layout
            assertUpdate("INSERT INTO " + tableName + " SELECT row(10, 20, 30, row(111, 222, 333, 444))", 1);
            assertThat(query(selectAll))
                    .skippingTypesCheck()
                    .matches("SELECT row(NULL, 2, NULL, row(NULL, 22, NULL, 44)) UNION SELECT row(10, 20, 30, row(111, 222, 333, 444))");
        }
    }
    else {
        // Unsupported connectors must reject each positional clause
        try (TestTable table = new TestTable(
                getQueryRunner()::execute,
                "test_add_field_",
                "AS SELECT CAST(row(1) AS row(field integer)) AS col")) {
            String tableName = table.getName();
            assertQueryFails(
                    "ALTER TABLE " + tableName + " ADD COLUMN col.new_field integer FIRST",
                    "This connector does not support adding columns with FIRST clause");
            assertQueryFails(
                    "ALTER TABLE " + tableName + " ADD COLUMN col.new_field integer AFTER field",
                    "This connector does not support adding columns with AFTER clause");
        }
    }
}

@Test
public void testDropColumn()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public enum TestingConnectorBehavior

SUPPORTS_ADD_COLUMN,
SUPPORTS_ADD_COLUMN_WITH_COMMENT(SUPPORTS_ADD_COLUMN),
SUPPORTS_ADD_COLUMN_WITH_POSITION(SUPPORTS_ADD_COLUMN),
SUPPORTS_ADD_FIELD(fallback -> fallback.test(SUPPORTS_ADD_COLUMN) && fallback.test(SUPPORTS_ROW_TYPE)),
SUPPORTS_ADD_FIELD_WITH_POSITION(SUPPORTS_ADD_FIELD),
SUPPORTS_DROP_COLUMN(SUPPORTS_ADD_COLUMN),
SUPPORTS_DROP_FIELD(and(SUPPORTS_DROP_COLUMN, SUPPORTS_ROW_TYPE)),
SUPPORTS_RENAME_COLUMN,
Expand Down

0 comments on commit 4966026

Please sign in to comment.