From 72ab38f3170b9a01766a616253aaa62a740f8a2c Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 29 Oct 2024 20:30:51 +0800 Subject: [PATCH] [Improve][Iceberg] Support table comment for catalog (#7936) --- .../seatunnel/iceberg/catalog/IcebergCatalog.java | 9 +++++++-- .../connectors/seatunnel/iceberg/utils/SchemaUtils.java | 3 +++ .../seatunnel/iceberg/catalog/IcebergCatalogTest.java | 3 ++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java index 60591d9893c..216b08f9e2d 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java @@ -63,6 +63,8 @@ @Slf4j public class IcebergCatalog implements Catalog { + public static final String PROPS_TABLE_COMMENT = "comment"; + private final String catalogName; private final ReadonlyConfig readonlyConfig; private final IcebergCatalogLoader icebergCatalogLoader; @@ -257,14 +259,17 @@ public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) { icebergTable.spec().fields().stream() .map(PartitionField::name) .collect(Collectors.toList()); - + String comment = + Optional.ofNullable(icebergTable.properties()) + .map(e -> e.get(PROPS_TABLE_COMMENT)) + .orElse(null); return CatalogTable.of( org.apache.seatunnel.api.table.catalog.TableIdentifier.of( catalogName, tablePath.getDatabaseName(), tablePath.getTableName()), builder.build(), icebergTable.properties(), partitionKeys, - null, + comment, catalogName); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java index 9aba4a777d8..780990572da 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.iceberg.catalog.IcebergCatalog; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergTypeMapper; import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaAddColumn; @@ -105,6 +106,8 @@ public static Table autoCreateTable( SinkConfig config = new SinkConfig(readonlyConfig); // build auto create table Map options = new HashMap<>(table.getOptions()); + Optional.ofNullable(table.getComment()) + .map(e -> options.put(IcebergCatalog.PROPS_TABLE_COMMENT, e)); // override options.putAll(config.getAutoCreateProps()); return createTable(catalog, toIcebergTableIdentifier(tablePath), config, schema, options); diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java index 6ec5ae5783f..1eeeeebdf9e 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java @@ -194,7 +194,8 @@ CatalogTable buildAllTypesTable(TableIdentifier tableIdentifier) { TableSchema schema = builder.build(); HashMap options = new HashMap<>(); options.put("write.parquet.compression-codec", "zstd"); + options.put("comment", "test"); return CatalogTable.of( - tableIdentifier, schema, options, Collections.singletonList("dt_col"), "null"); + tableIdentifier, schema, options, Collections.singletonList("dt_col"), "test"); } }