From 8a76610b2f56d174dff933d8b58685f784b3b339 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Mon, 17 Oct 2022 16:16:09 +0800 Subject: [PATCH 01/11] [Feature][Connector-V2][S3] Add S3 file source & sink connector --- .../seatunnel/file/config/FileSystemType.java | 3 +- .../connector-file/connector-file-s3/pom.xml | 52 ++++++++++ .../seatunnel/file/s3/config/S3Conf.java | 47 +++++++++ .../seatunnel/file/s3/config/S3Config.java | 27 ++++++ .../seatunnel/file/s3/sink/S3FileSink.java | 52 ++++++++++ .../file/s3/source/S3FileSource.java | 95 +++++++++++++++++++ .../services/org.apache.hadoop.fs.FileSystem | 16 ++++ .../connector-file/pom.xml | 1 + 8 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java index f1d271f3686..9dbc34a5d01 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java @@ -23,7 +23,8 @@ public enum FileSystemType implements Serializable { HDFS("HdfsFile"), LOCAL("LocalFile"), OSS("OssFile"), - FTP("FtpFile"); + FTP("FtpFile"), + S3("S3File"); private final String fileSystemPluginName; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml new file mode 100644 index 00000000000..e60e5b281b1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml @@ -0,0 +1,52 @@ + + + + + connector-file + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-file-s3 + + + 2.6.5 + + + + + + org.apache.seatunnel + connector-file-base + ${project.version} + + + + org.apache.hadoop + hadoop-aws + ${hadoop-aws.version} + + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java new file mode 100644 index 00000000000..360e5c7afb2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java @@ -0,0 +1,47 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.config; + +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.HashMap; + +public class S3Conf extends HadoopConf { + private final String fsHdfsImpl = "org.apache.hadoop.fs.s3.S3FileSystem"; + + @Override + public String getFsHdfsImpl() { + return fsHdfsImpl; + } + + private S3Conf(String hdfsNameKey) { + super(hdfsNameKey); + } + + public static HadoopConf buildWithConfig(Config config) { + HadoopConf hadoopConf = new S3Conf(config.getString(S3Config.S3_BUCKET)); + HashMap s3Options = new HashMap<>(); + s3Options.put("fs.s3n.awsAccessKeyId", config.getString(S3Config.S3_ACCESS_KEY)); + s3Options.put("fs.s3n.awsSecretAccessKey", config.getString(S3Config.S3_SECRET_KEY)); + s3Options.put("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); + hadoopConf.setExtraOptions(s3Options); + return hadoopConf; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java new file mode 100644 index 00000000000..f7a6b611613 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.config; + +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; + +public class S3Config extends BaseSourceConfig { + public static final String S3_ACCESS_KEY = "access_key"; + public static final String S3_SECRET_KEY = "secret_key"; + public static final String S3_BUCKET = "bucket"; + +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java new file mode 100644 index 00000000000..8db00f14971 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf; +import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSink.class) +public class S3FileSink extends BaseFileSink { + @Override + public String getPluginName() { + return FileSystemType.S3.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + super.prepare(pluginConfig); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + S3Config.FILE_PATH, S3Config.S3_BUCKET, + S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + } + hadoopConf = S3Conf.buildWithConfig(pluginConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java new file mode 100644 index 00000000000..d77c3d4e559 --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.s3.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException; +import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf; +import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config; +import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSource.class) +public class S3FileSource extends BaseFileSource { + @Override + public String getPluginName() { + return FileSystemType.S3.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + S3Config.FILE_PATH, S3Config.FILE_TYPE, S3Config.S3_BUCKET, + S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE)); + readStrategy.setPluginConfig(pluginConfig); + String path = pluginConfig.getString(S3Config.FILE_PATH); + hadoopConf = S3Conf.buildWithConfig(pluginConfig); + try { + filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); + } catch (IOException e) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail."); + } + // support user-defined schema + FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE).toUpperCase()); + // only json text csv type support user-defined schema now + if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) { + switch (fileFormat) { + case CSV: + case TEXT: + case JSON: + Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA); + SeaTunnelRowType userDefinedSchema = SeaTunnelSchema + .buildWithConfig(schemaConfig) + .getSeaTunnelRowType(); + readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); + break; + case ORC: + case PARQUET: + throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files"); + default: + // never got in there + throw new UnsupportedOperationException("SeaTunnel does not supported this file format"); + } + } else { + try { + rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); + } catch (FilePluginException e) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 00000000000..0a7ee15c556 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.hadoop.fs.s3native.NativeS3FileSystem \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml index fff8e0cd917..60d73ab9d39 100644 --- a/seatunnel-connectors-v2/connector-file/pom.xml +++ b/seatunnel-connectors-v2/connector-file/pom.xml @@ -40,6 +40,7 @@ connector-file-oss connector-file-ftp connector-file-base-hadoop + connector-file-s3 From 006889955cddd0c68263bf4a5668a14b17905f13 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Mon, 17 Oct 2022 19:03:07 +0800 Subject: [PATCH 02/11] [Feature][Connector-V2][S3] Update pom --- .../connector-file/connector-file-s3/pom.xml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml index e60e5b281b1..8d99577c206 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml @@ -41,10 +41,28 @@ ${project.version} + + org.apache.flink + flink-shaded-hadoop-2 + provided + + + avro + org.apache.avro + + + + org.apache.hadoop hadoop-aws ${hadoop-aws.version} + + + hadoop-common + org.apache.hadoop + + From ab5923e029eb2d0721712581eb4cf03d28a62726 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 14:22:37 +0800 Subject: [PATCH 03/11] [Feature][Connector-V2][S3] Change class name --- .../seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java index 360e5c7afb2..728ff14fa87 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java @@ -24,7 +24,7 @@ import java.util.HashMap; public class S3Conf extends HadoopConf { - private final String fsHdfsImpl = "org.apache.hadoop.fs.s3.S3FileSystem"; + private final String fsHdfsImpl = "org.apache.hadoop.fs.s3native.NativeS3FileSystem"; @Override public String getFsHdfsImpl() { From 49be220d074c1b3904f9ac82ca60b2dfef02bd5b Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 14:23:21 +0800 Subject: [PATCH 04/11] [Feature][Connector-V2][S3] Add docs --- docs/en/connector-v2/sink/S3File.md | 198 +++++++++++++++++++++ docs/en/connector-v2/source/S3File.md | 237 ++++++++++++++++++++++++++ 2 files changed, 435 insertions(+) create mode 100644 docs/en/connector-v2/sink/S3File.md create mode 100644 docs/en/connector-v2/source/S3File.md diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md new file mode 100644 index 00000000000..e1b9534aa08 --- /dev/null +++ b/docs/en/connector-v2/sink/S3File.md @@ -0,0 +1,198 @@ +# S3File + +> S3 file sink connector + +## Description + +Output data to aws s3 file system. + +> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to S3 and this connector need some hadoop dependencies. +> It's only support hadoop version **2.6.5+**. 
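Before walking through the options, here is a minimal configuration sketch that uses only the required options (all values are placeholders; every other option falls back to the defaults listed in the table below):

```hocon
S3File {
  bucket = "s3n://seatunnel-test"
  access_key = "xxxxxxxxxxxxxxxxx"
  secret_key = "xxxxxxxxxxxxxxxxx"
  path = "/seatunnel/text"
}
```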
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+
+## Options
+
+| name                             | type    | required | default value                                             |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| path                             | string  | yes      | -                                                         |
+| bucket                           | string  | yes      | -                                                         |
+| access_key                       | string  | yes      | -                                                         |
+| secret_key                       | string  | yes      | -                                                         |
+| file_name_expression             | string  | no       | "${transactionId}"                                        |
+| file_format                      | string  | no       | "text"                                                    |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
+| field_delimiter                  | string  | no       | '\001'                                                    |
+| row_delimiter                    | string  | no       | "\n"                                                      |
+| partition_by                     | array   | no       | -                                                         |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
+| is_partition_field_write_in_file | boolean | no       | false                                                     |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true                                                      |
+| common-options                   |         | no       | -                                                         |
+
+### path [string]
+
+The target directory path is required.
+
+### bucket [string]
+
+The bucket address of the S3 file system, for example: `s3n://seatunnel-test`
+
+**Tips: the SeaTunnel S3 file connector only supports the `s3n` protocol; `s3` and `s3a` are not supported**
+
+### access_key [string]
+
+The access key of the S3 file system.
+
+### secret_key [string]
+
+The secret key of the S3 file system.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the expression used to generate file names under `path`. We can add the variable `${now}` or `${uuid}` to `file_name_expression`, like `test_${uuid}_${now}`. `${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` will automatically be added to the head of the file name.
+
+### file_format [string]
+
+The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+Please note that the final file name will end with the file_format's suffix; the suffix of the text file format is `txt`.
+
+### filename_time_format [string]
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}`, `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd`. The commonly used time formats are listed as follows:
+
+| Symbol | Description        |
+|--------|--------------------|
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by the `text` and `csv` file formats.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by the `text` and `csv` file formats.
+
+### partition_by [array]
+
+Partition the data based on the selected fields.
+
+### partition_dir_expression [string]
+
+If `partition_by` is specified, the corresponding partition directories will be generated based on the partition information, and the final files will be placed in those partition directories.
+
+Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
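As an illustration of how the partition options above combine, here is a sketch that assumes the upstream rows contain an `age` column (the column name and the directory layout in the comments are illustrative, not taken from this patch):

```hocon
S3File {
  # access_key / secret_key / bucket / tmp_path omitted; see the examples below
  path = "/seatunnel/text"
  file_format = "text"
  # partition the output by the assumed upstream column "age"
  partition_by = ["age"]
  # with this expression, rows whose age is 26 would be written under
  # /seatunnel/text/age=26/<transactionId>_xxx.txt
  partition_dir_expression = "${k0}=${v0}"
  # keep the partition column out of the data files themselves (Hive-style layout)
  is_partition_field_write_in_file = false
}
```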
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition field and its value will be written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+Which columns need to be written to the file; the default value is all of the columns obtained from the `Transform` or `Source`.
+The order of the fields determines the order in which the file is actually written.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is `true`, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` will automatically be added to the head of the file name.
+
+Only `true` is supported now.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Example
+
+For the text file format
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/text"
+    row_delimiter="\n"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+
+```
+
+For the parquet file format
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/parquet"
+    row_delimiter="\n"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="parquet"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+
+```
+
+For the orc file format
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/orc"
+    row_delimiter="\n"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="orc"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+
+```
+
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
new file mode 100644
index 00000000000..cf4b243b077
--- /dev/null
+++ b/docs/en/connector-v2/source/S3File.md
@@ -0,0 +1,237 @@
+# S3File
+
+> S3 file source connector
+
+## Description
+
+Read data from the AWS S3 file system.
+
+> Tips: We made some trade-offs in order to support more file types, so we use the HDFS protocol for internal access to S3, and this connector needs some Hadoop dependencies.
+> Only Hadoop version **2.6.5+** is supported.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+All the data in a split is read in one pollNext call. The splits that have been read are saved in the snapshot.
+
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+
+## Options
+
+| name                      | type    | required | default value       |
+|---------------------------|---------|----------|---------------------|
+| path                      | string  | yes      | -                   |
+| type                      | string  | yes      | -                   |
+| bucket                    | string  | yes      | -                   |
+| access_key                | string  | yes      | -                   |
+| secret_key                | string  | yes      | -                   |
+| delimiter                 | string  | no       | \001                |
+| parse_partition_from_path | boolean | no       | true                |
+| date_format               | string  | no       | yyyy-MM-dd          |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | no       | HH:mm:ss            |
+| schema                    | config  | no       | -                   |
+| common-options            |         | no       | -                   |
+
+### path [string]
+
+The source file path.
+
+### delimiter [string]
+
+Field delimiter, used to tell the connector how to split fields when reading text files.
+
+Default is `\001`, the same as Hive's default delimiter.
+
+### parse_partition_from_path [boolean]
+
+Control whether to parse the partition keys and values from the file path.
+
+For example, if you read a file from the path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`, these two fields will be added to every record read from the file:
+
+| name          | age |
+|---------------|-----|
+| tyrantlucifer | 26  |
+
+Tips: **Do not define partition fields in the schema option**
+
+### date_format [string]
+
+Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
+### type [string]
+
+File type. The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+If you assign the file type as `json`, you should also assign the schema option to tell the connector how to parse the data into the rows you want.
+
+For example, if the upstream data is the following:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+You can also save multiple pieces of data in one file and split them by newline:
+
+```json lines
+
+{"code": 200, "data": "get success", "success": true}
+{"code": 300, "data": "get failed", "success": false}
+
+```
+
+then you should assign the schema as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+The connector will generate data as follows:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+If you assign the file type as `parquet` or `orc`, the schema option is not required; the connector can find the schema of the upstream data automatically.
+
+If you assign the file type as `text` or `csv`, you can choose whether to specify the schema information.
+
+For example, if the upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+If you do not assign the data schema, the connector will treat the upstream data as follows:
+
+| content               |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign a data schema, you should also assign the option `delimiter`, except for the CSV file type.
+
+You should assign the schema and delimiter as follows:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string
+    }
+}
+
+```
+
+The connector will generate data as follows:
+
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
+
+### bucket [string]
+
+The bucket address of the S3 file system, for example: `s3n://seatunnel-test`
+
+**Tips: the SeaTunnel S3 file connector only supports the `s3n` protocol; `s3` and `s3a` are not supported**
+
+### access_key [string]
+
+The access key of the S3 file system.
+
+### secret_key [string]
+
+The secret key of the S3 file system.
+
+### schema [config]
+
+#### fields [config]
+
+The schema of the upstream data.
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Example
+
+```hocon
+
+  S3File {
+    path = "/seatunnel/text"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    type = "text"
+  }
+
+```
+
+```hocon
+
+  S3File {
+    path = "/seatunnel/json"
+    bucket = "s3n://seatunnel-test"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxxxxxxx"
+    type = "json"
+    schema {
+      fields {
+        id = int
+        name = string
+      }
+    }
+  }
+
+```
\ No newline at end of file From 94c38414909d6afc193e6a687866aec7ad7404f0 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 14:25:10 +0800 Subject: [PATCH 05/11] [Feature][Connector-V2][S3] Update plugin-mapping.properties --- plugin-mapping.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 0b786891e48..f46a22bad2b 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -130,4 +130,6 @@ seatunnel.sink.Sentry = connector-sentry seatunnel.source.MongoDB = connector-mongodb seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg -seatunnel.source.influxdb = connector-influxdb \ No newline at end of file +seatunnel.source.influxdb = connector-influxdb +seatunnel.source.S3File = connector-file-s3 +seatunnel.sink.S3File = connector-file-s3 \ No newline at end of file From b376aa50286b22f0bc94818bad106d92189e5af1 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 16:25:22 +0800 Subject: [PATCH 06/11] [Feature][Connector-V2][S3] Update pom --- .../connector-file/connector-file-s3/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml index 8d99577c206..144aa1b3783 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml @@ -57,12 +57,6 @@ org.apache.hadoop hadoop-aws ${hadoop-aws.version} - - - hadoop-common - org.apache.hadoop - - From 77c9bcfd156c0bd4fad506fdecd00e8e66778c4c Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 17:38:44 +0800 Subject: [PATCH 07/11] [Feature][Connector-V2][S3] Update pom --- .../connector-file/connector-file-s3/pom.xml | 6
++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml index 144aa1b3783..d212bb99fa6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml @@ -57,6 +57,12 @@ org.apache.hadoop hadoop-aws ${hadoop-aws.version} + + + jdk.tools + jdk.tools + + From 9b63eaddb6e6383b76044256fabacb36f29f31d5 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 17:53:04 +0800 Subject: [PATCH 08/11] [Feature][Connector-V2][S3] Fix plugin-mapping.properties --- plugin-mapping.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index f46a22bad2b..fa70c6ea731 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -130,6 +130,6 @@ seatunnel.sink.Sentry = connector-sentry seatunnel.source.MongoDB = connector-mongodb seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg -seatunnel.source.influxdb = connector-influxdb +seatunnel.source.InfluxDB = connector-influxdb seatunnel.source.S3File = connector-file-s3 seatunnel.sink.S3File = connector-file-s3 \ No newline at end of file From 8bd50b48fcd393e9fd8d84b25f24bf858db012b5 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 17:57:41 +0800 Subject: [PATCH 09/11] [Feature][Connector-V2][Influxdb] Fix seatunnel-dist --- seatunnel-dist/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index a0d0350d3eb..5ede53eb2ae 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -272,6 +272,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-influxdb + ${project.version} + provided + From 1089809c51fa6c9d5725f9dbcf4ba178714a8709 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Tue, 18 Oct 2022 18:09:07 +0800 Subject: [PATCH 10/11] [Feature][Connector-V2][S3] Add dependency in seatunnel-dist --- seatunnel-dist/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 5ede53eb2ae..ef635378f45 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -278,6 +278,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-file-s3 + ${project.version} + provided + From 69b95d3b67c9136d3ccb0b9e8d9927a65916ef99 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 19 Oct 2022 16:58:53 +0800 Subject: [PATCH 11/11] [Feature][Connector-V2][S3] Add change log --- docs/en/connector-v2/sink/S3File.md | 6 ++++++ docs/en/connector-v2/source/S3File.md | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index e1b9534aa08..4f0e3376f34 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -196,3 +196,9 @@ For orc file format } ``` + +## Changelog + +| Version | Date | Pull Request | Subject | +|------------|------------|-----------------------------------------------------------------|--------------| +| 2.2.0-beta | 2022-10-17 | [3119](https://github.com/apache/incubator-seatunnel/pull/3119) | First commit | \ No newline at end of file diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index cf4b243b077..3178726f2a9 100644 --- a/docs/en/connector-v2/source/S3File.md +++ 
b/docs/en/connector-v2/source/S3File.md @@ -234,4 +234,10 @@ Source plugin common parameters, please refer to [Source Common Options](common- } } -``` \ No newline at end of file +``` + +## Changelog + +| Version | Date | Pull Request | Subject | +|------------|------------|-----------------------------------------------------------------|--------------| +| 2.2.0-beta | 2022-10-17 | [3119](https://github.com/apache/incubator-seatunnel/pull/3119) | First commit | \ No newline at end of file
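Tying the source and sink from this patch series together, a minimal end-to-end job sketch that reads text files from S3 and writes them back as parquet (the `env` block and its values are assumptions outside this patch; credentials, paths, and the schema are placeholders):

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  S3File {
    bucket = "s3n://seatunnel-test"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    path = "/seatunnel/text"
    type = "text"
    # delimiter and schema describe the layout of the text files being read
    delimiter = "#"
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
  }
}

sink {
  S3File {
    bucket = "s3n://seatunnel-test"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    path = "/seatunnel/parquet"
    file_format = "parquet"
    is_enable_transaction = true
  }
}
```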