[Feature][Connector-V2] Iceberg-sink supports writing data to branches (
sunxiaojian authored Apr 22, 2024
1 parent c23804f commit e310353
Showing 7 changed files with 302 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Iceberg.md
@@ -72,6 +72,7 @@ libfb303-xxx.jar
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |
| iceberg.table.commit-branch | string | no | - | Default branch for commits |

## Task Example

@@ -109,13 +109,20 @@ public class SinkConfig extends CommonConfig {
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data save mode");

public static final Option<String> TABLES_DEFAULT_COMMIT_BRANCH =
Options.key("iceberg.table.commit-branch")
.stringType()
.noDefaultValue()
.withDescription("Default branch for commits");

@VisibleForTesting private static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

private final ReadonlyConfig readonlyConfig;
private Map<String, String> autoCreateProps;
private Map<String, String> writeProps;
private List<String> primaryKeys;
private List<String> partitionKeys;
private String commitBranch;

private boolean upsertModeEnabled;
private boolean tableSchemaEvolutionEnabled;
@@ -133,6 +140,7 @@ public SinkConfig(ReadonlyConfig readonlyConfig) {
this.tableSchemaEvolutionEnabled = readonlyConfig.get(TABLE_SCHEMA_EVOLUTION_ENABLED_PROP);
this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
this.commitBranch = readonlyConfig.get(TABLES_DEFAULT_COMMIT_BRANCH);
}

@VisibleForTesting
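The new option has no default value, so its absence is represented as null. A minimal sketch of the expected behavior (illustrative only, not part of this commit; the branch name and class name below are made up):

// Sketch: reading the no-default option. An absent "iceberg.table.commit-branch" key is
// expected to yield null, which the committer treats as "commit to the table's main branch".
import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;

public class CommitBranchOptionSketch {
    public static void main(String[] args) {
        Map<String, Object> options = new HashMap<>();
        options.put("iceberg.table.commit-branch", "audit-branch"); // hypothetical branch name
        String branch =
                ReadonlyConfig.fromMap(options).get(SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH);
        // branch == "audit-branch"

        String unset =
                ReadonlyConfig.fromMap(new HashMap<>())
                        .get(SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH);
        // unset == null, so snapshots keep going to main
    }
}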
@@ -62,7 +62,8 @@ public OptionRule optionRule() {
SinkConfig.TABLE_PRIMARY_KEYS,
SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP)
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
.build();
}

@@ -39,10 +39,12 @@
public class IcebergFilesCommitter implements Serializable {
private IcebergTableLoader icebergTableLoader;
private boolean caseSensitive;
private String branch;

private IcebergFilesCommitter(SinkConfig config, IcebergTableLoader icebergTableLoader) {
this.icebergTableLoader = icebergTableLoader;
this.caseSensitive = config.isCaseSensitive();
this.branch = config.getCommitBranch();
}

public static IcebergFilesCommitter of(
@@ -77,10 +79,16 @@ private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResu
} else {
if (deleteFiles.isEmpty()) {
AppendFiles append = table.newAppend();
if (branch != null) {
append.toBranch(branch);
}
dataFiles.forEach(append::appendFile);
append.commit();
} else {
RowDelta delta = table.newRowDelta();
if (branch != null) {
delta.toBranch(branch);
}
delta.caseSensitive(caseSensitive);
dataFiles.forEach(delta::addRows);
deleteFiles.forEach(delta::addDeletes);
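For orientation, a small read-back sketch (not part of this commit; it assumes an already-loaded Table and a branch that has been committed to): branch heads live in Table#refs() separately from main, and a scan pinned with useRef only sees data reachable from that branch.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

public class BranchReadBackSketch {
    // Lists the data files reachable from the given branch. Commits made with
    // toBranch(...) do not move main, so the same scan without useRef(...) would
    // not see these files.
    public static void listBranchFiles(Table table, String branch) {
        Map<String, SnapshotRef> refs = table.refs();
        if (!refs.containsKey(branch)) {
            return; // the branch was never created or committed to
        }
        TableScan scan = table.newScan().useRef(branch);
        try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
            tasks.forEach(task -> System.out.println(task.file().path()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}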
@@ -81,6 +81,10 @@ public RecordWriter createWriter(SeaTunnelRowType rowType) {
tableLoader.getTableIdentifier(),
config,
rowType);
// Create an empty snapshot for the branch
if (config.getCommitBranch() != null) {
table.manageSnapshots().createBranch(config.getCommitBranch()).commit();
}
break;
default:
throw exception;
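In this commit the branch is created only right after the table itself is auto-created, so it cannot already exist at that point. If the same step were ever run from a path that can execute more than once, a guarded variant could look like this (illustrative sketch, not part of the commit):

import org.apache.iceberg.Table;

public class BranchCreationSketch {
    // Creates the commit branch only if it is not already present among the table's refs;
    // on a freshly created table this yields the empty starting snapshot mentioned above.
    public static void ensureBranch(Table table, String branch) {
        if (branch != null && !table.refs().containsKey(branch)) {
            table.manageSnapshots().createBranch(branch).commit();
        }
    }
}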
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.connector.iceberg;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
import static org.awaitility.Awaitility.given;

@Slf4j
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
type = {},
disabledReason = "")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkWithBranchIT extends TestSuiteBase {

private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/hadoop-sink/";

private static final String NAMESPACE = "seatunnel_namespace";

private static final String commitBranch = "commit-branch";

private String zstdUrl() {
return "https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.5-5/zstd-jni-1.5.5-5.jar";
}

@TestContainerExtension
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
container.execInContainer(
"sh",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
+ zstdUrl());
};

private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz";
protected final ContainerExtendedFactory containerExtendedFactory =
new ContainerExtendedFactory() {
@Override
public void extend(GenericContainer<?> container)
throws IOException, InterruptedException {
FileUtils.createNewDir(CATALOG_DIR);
container.execInContainer(
"sh",
"-c",
"cd "
+ CATALOG_DIR
+ " && tar -czvf "
+ NAMESPACE_TAR
+ " "
+ NAMESPACE);
container.copyFileFromContainer(
CATALOG_DIR + NAMESPACE_TAR, CATALOG_DIR + NAMESPACE_TAR);
extractFiles();
}

private void extractFiles() {
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(
"sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf " + NAMESPACE_TAR);
try {
Process process = processBuilder.start();
int exitCode = process.waitFor();
if (exitCode == 0) {
log.info("Extract files successful.");
} else {
log.error("Extract files failed with exit code " + exitCode);
}
} catch (IOException | InterruptedException e) {
log.error("Extract data files from container error :", e);
}
}
};

@TestTemplate
public void testInsertAndCheckDataE2e(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult textWriteResult =
container.executeJob("/iceberg/fake_to_iceberg_with_branch.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
// stream stage
given().ignoreExceptions()
.await()
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// copy iceberg to local
container.executeExtraCommands(containerExtendedFactory);
// check branch exists
Assertions.assertEquals(true, checkBranchExists());
// load from branch
Assertions.assertEquals(100, loadDataFromIcebergTableBranch().size());
});
}

private boolean checkBranchExists() {
Table table = getTable();
Map<String, SnapshotRef> refs = table.refs();
if (refs.containsKey(commitBranch)) {
return true;
}
return false;
}

private List<Object> loadDataFromIcebergTableBranch() {
List<Object> results = new ArrayList<>();
Table table = getTable();
TableScan branchRead = table.newScan().useRef(commitBranch);
CloseableIterable<FileScanTask> fileScanTasks = branchRead.planFiles();
fileScanTasks.forEach(
fileScanTask -> {
try {
DataFile file = fileScanTask.file();
HadoopInputFile inputFile =
HadoopInputFile.fromPath(
new Path(file.path().toString()), new Configuration());
try (ParquetReader<Object> reader =
AvroParquetReader.builder(inputFile).build()) {
Object record;
while ((record = reader.read()) != null) {
results.add(record);
}
}
} catch (IOException e) {
log.error("Table scan branch error :", e);
}
});
return results;
}

public Table getTable() {

Map<String, Object> configs = new HashMap<>();
Map<String, Object> catalogProps = new HashMap<>();
catalogProps.put("type", HADOOP.getType());
catalogProps.put("warehouse", "file://" + CATALOG_DIR);
configs.put(CommonConfig.KEY_CATALOG_NAME.key(), "seatunnel_test");
configs.put(CommonConfig.KEY_NAMESPACE.key(), "seatunnel_namespace");
configs.put(CommonConfig.KEY_TABLE.key(), "iceberg_sink_table");
configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
IcebergTableLoader tableLoader =
IcebergTableLoader.create(new SourceConfig(ReadonlyConfig.fromMap(configs)));
tableLoader.open();
// load the table; the commit branch is applied later via newScan().useRef(...)
return tableLoader.loadTable();
}
}
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
row.num = 100
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
result_table_name = "fake"
}
}

transform {
}

sink {
Iceberg {
catalog_name="seatunnel_test"
iceberg.catalog.config={
"type"="hadoop"
"warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
}
namespace="seatunnel_namespace"
table="iceberg_sink_table"
iceberg.table.write-props={
write.format.default="parquet"
write.target-file-size-bytes=10
}
iceberg.table.commit-branch="commit-branch"
iceberg.table.partition-keys="c_timestamp"
case_sensitive=true
}
}
