Skip to content

Commit

Permalink
Support setting a field type with SET DATA TYPE in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 2, 2023
1 parent 0c37357 commit 4c6319e
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,36 @@ private static boolean fieldExists(StructType structType, String fieldName)
return false;
}

/**
 * Changes the type of a field nested inside a row column of an Iceberg table.
 * <p>
 * The last element of {@code fieldPath} names the field to change; the preceding elements,
 * joined with {@code "."}, name its parent. Both are looked up case-insensitively and the
 * case-sensitive names stored in the Iceberg schema are used for the actual update.
 * Only primitive-to-primitive changes are accepted.
 *
 * @throws TrinoException with {@code NOT_SUPPORTED} when either the current or the requested type is
 * non-primitive, or with {@code ICEBERG_COMMIT_ERROR} when the schema update fails
 */
@Override
public void setFieldType(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> fieldPath, io.trino.spi.type.Type type)
{
    IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
    Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
    Schema schema = icebergTable.schema();

    // Locate the parent of the target field and recover its case-sensitive dotted name
    List<String> parentSegments = fieldPath.subList(0, fieldPath.size() - 1);
    NestedField parentField = schema.caseInsensitiveFindField(String.join(".", parentSegments));
    String parentColumnName = schema.findColumnName(parentField.fieldId());

    NestedField targetField = parentField.type().asStructType().caseInsensitiveField(getLast(fieldPath));
    // TODO: Add support for changing non-primitive field type
    if (!targetField.type().isPrimitiveType()) {
        throw new TrinoException(NOT_SUPPORTED, "Iceberg doesn't support changing field type from non-primitive types");
    }
    String qualifiedFieldName = parentColumnName + "." + targetField.name();

    // Pass dummy AtomicInteger. The field id will be discarded because the subsequent logic disallows non-primitive types.
    Type requestedType = toIcebergTypeForNewColumn(type, new AtomicInteger());
    if (!requestedType.isPrimitiveType()) {
        throw new TrinoException(NOT_SUPPORTED, "Iceberg doesn't support changing field type to non-primitive types");
    }

    try {
        icebergTable.updateSchema()
                .updateColumn(qualifiedFieldName, requestedType.asPrimitiveType())
                .commit();
    }
    catch (RuntimeException e) {
        throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set field type: " + firstNonNull(e.getMessage(), e), e);
    }
}

private List<ColumnMetadata> getColumnMetadatas(Schema schema)
{
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6951,6 +6951,48 @@ protected void verifySetColumnTypeFailurePermissible(Throwable e)
"|Type not supported for Iceberg: char\\(20\\)).*");
}

/**
 * Adjusts the shared set-column-type setups for Iceberg's field type changes: transitions in the
 * first group are marked as unsupported, the varchar length change is dropped, and every other
 * setup is used as is.
 */
@Override
protected Optional<SetColumnTypeSetup> filterSetFieldTypesDataProvider(SetColumnTypeSetup setup)
{
    String transition = "%s -> %s".formatted(setup.sourceColumnType(), setup.newColumnType());
    return switch (transition) {
        // Iceberg allows updating column types if the update is safe. Safe updates are:
        // - int to bigint
        // - float to double
        // - decimal(P,S) to decimal(P2,S) when P2 > P (scale cannot change)
        // https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--alter-column
        case "bigint -> integer",
                "decimal(5,3) -> decimal(5,2)",
                "varchar -> char(20)",
                "time(6) -> time(3)",
                "timestamp(6) -> timestamp(3)",
                "array(integer) -> array(bigint)",
                "row(x integer) -> row(x bigint)",
                "row(x integer) -> row(y integer)",
                "row(x integer, y integer) -> row(x integer, z integer)",
                "row(x integer) -> row(x integer, y integer)",
                "row(x integer, y integer) -> row(x integer)",
                "row(x integer, y integer) -> row(y integer, x integer)",
                "row(x integer, y integer) -> row(z integer, y integer, x integer)",
                "row(x row(nested integer)) -> row(x row(nested bigint))",
                "row(x row(a integer, b integer)) -> row(x row(b integer, a integer))" -> Optional.of(setup.asUnsupported());
        // Iceberg connector ignores the varchar length
        case "varchar(100) -> varchar(50)" -> Optional.empty();
        default -> Optional.of(setup);
    };
}

/**
 * Accepts a set-field-type failure only when its message matches one of the rejections listed in
 * the pattern below.
 */
