Skip to content

Commit

Permalink
Support FIRST and AFTER clause when adding a new column in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Apr 21, 2024
1 parent 1542fde commit f2ab14b
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ default void setColumnComment(ConnectorSession session, ConnectorTableHandle tab
/**
* @deprecated Use {@link #addColumn(ConnectorSession, ConnectorTableHandle, ColumnMetadata, ColumnPosition, Optional)}
*/
@Deprecated
default void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns");
Expand All @@ -633,6 +634,7 @@ default void addColumn(ConnectorSession session, ConnectorTableHandle tableHandl
/**
* @deprecated Use {@link #addField(ConnectorSession, ConnectorTableHandle, List, String, Type, ColumnPosition, Optional, boolean)}
*/
@Deprecated
default void addField(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> parentPath, String fieldName, Type type, boolean ignoreExisting)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding fields");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_UPDATE -> true;
case SUPPORTS_CREATE_MATERIALIZED_VIEW,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_CREATE_VIEW,
SUPPORTS_MERGE,
SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return switch (connectorBehavior) {
case SUPPORTS_CREATE_OR_REPLACE_TABLE,
SUPPORTS_REPORTING_WRITTEN_BYTES -> true;
case SUPPORTS_ADD_FIELD,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_ADD_FIELD,
SUPPORTS_AGGREGATION_PUSHDOWN,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_DROP_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return switch (connectorBehavior) {
case SUPPORTS_MULTI_STATEMENT_WRITES,
SUPPORTS_REPORTING_WRITTEN_BYTES -> true; // FIXME: Fails because only allowed with transactional tables
case SUPPORTS_ADD_FIELD,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_ADD_FIELD,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_DROP_FIELD,
SUPPORTS_MERGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnPosition;
import io.trino.spi.connector.ConnectorAnalyzeMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
Expand Down Expand Up @@ -1879,7 +1880,7 @@ private static Term toIcebergTerm(Schema schema, PartitionField partitionField)
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column, ColumnPosition position, Optional<String> afterColumnName)
{
// Spark doesn't support adding a NOT NULL column to Iceberg tables
// Also, Spark throws an exception when reading the table if we add such columns and execute a rollback procedure
Expand All @@ -1893,17 +1894,22 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
// added - instead of relying on addColumn in iceberg library to assign Ids
AtomicInteger nextFieldId = new AtomicInteger(icebergTable.schema().highestFieldId() + 2);
try {
icebergTable.updateSchema()
.addColumn(column.getName(), toIcebergTypeForNewColumn(column.getType(), nextFieldId), column.getComment())
.commit();
UpdateSchema updateSchema = icebergTable.updateSchema();
updateSchema.addColumn(column.getName(), toIcebergTypeForNewColumn(column.getType(), nextFieldId), column.getComment());
switch (position) {
case FIRST -> updateSchema.moveFirst(column.getName());
case AFTER -> updateSchema.moveAfter(column.getName(), afterColumnName.orElseThrow());
case LAST -> {}
}
updateSchema.commit();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add column: " + firstNonNull(e.getMessage(), e), e);
}
}

@Override
public void addField(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> parentPath, String fieldName, io.trino.spi.type.Type type, boolean ignoreExisting)
public void addField(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> parentPath, String fieldName, io.trino.spi.type.Type type, ColumnPosition position, Optional<String> afterFieldName, boolean ignoreExisting)
{
// Iceberg disallows ambiguous field names in a table. e.g. (a row(b int), "a.b" int)
String parentName = String.join(".", parentPath);
Expand All @@ -1921,9 +1927,14 @@ public void addField(ConnectorSession session, ConnectorTableHandle tableHandle,
}

try {
icebergTable.updateSchema()
.addColumn(caseSensitiveParentName, fieldName, toIcebergTypeForNewColumn(type, new AtomicInteger())) // Iceberg library assigns fresh id internally
.commit();
UpdateSchema updateSchema = icebergTable.updateSchema();
updateSchema.addColumn(caseSensitiveParentName, fieldName, toIcebergTypeForNewColumn(type, new AtomicInteger())); // Iceberg library assigns fresh id internally;
switch (position) {
case FIRST -> updateSchema.moveFirst(caseSensitiveParentName + "." + fieldName);
case AFTER -> updateSchema.moveAfter(caseSensitiveParentName + "." + fieldName, caseSensitiveParentName + "." + afterFieldName.orElseThrow());
case LAST -> {}
}
updateSchema.commit();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to add field: " + firstNonNull(e.getMessage(), e), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ARRAY,
SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_COMMENT_ON_COLUMN,
SUPPORTS_COMMENT_ON_TABLE,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public final void destroy()
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ADD_FIELD,
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_ADD_FIELD,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
SUPPORTS_CREATE_VIEW,
SUPPORTS_DROP_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_ADD_COLUMN_WITH_COMMENT,
SUPPORTS_ADD_COLUMN_WITH_POSITION,
SUPPORTS_COMMENT_ON_COLUMN,
SUPPORTS_COMMENT_ON_TABLE,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN_WITH_COMMENT;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_COLUMN_WITH_POSITION;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_FIELD;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_FIELD_WITH_POSITION;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ARRAY;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN;
Expand Down Expand Up @@ -2395,6 +2397,40 @@ protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable
throw new AssertionError("Unexpected failure when adding not null column", e);
}

/**
 * Exercises {@code ALTER TABLE ... ADD COLUMN} with the {@code FIRST} and {@code AFTER} clauses.
 * <p>
 * When the connector does not declare {@code SUPPORTS_ADD_COLUMN_WITH_POSITION}, both clauses
 * are expected to be rejected with the corresponding "does not support" message. Otherwise the
 * new columns must show up at the requested positions, existing rows must read {@code NULL}
 * for them, and a subsequent insert must follow the new column order.
 */
@Test
public void testAddColumnWithPosition()
{
    skipTestUnless(hasBehavior(SUPPORTS_ADD_COLUMN)); // covered by testAddColumn

    boolean positionSupported = hasBehavior(SUPPORTS_ADD_COLUMN_WITH_POSITION);

    // Both the supported and the unsupported path start from the same two-column table
    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_", "AS SELECT 2 second, 4 fourth")) {
        String tableName = table.getName();
        String addFirstColumn = "ALTER TABLE " + tableName + " ADD COLUMN first integer FIRST";
        String addThirdColumn = "ALTER TABLE " + tableName + " ADD COLUMN third integer AFTER second";
        String selectAll = "SELECT * FROM " + tableName;

        if (!positionSupported) {
            assertQueryFails(addFirstColumn, "This connector does not support adding columns with FIRST clause");
            assertQueryFails(addThirdColumn, "This connector does not support adding columns with AFTER clause");
            return;
        }

        // Initial layout
        assertTableColumnNames(tableName, "second", "fourth");
        assertQuery(selectAll, "VALUES (2, 4)");

        // FIRST puts the new column ahead of every existing one
        assertUpdate(addFirstColumn);
        assertTableColumnNames(tableName, "first", "second", "fourth");
        assertQuery(selectAll, "VALUES (null, 2, 4)");

        // AFTER puts the new column right behind the referenced one
        assertUpdate(addThirdColumn);
        assertTableColumnNames(tableName, "first", "second", "third", "fourth");
        assertQuery(selectAll, "VALUES (null, 2, null, 4)");

        // Newly written rows follow the updated column order
        assertUpdate("INSERT INTO " + tableName + " VALUES (10, 20, 30, 40)", 1);
        assertQuery(selectAll, "VALUES (null, 2, null, 4), (10, 20, 30, 40)");
    }
}

@Test
public void testAddRowField()
{
Expand Down Expand Up @@ -2433,6 +2469,59 @@ public void testAddRowField()
}
}

/**
 * Exercises {@code ALTER TABLE ... ADD COLUMN <row>.<field>} with the {@code FIRST} and
 * {@code AFTER} clauses, both directly inside a top-level row column and inside a nested row.
 * <p>
 * When the connector does not declare {@code SUPPORTS_ADD_FIELD_WITH_POSITION}, both clauses
 * are expected to be rejected. Otherwise each new field must appear at the requested position
 * in the row type, existing rows must read {@code NULL} for it, and a subsequent insert must
 * follow the new field order.
 */
@Test
public void testAddRowFieldWithPosition()
{
    skipTestUnless(hasBehavior(SUPPORTS_ADD_FIELD)); // covered by testAddRowField

    if (!hasBehavior(SUPPORTS_ADD_FIELD_WITH_POSITION)) {
        try (TestTable table = new TestTable(
                getQueryRunner()::execute,
                "test_add_field_",
                "AS SELECT CAST(row(1) AS row(field integer)) AS col")) {
            String addField = "ALTER TABLE " + table.getName() + " ADD COLUMN col.new_field integer";
            assertQueryFails(addField + " FIRST", "This connector does not support adding columns with FIRST clause");
            assertQueryFails(addField + " AFTER field", "This connector does not support adding columns with AFTER clause");
        }
        return;
    }

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_add_field_",
            "AS SELECT CAST(row(2, row(22, 44)) AS row(b integer, d row(f2 integer, f4 integer))) AS col")) {
        String tableName = table.getName();
        String addColumn = "ALTER TABLE " + tableName + " ADD COLUMN ";
        String selectAll = "SELECT * FROM " + tableName;

        // Initial layout
        assertThat(getColumnType(tableName, "col")).isEqualTo("row(b integer, d row(f2 integer, f4 integer))");

        // FIRST inside the top-level row
        String typeWithA = "row(a integer, b integer, d row(f2 integer, f4 integer))";
        assertUpdate(addColumn + "col.a integer FIRST");
        assertThat(getColumnType(tableName, "col")).isEqualTo(typeWithA);
        assertThat(query(selectAll))
                .matches("SELECT CAST(row(NULL, 2, row(22, 44)) AS " + typeWithA + ")");

        // AFTER inside the top-level row
        String typeWithC = "row(a integer, b integer, c integer, d row(f2 integer, f4 integer))";
        assertUpdate(addColumn + "col.c integer AFTER b");
        assertThat(getColumnType(tableName, "col")).isEqualTo(typeWithC);
        assertThat(query(selectAll))
                .matches("SELECT CAST(row(NULL, 2, NULL, row(22, 44)) AS " + typeWithC + ")");

        // FIRST inside the nested row
        String typeWithF1 = "row(a integer, b integer, c integer, d row(f1 integer, f2 integer, f4 integer))";
        assertUpdate(addColumn + "col.d.f1 integer FIRST");
        assertThat(getColumnType(tableName, "col")).isEqualTo(typeWithF1);
        assertThat(query(selectAll))
                .matches("SELECT CAST(row(NULL, 2, NULL, row(NULL, 22, 44)) AS " + typeWithF1 + ")");

        // AFTER inside the nested row
        String typeWithF3 = "row(a integer, b integer, c integer, d row(f1 integer, f2 integer, f3 integer, f4 integer))";
        assertUpdate(addColumn + "col.d.f3 integer AFTER f2");
        assertThat(getColumnType(tableName, "col")).isEqualTo(typeWithF3);
        assertThat(query(selectAll))
                .matches("SELECT CAST(row(NULL, 2, NULL, row(NULL, 22, NULL, 44)) AS " + typeWithF3 + ")");

        // Newly written rows follow the updated field order
        assertUpdate("INSERT INTO " + tableName + " SELECT row(10, 20, 30, row(111, 222, 333, 444))", 1);
        assertThat(query(selectAll))
                .skippingTypesCheck()
                .matches("SELECT row(NULL, 2, NULL, row(NULL, 22, NULL, 44)) UNION SELECT row(10, 20, 30, row(111, 222, 333, 444))");
    }
}

@Test
public void testDropColumn()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public enum TestingConnectorBehavior

SUPPORTS_ADD_COLUMN,
SUPPORTS_ADD_COLUMN_WITH_COMMENT(SUPPORTS_ADD_COLUMN),
SUPPORTS_ADD_COLUMN_WITH_POSITION(SUPPORTS_ADD_COLUMN),
SUPPORTS_ADD_FIELD(fallback -> fallback.test(SUPPORTS_ADD_COLUMN) && fallback.test(SUPPORTS_ROW_TYPE)),
SUPPORTS_ADD_FIELD_WITH_POSITION(SUPPORTS_ADD_FIELD),
SUPPORTS_DROP_COLUMN(SUPPORTS_ADD_COLUMN),
SUPPORTS_DROP_FIELD(and(SUPPORTS_DROP_COLUMN, SUPPORTS_ROW_TYPE)),
SUPPORTS_RENAME_COLUMN,
Expand Down

0 comments on commit f2ab14b

Please sign in to comment.