[Connector][S3]Support s3a protocol #3632

Merged · 1 commit · Dec 5, 2022
Changes from all commits
33 changes: 26 additions & 7 deletions docs/en/connector-v2/sink/S3File.md
@@ -8,6 +8,7 @@ Output data to aws s3 file system.

> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to S3, and this connector needs some Hadoop dependencies.
> It only supports Hadoop version **2.6.5+**.
> To use this connector, you need to add hadoop-aws.jar and hadoop-client.jar to the plugin directory.

## Key features

@@ -31,6 +32,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
| hadoop_s3_properties | map | no | - |
| file_name_expression | string | no | "${transactionId}" |
| file_format | string | no | "text" |
| filename_time_format | string | no | "yyyy.MM.dd" |
@@ -49,9 +51,7 @@ The target dir path is required.

### bucket [string]

The bucket address of the S3 file system, for example: `s3n://seatunnel-test`. If you use the `s3a` protocol, this parameter should be `s3a://seatunnel-test`.

### access_key [string]

@@ -61,6 +61,15 @@ The access key of s3 file system.

The access secret of s3 file system.

### hadoop_s3_properties [map]

If you need to set other options, you can add them here; refer to [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) for the list of supported properties.
```
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
```

### file_name_expression [string]

`file_name_expression` describes the expression used to generate file names under the `path`. We can add the variable `${now}` or `${uuid}` to the `file_name_expression`, like `test_${uuid}_${now}`,
@@ -141,7 +150,7 @@ For text file format
S3File {
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3n://seatunnel-test"
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
row_delimiter="\n"
@@ -151,6 +160,9 @@ For text file format
file_format="text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

```
@@ -162,7 +174,7 @@ For parquet file format
S3File {
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3n://seatunnel-test"
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
row_delimiter="\n"
@@ -172,6 +184,9 @@ For parquet file format
file_format="parquet"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

```
@@ -183,7 +198,7 @@ For orc file format
S3File {
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3n://seatunnel-test"
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
row_delimiter="\n"
@@ -193,6 +208,9 @@ For orc file format
file_format="orc"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

```
@@ -207,4 +225,5 @@ For orc file format
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
- Support S3A protocol
28 changes: 23 additions & 5 deletions docs/en/connector-v2/source/S3File.md
@@ -8,6 +8,7 @@ Read data from aws s3 file system.

> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to S3, and this connector needs some Hadoop dependencies.
> It only supports Hadoop version **2.6.5+**.
> To use this connector, you need to add hadoop-aws.jar and hadoop-client.jar to the plugin directory.

## Key features

@@ -36,6 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
| hadoop_s3_properties | map | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -182,9 +184,7 @@ connector will generate data as the following:

### bucket [string]

The bucket address of the S3 file system, for example: `s3n://seatunnel-test`. If you use the `s3a` protocol, this parameter should be `s3a://seatunnel-test`.

### access_key [string]

@@ -194,6 +194,15 @@ The access key of s3 file system.

The access secret of s3 file system.

### hadoop_s3_properties [map]

If you need to set other options, you can add them here; refer to [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) for the list of supported properties.
```
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
```

### schema [config]

#### fields [Config]
@@ -212,8 +221,11 @@ Source plugin common parameters, please refer to [Source Common Options](common-
path = "/seatunnel/text"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3n://seatunnel-test"
bucket = "s3a://seatunnel-test"
type = "text"
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

```
@@ -222,7 +234,7 @@

S3File {
path = "/seatunnel/json"
bucket = "s3n://seatunnel-test"
bucket = "s3a://seatunnel-test"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
type = "json"
@@ -232,6 +244,9 @@ Source plugin common parameters, please refer to [Source Common Options](common-
name = string
}
}
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

```
@@ -241,3 +256,6 @@ Source plugin common parameters, please refer to [Source Common Options](common-
### 2.3.0-beta 2022-10-20

- Add S3File Source Connector

### Next version
- Support S3A protocol
@@ -57,6 +57,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop-aws.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>jdk.tools</artifactId>
@@ -17,19 +17,24 @@

package org.apache.seatunnel.connectors.seatunnel.file.s3.config;

import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import java.util.HashMap;
import java.util.Map;

public class S3Conf extends HadoopConf {
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_SCHEMA = "s3a";
private static final String DEFAULT_SCHEMA = "s3n";
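// Chosen in buildWithConfig from the bucket's scheme prefix; defaults to s3n.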
private static String SCHEMA = DEFAULT_SCHEMA;

@Override
public String getFsHdfsImpl() {
return switchHdfsImpl();
}

@Override
@@ -42,11 +47,40 @@ private S3Conf(String hdfsNameKey) {
}

public static HadoopConf buildWithConfig(Config config) {

HadoopConf hadoopConf = new S3Conf(config.getString(S3Config.S3_BUCKET.key()));
String bucketName = config.getString(S3Config.S3_BUCKET.key());
if (bucketName.startsWith(S3A_SCHEMA)) {
SCHEMA = S3A_SCHEMA;
}
HashMap<String, String> s3Options = new HashMap<>();
s3Options.put("fs.s3n.awsAccessKeyId", config.getString(S3Config.S3_ACCESS_KEY.key()));
s3Options.put("fs.s3n.awsSecretAccessKey", config.getString(S3Config.S3_SECRET_KEY.key()));
putS3SK(s3Options, config);
if (CheckConfigUtil.isValidParam(config, S3Config.S3_PROPERTIES.key())) {
config.getObject(S3Config.S3_PROPERTIES.key()).forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
}
hadoopConf.setExtraOptions(s3Options);
return hadoopConf;
}

// Map the active schema to the matching Hadoop FileSystem implementation class.
private String switchHdfsImpl() {
switch (SCHEMA) {
case S3A_SCHEMA:
return HDFS_S3A_IMPL;
default:
return HDFS_S3N_IMPL;
}
}

private static void putS3SK(Map<String, String> s3Options, Config config) {
String accessKey = config.getString(S3Config.S3_ACCESS_KEY.key());
String secretKey = config.getString(S3Config.S3_SECRET_KEY.key());
if (S3A_SCHEMA.equals(SCHEMA)) {
s3Options.put("fs.s3a.access.key", accessKey);
s3Options.put("fs.s3a.secret.key", secretKey);
return;
}
// default s3n
s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
}
}
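Taken together, the changes above make the filesystem implementation follow the bucket's scheme. Below is a minimal sketch of that end-to-end behavior; it assumes the shaded Typesafe `ConfigFactory` is available alongside the shaded `Config`, and the class name and inline values are illustrative, not part of the connector:

```
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

public class S3ConfSchemaSketch {
    public static void main(String[] args) {
        // Inline job config for illustration; real jobs load this from the job file.
        Config config = ConfigFactory.parseString(
            "bucket = \"s3a://seatunnel-test\"\n"
                + "access_key = \"xxxxxxxxxxxxxxxxx\"\n"
                + "secret_key = \"xxxxxxxxxxxxxxxxx\"");

        HadoopConf hadoopConf = S3Conf.buildWithConfig(config);
        // The bucket starts with "s3a", so switchHdfsImpl() picks the S3A filesystem.
        System.out.println(hadoopConf.getFsHdfsImpl());
        // Expected: org.apache.hadoop.fs.s3a.S3AFileSystem
    }
}
```

With an `s3n://` bucket the same call falls through to the default branch and returns the `NativeS3FileSystem` implementation.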
@@ -21,18 +21,35 @@
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;

import java.util.Map;

public class S3Config extends BaseSourceConfig {
public static final Option<String> S3_ACCESS_KEY = Options.key("access_key")
.stringType()
.noDefaultValue()
.withDescription("S3 access key");
public static final Option<String> S3_SECRET_KEY = Options.key("secret_key")
.stringType()
.noDefaultValue()
.withDescription("S3 secret key");
public static final Option<String> S3_BUCKET = Options.key("bucket")
.stringType()
.noDefaultValue()
.withDescription("S3 bucket");

/**
 * Extra properties passed through to the Hadoop S3 filesystem.
 * If you need to set a new option, add it here; the supported keys are listed at:
 * <p>
 * https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
 * <p>
 * For example:
 * key = "fs.s3a.session.token"
 * value = "SECRET-SESSION-TOKEN"
 */
public static final Option<Map<String, String>> S3_PROPERTIES = Options.key("hadoop_s3_properties")
.mapType()
.noDefaultValue()
.withDescription("S3 properties");

}
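As a usage sketch, the map option declared above is consumed in `S3Conf.buildWithConfig` through the raw Typesafe `Config`, unwrapping each value to a string. A standalone illustration of the same pattern, assuming the shaded config classes (the class name and property value are illustrative):

```
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import java.util.HashMap;
import java.util.Map;

public class S3PropertiesSketch {
    public static void main(String[] args) {
        // The session-token example from the Javadoc above, written as HOCON.
        Config config = ConfigFactory.parseString(
            "hadoop_s3_properties { \"fs.s3a.session.token\" = \"SECRET-SESSION-TOKEN\" }");

        // Same unwrapping pattern S3Conf.buildWithConfig applies to S3_PROPERTIES.
        Map<String, String> s3Options = new HashMap<>();
        config.getObject("hadoop_s3_properties")
            .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));

        System.out.println(s3Options); // {fs.s3a.session.token=SECRET-SESSION-TOKEN}
    }
}
```

In the connector these entries are then handed to `hadoopConf.setExtraOptions(s3Options)`, as shown in `S3Conf` above.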
@@ -43,6 +43,7 @@ public OptionRule optionRule() {
.required(S3Config.S3_BUCKET)
.required(S3Config.S3_ACCESS_KEY)
.required(S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
.optional(S3Config.DELIMITER)
.optional(S3Config.PARSE_PARTITION_FROM_PATH)
.optional(S3Config.DATE_FORMAT)