@Override
protected void verifySetFieldTypeFailurePermissible(Throwable e)
{
    // Alternatives, in order: commit-time rejection of the type change, unsupported time/timestamp
    // precision, unsupported char type, and the connector's own non-primitive type checks
    String permittedFailures = ".*(Failed to set field type: Cannot change (column type:|type from .* to )" +
            "|Time(stamp)? precision \\(3\\) not supported for Iceberg. Use \"time(stamp)?\\(6\\)\" instead" +
            "|Type not supported for Iceberg: char\\(20\\)" +
            "|Iceberg doesn't support changing field type (from|to) non-primitive types).*";
    assertThat(e).hasMessageMatching(permittedFailures);
}

@Override
protected boolean supportsPhysicalPushdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_ADD_FIELD:
case SUPPORTS_RENAME_FIELD:
case SUPPORTS_DROP_FIELD:
case SUPPORTS_SET_FIELD_TYPE:
return false;

case SUPPORTS_CREATE_VIEW:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2748,6 +2748,26 @@ private void testTrinoSetColumnType(boolean partitioned, StorageFormat storageFo
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

/**
 * Creates a table through Trino with a single-field row column, changes the field type with
 * ALTER COLUMN ... SET DATA TYPE, and checks the reported column type and the value read back
 * through both Trino and Spark.
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSetColumnTypeDataProvider")
public void testTrinoSetFieldType(StorageFormat storageFormat, String sourceFieldType, String sourceValueLiteral, String newFieldType, Object newValue)
{
    String baseTableName = "test_set_field_type_" + randomNameSuffix();
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);

    String createTable = String.format(
            "CREATE TABLE %s WITH (format = '%s')AS SELECT CAST(row(%s) AS row(field %s)) AS col",
            trinoTableName,
            storageFormat,
            sourceValueLiteral,
            sourceFieldType);
    onTrino().executeQuery(createTable);

    String alterFieldType = String.format("ALTER TABLE %s ALTER COLUMN col.field SET DATA TYPE %s", trinoTableName, newFieldType);
    onTrino().executeQuery(alterFieldType);

    String expectedColumnType = "row(field " + newFieldType + ")";
    assertEquals(getColumnType(baseTableName, "col"), expectedColumnType);

    // The converted value must be readable from both engines
    String trinoSelect = "SELECT col.field FROM " + trinoTableName;
    String sparkSelect = "SELECT col.field FROM " + sparkTableName;
    assertThat(onTrino().executeQuery(trinoSelect)).containsOnly(row(newValue));
    assertThat(onSpark().executeQuery(sparkSelect)).containsOnly(row(newValue));

    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@DataProvider
public static Object[][] testSetColumnTypeDataProvider()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_LEVEL_DELETE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_TYPE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_SET_COLUMN_TYPE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_SET_FIELD_TYPE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TRUNCATE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_UPDATE;
Expand Down Expand Up @@ -3041,6 +3042,156 @@ protected void verifySetColumnTypeFailurePermissible(Throwable e)
throw new AssertionError("Unexpected set column type failure", e);
}

/**
 * Basic check for ALTER COLUMN ... SET DATA TYPE on a row field: connectors that support it must
 * widen integer to bigint and keep the value; connectors that do not must fail with the
 * "does not support" message.
 */
@Test
public void testSetFieldType()
{
    skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA) && hasBehavior(SUPPORTS_ROW_TYPE));

    if (hasBehavior(SUPPORTS_SET_FIELD_TYPE)) {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_field_type_", "AS SELECT CAST(row(123) AS row(field integer)) AS col")) {
            String tableName = table.getName();
            assertEquals(getColumnType(tableName, "col"), "row(field integer)");

            assertUpdate("ALTER TABLE " + tableName + " ALTER COLUMN col.field SET DATA TYPE bigint");

            assertEquals(getColumnType(tableName, "col"), "row(field bigint)");
            assertThat(query("SELECT * FROM " + tableName))
                    .skippingTypesCheck()
                    .matches("SELECT row(bigint '123')");
        }
    }
    else {
        // Unsupported connectors must reject the statement with a well-known message
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_field_type_", "(col row(field int))")) {
            String alterStatement = "ALTER TABLE " + table.getName() + " ALTER COLUMN col.field SET DATA TYPE bigint";
            assertQueryFails(alterStatement, "This connector does not support setting field types");
        }
    }
}

