From e3103535cccad6928ab09cca76278abbbb57d490 Mon Sep 17 00:00:00 2001
From: Xiaojian Sun
Date: Mon, 22 Apr 2024 12:12:24 +0800
Subject: [PATCH] [Feature][Connector-V2] Iceberg-sink supports writing data
 to branches (#6697)

---
 docs/en/connector-v2/sink/Iceberg.md          |   1 +
 .../seatunnel/iceberg/config/SinkConfig.java  |   8 +
 .../iceberg/sink/IcebergSinkFactory.java      |   3 +-
 .../sink/commit/IcebergFilesCommitter.java    |   8 +
 .../sink/writer/IcebergWriterFactory.java     |   4 +
 .../iceberg/IcebergSinkWithBranchIT.java      | 204 ++++++++++++++++++
 .../iceberg/fake_to_iceberg_with_branch.conf  |  75 +++++++
 7 files changed, 302 insertions(+), 1 deletion(-)
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf

diff --git a/docs/en/connector-v2/sink/Iceberg.md b/docs/en/connector-v2/sink/Iceberg.md
index dc73f491bcd..3aa24a0a636 100644
--- a/docs/en/connector-v2/sink/Iceberg.md
+++ b/docs/en/connector-v2/sink/Iceberg.md
@@ -72,6 +72,7 @@ libfb303-xxx.jar
 | iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
 | schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
 | data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |
+| iceberg.table.commit-branch | string | no | - | Default branch for commits |
 
 ## Task Example
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
index de9c74344f0..0c0dc1b138c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
@@ -109,6 +109,12 @@ public class SinkConfig extends CommonConfig {
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data save mode");
 
+    public static final Option<String> TABLES_DEFAULT_COMMIT_BRANCH =
+            Options.key("iceberg.table.commit-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Default branch for commits");
+
     @VisibleForTesting private static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";
 
     private final ReadonlyConfig readonlyConfig;
@@ -116,6 +122,7 @@ public class SinkConfig extends CommonConfig {
     private Map<String, String> writeProps;
     private List<String> primaryKeys;
     private List<String> partitionKeys;
+    private String commitBranch;
 
     private boolean upsertModeEnabled;
     private boolean tableSchemaEvolutionEnabled;
@@ -133,6 +140,7 @@ public SinkConfig(ReadonlyConfig readonlyConfig) {
         this.tableSchemaEvolutionEnabled = readonlyConfig.get(TABLE_SCHEMA_EVOLUTION_ENABLED_PROP);
         this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
         this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+        this.commitBranch = readonlyConfig.get(TABLES_DEFAULT_COMMIT_BRANCH);
     }
 
     @VisibleForTesting
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
index 3c30c38e0da..3441420226c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
@@ -62,7 +62,8 @@ public OptionRule optionRule() {
                         SinkConfig.TABLE_PRIMARY_KEYS,
                         SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
                         SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
-                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP)
+                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
+                        SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
index 07363d69e1a..0b5e473440d 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
@@ -39,10 +39,12 @@ public class IcebergFilesCommitter implements Serializable {
 
     private IcebergTableLoader icebergTableLoader;
     private boolean caseSensitive;
+    private String branch;
 
     private IcebergFilesCommitter(SinkConfig config, IcebergTableLoader icebergTableLoader) {
         this.icebergTableLoader = icebergTableLoader;
         this.caseSensitive = config.isCaseSensitive();
+        this.branch = config.getCommitBranch();
     }
 
     public static IcebergFilesCommitter of(
@@ -77,10 +79,16 @@ private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResult> results) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java
new file mode 100644
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java
@@ -0,0 +1,204 @@
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
+                container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
+                container.execInContainer(
+                        "sh",
+                        "-c",
+                        "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
+                                + zstdUrl());
+            };
+
+    private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz";
+    protected final ContainerExtendedFactory containerExtendedFactory =
+            new ContainerExtendedFactory() {
+                @Override
+                public void extend(GenericContainer<?> container)
+                        throws IOException, InterruptedException {
+                    FileUtils.createNewDir(CATALOG_DIR);
+                    container.execInContainer(
+                            "sh",
+                            "-c",
+                            "cd "
+                                    + CATALOG_DIR
+                                    + " && tar -czvf "
+                                    + NAMESPACE_TAR
+                                    + " "
+                                    + NAMESPACE);
+                    container.copyFileFromContainer(
+                            CATALOG_DIR + NAMESPACE_TAR, CATALOG_DIR + NAMESPACE_TAR);
+                    extractFiles();
+                }
+
+                private void extractFiles() {
+                    ProcessBuilder processBuilder = new ProcessBuilder();
+                    processBuilder.command(
+                            "sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf " + NAMESPACE_TAR);
+                    try {
+                        Process process = processBuilder.start();
+                        int exitCode = process.waitFor();
+                        if (exitCode == 0) {
+                            log.info("Extract files successful.");
+                        } else {
+                            log.error("Extract files failed with exit code " + exitCode);
+                        }
+                    } catch (IOException | InterruptedException e) {
+                        log.error("Extract data files from container error :", e);
+                    }
+                }
+            };
+
+    @TestTemplate
+    public void testInsertAndCheckDataE2e(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult textWriteResult =
+                container.executeJob("/iceberg/fake_to_iceberg_with_branch.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        // stream stage
+        given().ignoreExceptions()
+                .await()
+                .atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy iceberg to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            // check branch exists
+                            Assertions.assertEquals(true, checkBranchExists());
+                            // load from branch
+                            Assertions.assertEquals(100, loadDataFromIcebergTableBranch().size());
+                        });
+    }
+
+    private boolean checkBranchExists() {
+        Table table = getTable();
+        Map<String, SnapshotRef> refs = table.refs();
+        if (refs.containsKey(commitBranch)) {
+            return true;
+        }
+        return false;
+    }
+
+    private List<Object> loadDataFromIcebergTableBranch() {
+        List<Object> results = new ArrayList<>();
+        Table table = getTable();
+        TableScan branchRead = table.newScan().useRef(commitBranch);
+        CloseableIterable<FileScanTask> fileScanTasks = branchRead.planFiles();
+        fileScanTasks.forEach(
+                fileScanTask -> {
+                    try {
+                        DataFile file = fileScanTask.file();
+                        HadoopInputFile inputFile =
+                                HadoopInputFile.fromPath(
+                                        new Path(file.path().toString()), new Configuration());
+                        try (ParquetReader<Object> reader =
+                                AvroParquetReader.builder(inputFile).build()) {
+                            Object record;
+                            while ((record = reader.read()) != null) {
+                                results.add(record);
+                            }
+                        }
+                    } catch (IOException e) {
+                        log.error("Table scan branch error :", e);
+                    }
+                });
+        return results;
+    }
+
+    public Table getTable() {
+
+        Map<String, Object> configs = new HashMap<>();
+        Map<String, String> catalogProps = new HashMap<>();
+        catalogProps.put("type", HADOOP.getType());
+        catalogProps.put("warehouse", "file://" + CATALOG_DIR);
+        configs.put(CommonConfig.KEY_CATALOG_NAME.key(), "seatunnel_test");
+        configs.put(CommonConfig.KEY_NAMESPACE.key(), "seatunnel_namespace");
+        configs.put(CommonConfig.KEY_TABLE.key(), "iceberg_sink_table");
+        configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
+        IcebergTableLoader tableLoader =
+                IcebergTableLoader.create(new SourceConfig(ReadonlyConfig.fromMap(configs)));
+        tableLoader.open();
+        // load the table; branch data is read via useRef in the scan above
+        return tableLoader.loadTable();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf
new file mode 100644
index 00000000000..91a48193d77
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  Iceberg {
+    catalog_name="seatunnel_test"
+    iceberg.catalog.config={
+      "type"="hadoop"
+      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
+    }
+    namespace="seatunnel_namespace"
+    table="iceberg_sink_table"
+    iceberg.table.write-props={
+      write.format.default="parquet"
+      write.target-file-size-bytes=10
+    }
+    iceberg.table.commit-branch="commit-branch"
+    iceberg.table.partition-keys="c_timestamp"
+    case_sensitive=true
+  }
+}
\ No newline at end of file
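
Note: the new iceberg.table.commit-branch option travels from SinkConfig into
IcebergFilesCommitter as the branch field, and committing to a named branch in
Iceberg's core API goes through SnapshotUpdate#toBranch. Below is a minimal
sketch of that call pattern, assuming a Table already loaded from a catalog and
data files produced by a closed writer; BranchCommitSketch, appendToBranch, and
the variable names are illustrative, not the connector's exact code.

    import java.util.List;

    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;

    class BranchCommitSketch {
        static void appendToBranch(Table table, List<DataFile> dataFiles, String branch) {
            // Stage every completed data file in a single append snapshot.
            AppendFiles append = table.newAppend();
            dataFiles.forEach(append::appendFile);
            if (branch != null && !branch.isEmpty()) {
                // Route the snapshot to the named branch; Iceberg creates the
                // branch on the first commit and leaves "main" untouched.
                append.toBranch(branch);
            }
            append.commit();
        }
    }

Data committed this way is not visible to plain scans of the table; it is read
back with table.newScan().useRef(branch), which is exactly what
loadDataFromIcebergTableBranch() does in the integration test above.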