From ab2b04fe2d3e7d71bf64741bbe81ff7ebce7cf90 Mon Sep 17 00:00:00 2001
From: CalvinKirs
Date: Fri, 2 Dec 2022 22:49:59 +0800
Subject: [PATCH] [Connector][S3]Support s3a protocol

Allow users to add additional hadoop-s3 parameters
Allow the use of the s3a protocol
Decouple hadoop-aws dependencies
---
 docs/en/connector-v2/sink/S3File.md           | 33 +++++++++++---
 docs/en/connector-v2/source/S3File.md         | 28 +++++++++---
 .../connector-file/connector-file-s3/pom.xml  |  1 +
 .../seatunnel/file/s3/config/S3Conf.java      | 44 ++++++++++++++++---
 .../seatunnel/file/s3/config/S3Config.java    | 35 +++++++++++----
 .../file/s3/source/S3FileSourceFactory.java   |  1 +
 6 files changed, 116 insertions(+), 26 deletions(-)

diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 53175f33055..a07feee578a 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -8,6 +8,7 @@ Output data to aws s3 file system.
 
 > Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to S3 and this connector need some hadoop dependencies.
 > It's only support hadoop version **2.6.5+**.
+> To use this connector, you need to add hadoop-aws.jar and hadoop-client.jar to the plugin directory.
 
 ## Key features
 
@@ -31,6 +32,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | bucket | string | yes | - |
 | access_key | string | yes | - |
 | access_secret | string | yes | - |
+| hadoop_s3_properties | map | no | - |
 | file_name_expression | string | no | "${transactionId}" |
 | file_format | string | no | "text" |
 | filename_time_format | string | no | "yyyy.MM.dd" |
@@ -49,9 +51,7 @@ The target dir path is required.
 
 ### bucket [string]
 
-The bucket address of s3 file system, for example: `s3n://seatunnel-test`
-
-**Tips: SeaTunnel S3 file connector only support `s3n` protocol, not support `s3` and `s3a`**
+The bucket address of the s3 file system, for example: `s3n://seatunnel-test`. If you use the `s3a` protocol, this parameter should be `s3a://seatunnel-test`.
 
 ### access_key [string]
 
@@ -61,6 +61,15 @@ The access key of s3 file system.
 
 The access secret of s3 file system.
 
+### hadoop_s3_properties [map]
+
+If you need to add other options, you can add them here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html)
+```
+hadoop_s3_properties {
+   "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+}
+```
+
 ### file_name_expression [string]
 
 `file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`,
@@ -141,7 +150,7 @@ For text file format
 
   S3File {
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
-    bucket = "s3n://seatunnel-test"
+    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/text"
@@ -151,6 +160,9 @@ For text file format
    file_format="text"
    filename_time_format="yyyy.MM.dd"
    is_enable_transaction=true
+    hadoop_s3_properties {
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    }
   }
 
 ```
@@ -162,7 +174,7 @@ For parquet file format
 
   S3File {
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
-    bucket = "s3n://seatunnel-test"
+    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/parquet"
@@ -172,6 +184,9 @@ For parquet file format
    file_format="parquet"
    filename_time_format="yyyy.MM.dd"
    is_enable_transaction=true
+    hadoop_s3_properties {
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    }
   }
 
 ```
@@ -183,7 +198,7 @@ For orc file format
 
   S3File {
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
-    bucket = "s3n://seatunnel-test"
+    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/orc"
@@ -193,6 +208,9 @@ For orc file format
    file_format="orc"
    filename_time_format="yyyy.MM.dd"
    is_enable_transaction=true
+    hadoop_s3_properties {
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    }
   }
 
 ```
@@ -207,4 +225,5 @@ For orc file format
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
-  - When restore writer from states getting transaction directly failed
\ No newline at end of file
+  - When restore writer from states getting transaction directly failed
+- Support S3A protocol
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 81b4d52104b..106892a3c8b 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -8,6 +8,7 @@ Read data from aws s3 file system.
 
 > Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to S3 and this connector need some hadoop dependencies.
 > It's only support hadoop version **2.6.5+**.
+> To use this connector, you need to add hadoop-aws.jar and hadoop-client.jar to the plugin directory.
 
 ## Key features
 
@@ -36,6 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | bucket | string | yes | - |
 | access_key | string | yes | - |
 | access_secret | string | yes | - |
+| hadoop_s3_properties | map | no | - |
 | delimiter | string | no | \001 |
 | parse_partition_from_path | boolean | no | true |
 | date_format | string | no | yyyy-MM-dd |
@@ -182,9 +184,7 @@ connector will generate data as the following:
 
 ### bucket [string]
 
-The bucket address of s3 file system, for example: `s3n://seatunnel-test`
-
-**Tips: SeaTunnel S3 file connector only support `s3n` protocol, not support `s3` and `s3a`**
+The bucket address of the s3 file system, for example: `s3n://seatunnel-test`. If you use the `s3a` protocol, this parameter should be `s3a://seatunnel-test`.
 
 ### access_key [string]
 
@@ -194,6 +194,15 @@ The access key of s3 file system.
 
 The access secret of s3 file system.
 
+### hadoop_s3_properties [map]
+
+If you need to add other options, you can add them here and refer to this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html)
+```
+hadoop_s3_properties {
+   "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+}
+```
+
 ### schema [config]
 
 #### fields [Config]
@@ -212,8 +221,11 @@ Source plugin common parameters, please refer to [Source Common Options](common-
    path = "/seatunnel/text"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
-    bucket = "s3n://seatunnel-test"
+    bucket = "s3a://seatunnel-test"
    type = "text"
+    hadoop_s3_properties {
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    }
   }
 
 ```
@@ -222,7 +234,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
   S3File {
    path = "/seatunnel/json"
-    bucket = "s3n://seatunnel-test"
+    bucket = "s3a://seatunnel-test"
    access_key = "xxxxxxxxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
    type = "json"
@@ -232,6 +244,9 @@ Source plugin common parameters, please refer to [Source Common Options](common-
        name = string
      }
    }
+    hadoop_s3_properties {
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    }
  }
 
 ```
@@ -241,3 +256,6 @@ ## Changelog
 ### 2.3.0-beta 2022-10-20
 
 - Add S3File Source Connector
+
+### Next version
+- Support S3A protocol
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml
index d212bb99fa6..65e22875046 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml
@@ -57,6 +57,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
             <version>${hadoop-aws.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>jdk.tools</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
index 2795c63e687..46cabf49bee 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -17,19 +17,24 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.s3.config;
 
+import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import java.util.HashMap;
+import java.util.Map;
 
 public class S3Conf extends HadoopConf {
-    private static final String HDFS_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
-    private static final String SCHEMA = "s3n";
+    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
+    private static final String S3A_SCHEMA = "s3a";
+    private static final String DEFAULT_SCHEMA = "s3n";
+    private static String SCHEMA = DEFAULT_SCHEMA;
 
     @Override
     public String getFsHdfsImpl() {
-        return HDFS_IMPL;
+        return switchHdfsImpl();
     }
 
     @Override
@@ -42,11 +47,40 @@ private S3Conf(String hdfsNameKey) {
     }
 
     public static HadoopConf buildWithConfig(Config config) {
+        HadoopConf hadoopConf = new S3Conf(config.getString(S3Config.S3_BUCKET.key()));
+        String bucketName = config.getString(S3Config.S3_BUCKET.key());
+        if (bucketName.startsWith(S3A_SCHEMA)) {
+            SCHEMA = S3A_SCHEMA;
+        }
         HashMap<String, String> s3Options = new HashMap<>();
-        s3Options.put("fs.s3n.awsAccessKeyId", config.getString(S3Config.S3_ACCESS_KEY.key()));
-        s3Options.put("fs.s3n.awsSecretAccessKey", config.getString(S3Config.S3_SECRET_KEY.key()));
+        putS3SK(s3Options, config);
+        if (CheckConfigUtil.isValidParam(config, S3Config.S3_PROPERTIES.key())) {
+            config.getObject(S3Config.S3_PROPERTIES.key()).forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
+        }
         hadoopConf.setExtraOptions(s3Options);
         return hadoopConf;
     }
+
+    private String switchHdfsImpl() {
+        switch (SCHEMA) {
+            case S3A_SCHEMA:
+                return HDFS_S3A_IMPL;
+            default:
+                return HDFS_S3N_IMPL;
+        }
+    }
+
+    private static void putS3SK(Map<String, String> s3Options, Config config) {
+        String accessKey = config.getString(S3Config.S3_ACCESS_KEY.key());
+        String secretKey = config.getString(S3Config.S3_SECRET_KEY.key());
+        if (S3A_SCHEMA.equals(SCHEMA)) {
+            s3Options.put("fs.s3a.access.key", accessKey);
+            s3Options.put("fs.s3a.secret.key", secretKey);
+            return;
+        }
+        // default s3n
+        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
+        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java
index a3e9021a8aa..5b1acbdcc58 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java
@@ -21,18 +21,35 @@
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 
+import java.util.Map;
+
 public class S3Config extends BaseSourceConfig {
     public static final Option<String> S3_ACCESS_KEY = Options.key("access_key")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("S3 access key");
+            .stringType()
+            .noDefaultValue()
+            .withDescription("S3 access key");
     public static final Option<String> S3_SECRET_KEY = Options.key("secret_key")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("S3 secret key");
+            .stringType()
+            .noDefaultValue()
+            .withDescription("S3 secret key");
     public static final Option<String> S3_BUCKET = Options.key("bucket")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("S3 bucket");
+            .stringType()
+            .noDefaultValue()
+            .withDescription("S3 bucket");
+
+    /**
+     * Extra hadoop-aws properties passed through to the S3 file system.
+     * If you need to add a new option, you can add it here and refer to this:
+     * <p>
+     * https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
+     * <p>
+     * such as:
+     * key = "fs.s3a.session.token"
+     * value = "SECRET-SESSION-TOKEN"
+     */
+    public static final Option<Map<String, String>> S3_PROPERTIES = Options.key("hadoop_s3_properties")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("S3 properties");
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index b9985690786..695152328b6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -43,6 +43,7 @@ public OptionRule optionRule() {
             .required(S3Config.S3_BUCKET)
             .required(S3Config.S3_ACCESS_KEY)
             .required(S3Config.S3_SECRET_KEY)
+            .optional(S3Config.S3_PROPERTIES)
             .optional(S3Config.DELIMITER)
             .optional(S3Config.PARSE_PARTITION_FROM_PATH)
             .optional(S3Config.DATE_FORMAT)
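
Note on how the extra options take effect: `S3Conf.buildWithConfig` selects the Hadoop file-system implementation from the bucket scheme (`s3a` vs. the default `s3n`), maps `access_key`/`secret_key` to the scheme-specific property names via `putS3SK`, and folds every `hadoop_s3_properties` entry into the extra-options map carried by `HadoopConf`. The sketch below is a minimal illustration (not part of this patch; the `S3ConfSketch` class and its parameter names are hypothetical) of how such a map is expected to land on a Hadoop `Configuration` before the file system is opened, assuming hadoop-common is on the classpath. Marking hadoop-aws as `provided` in the pom is what turns those jars into the runtime requirement called out in the docs above.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class S3ConfSketch {
    // Hypothetical helper: copy the connector's extra options onto a Hadoop
    // Configuration the way a file-system layer would before FileSystem.get().
    public static Configuration toHadoopConfiguration(
            String defaultFs, String fsImplClass, Map<String, String> extraOptions) {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", defaultFs);   // e.g. s3a://seatunnel-test
        configuration.set("fs.s3a.impl", fsImplClass);  // e.g. org.apache.hadoop.fs.s3a.S3AFileSystem
        // Every entry of hadoop_s3_properties, plus the access/secret keys put
        // there by putS3SK, is copied onto the configuration unchanged.
        extraOptions.forEach(configuration::set);
        return configuration;
    }

    public static void main(String[] args) {
        Map<String, String> extra = new HashMap<>();
        extra.put("fs.s3a.access.key", "xxxxxxxxxxxxxxxxx");
        extra.put("fs.s3a.secret.key", "xxxxxxxxxxxxxxxxx");
        extra.put("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
        Configuration conf = toHadoopConfiguration(
                "s3a://seatunnel-test", "org.apache.hadoop.fs.s3a.S3AFileSystem", extra);
        System.out.println(conf.get("fs.s3a.aws.credentials.provider"));
    }
}
```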