/**
 * Runs one type transition from {@code setFieldTypesDataProvider} against a row field.
 * The test is skipped when the source type cannot be used to create the table. Setups flagged as
 * unsupported must fail in a way accepted by {@code verifySetFieldTypeFailurePermissible};
 * all others must succeed and expose the new type and converted value.
 */
@Test(dataProvider = "setFieldTypesDataProvider")
public void testSetFieldTypes(SetColumnTypeSetup setup)
{
    skipTestUnless(hasBehavior(SUPPORTS_SET_FIELD_TYPE) && hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA));

    TestTable table;
    try {
        String tableDefinition = " AS SELECT CAST(row(" + setup.sourceValueLiteral + ") AS row(field " + setup.sourceColumnType + ")) AS col";
        table = new TestTable(getQueryRunner()::execute, "test_set_field_type_", tableDefinition);
    }
    catch (Exception e) {
        verifyUnsupportedTypeException(e, setup.sourceColumnType);
        throw new SkipException("Unsupported column type: " + setup.sourceColumnType);
    }
    try (table) {
        String tableName = table.getName();
        String alterStatement = "ALTER TABLE " + tableName + " ALTER COLUMN col.field SET DATA TYPE " + setup.newColumnType;

        if (setup.unsupportedType) {
            assertThatThrownBy(() -> assertUpdate(alterStatement))
                    .satisfies(this::verifySetFieldTypeFailurePermissible);
            return;
        }
        assertUpdate(alterStatement);

        assertEquals(getColumnType(tableName, "col"), "row(field " + setup.newColumnType + ")");
        assertThat(query("SELECT * FROM " + tableName))
                .skippingTypesCheck()
                .matches("SELECT row(" + setup.newValueLiteral + ")");
    }
}

/**
 * Supplies the setups for {@code testSetFieldTypes}: every entry of {@code setColumnTypeSetupData()}
 * is passed through {@code filterSetFieldTypesDataProvider}, and entries for which the filter
 * returns an empty Optional are left out.
 */
@DataProvider
public Object[][] setFieldTypesDataProvider()
{
    return setColumnTypeSetupData().stream()
            .flatMap(setup -> filterSetFieldTypesDataProvider(setup).stream())
            .collect(toDataProvider());
}

/**
 * Hook applied to every setup produced for {@code testSetFieldTypes}.
 * Returning an empty Optional removes the setup from the data provider; returning a
 * (possibly modified) setup keeps it. This default keeps every setup unchanged.
 */
protected Optional<SetColumnTypeSetup> filterSetFieldTypesDataProvider(SetColumnTypeSetup setup)
{
return Optional.of(setup);
}

/**
 * Verifies that a row field declared with an upper-case quoted name can be addressed in lower
 * case by ALTER COLUMN ... SET DATA TYPE, and that the field keeps its declared name afterwards.
 */
@Test
public void testSetFieldTypeCaseSensitivity()
{
    // The table is created with CREATE TABLE AS and carries no NOT NULL constraint, so the
    // capability this test depends on is CTAS support rather than NOT NULL support
    skipTestUnless(hasBehavior(SUPPORTS_SET_FIELD_TYPE) && hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA));

    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_field_type_case_", " AS SELECT CAST(row(1) AS row(\"UPPER\" integer)) col")) {
        assertEquals(getColumnType(table.getName(), "col"), "row(UPPER integer)");

        // The field is referenced as "upper" although it was declared as "UPPER"
        assertUpdate("ALTER TABLE " + table.getName() + " ALTER COLUMN col.upper SET DATA TYPE bigint");
        assertEquals(getColumnType(table.getName(), "col"), "row(UPPER bigint)");
        assertThat(query("SELECT * FROM " + table.getName()))
                .matches("SELECT CAST(row(1) AS row(UPPER bigint))");
    }
}

/**
 * Verifies that changing the type of a field inside a NOT NULL row column leaves the column
 * non-nullable.
 */
@Test
public void testSetFieldTypeWithNotNull()
{
    skipTestUnless(hasBehavior(SUPPORTS_SET_FIELD_TYPE) && hasBehavior(SUPPORTS_NOT_NULL_CONSTRAINT));

    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_field_type_null_", "(col row(field int) NOT NULL)")) {
        String tableName = table.getName();
        assertFalse(columnIsNullable(tableName, "col"));

        String alterStatement = "ALTER TABLE " + tableName + " ALTER COLUMN col.field SET DATA TYPE bigint";
        assertUpdate(alterStatement);

        // The constraint must survive the type change
        assertFalse(columnIsNullable(tableName, "col"));
    }
}

/**
 * Verifies that changing the type of a field inside a commented row column leaves the column
 * comment untouched.
 */
@Test
public void testSetFieldTypeWithComment()
{
    skipTestUnless(hasBehavior(SUPPORTS_SET_FIELD_TYPE) && hasBehavior(SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT));

    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_field_type_comment_", "(col row(field int) COMMENT 'test comment')")) {
        String tableName = table.getName();
        assertEquals(getColumnComment(tableName, "col"), "test comment");

        String alterStatement = "ALTER TABLE " + tableName + " ALTER COLUMN col.field SET DATA TYPE bigint";
        assertUpdate(alterStatement);

        // The comment must survive the type change
        assertEquals(getColumnComment(tableName, "col"), "test comment");
    }
}

/**
 * Verifies that incompatible field type changes (varchar to row, varchar to integer, row to
 * integer) fail in a way accepted by {@code verifySetFieldTypeFailurePermissible}.
 */
@Test
public void testSetFieldIncompatibleType()
{
    skipTestUnless(hasBehavior(SUPPORTS_SET_FIELD_TYPE) && hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA));

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_set_invalid_field_type_",
            "(row_col row(field varchar), nested_col row(field row(nested int)))")) {
        // Each entry is the "<field> SET DATA TYPE <type>" tail of a statement that must be rejected
        String[] rejectedChanges = {
                "row_col.field SET DATA TYPE row(nested integer)",
                "row_col.field SET DATA TYPE integer",
                "nested_col.field SET DATA TYPE integer",
        };
        for (String change : rejectedChanges) {
            assertThatThrownBy(() -> assertUpdate("ALTER TABLE " + table.getName() + " ALTER COLUMN " + change))
                    .satisfies(this::verifySetFieldTypeFailurePermissible);
        }
    }
}

/**
 * Verifies that narrowing a bigint field holding {@code 9223372036854775807} to integer fails in
 * a way accepted by {@code verifySetFieldTypeFailurePermissible}.
 */
@Test
public void testSetFieldOutOfRangeType()
{
    skipTestUnless(hasBehavior(SUPPORTS_SET_FIELD_TYPE) && hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA));

    String tableDefinition = "AS SELECT CAST(row(9223372036854775807) AS row(field bigint)) AS col";
    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_field_type_invalid_range_", tableDefinition)) {
        String alterStatement = "ALTER TABLE " + table.getName() + " ALTER COLUMN col.field SET DATA TYPE integer";
        assertThatThrownBy(() -> assertUpdate(alterStatement))
                .satisfies(this::verifySetFieldTypeFailurePermissible);
    }
}

/**
 * Hook invoked with the failure raised by a set-field-type statement that a test expects to fail
 * (unsupported setups in {@code testSetFieldTypes}, {@code testSetFieldIncompatibleType} and
 * {@code testSetFieldOutOfRangeType}).
 * This default treats every failure as unexpected; connectors override it to assert on the
 * failures they consider acceptable.
 *
 * @param e the failure raised by the ALTER TABLE statement
 * @throws AssertionError always, wrapping {@code e}
 */
protected void verifySetFieldTypeFailurePermissible(Throwable e)
{
throw new AssertionError("Unexpected set field type failure", e);
}

protected String getColumnType(String tableName, String columnName)
{
return (String) computeScalar(format("SELECT data_type FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name = '%s' AND column_name = '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum TestingConnectorBehavior
SUPPORTS_RENAME_COLUMN,
SUPPORTS_RENAME_FIELD(fallback -> fallback.test(SUPPORTS_RENAME_COLUMN) && fallback.test(SUPPORTS_ROW_TYPE)),
SUPPORTS_SET_COLUMN_TYPE,
SUPPORTS_SET_FIELD_TYPE(fallback -> fallback.test(SUPPORTS_SET_COLUMN_TYPE) && fallback.test(SUPPORTS_ROW_TYPE)),

SUPPORTS_COMMENT_ON_TABLE,
SUPPORTS_COMMENT_ON_COLUMN(SUPPORTS_COMMENT_ON_TABLE),
Expand Down

0 comments on commit 4c6319e

Please sign in to comment.