[Improve][Connector-V2] Refactor hdfs file sink connector code structure (apache#2701)

* [Improve][Connector-V2] Refactor hdfs file sink codes
TyrantLucifer committed Sep 18, 2022
1 parent 8b40291 commit 12a1fd5
Showing 21 changed files with 51 additions and 1,831 deletions.
44 changes: 26 additions & 18 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -24,24 +24,29 @@ By default, we use 2PC commit to ensure `exactly-once`

To use this connector, you must ensure your Spark/Flink cluster has already integrated Hadoop. The tested Hadoop version is 2.x.

| name | type | required | default value |
| --------------------------------- | ------ | -------- |---------------------------------------------------------|
| path | string | yes | - |
| file_name_expression | string | no | "${transactionId}" |
| file_format | string | no | "text" |
| filename_time_format | string | no | "yyyy.MM.dd" |
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
| partition_by | array | no | - |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
| is_partition_field_write_in_file | boolean| no | false |
| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean| no | true |
| save_mode | string | no | "error" |
| name | type | required | default value |
|----------------------------------| ------ | -------- |---------------------------------------------------------|
| fs.defaultFS | string | yes | - |
| path | string | yes | - |
| file_name_expression | string | no | "${transactionId}" |
| file_format | string | no | "text" |
| filename_time_format | string | no | "yyyy.MM.dd" |
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
| partition_by | array | no | - |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
| is_partition_field_write_in_file | boolean| no | false |
| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean| no | true |
| save_mode | string | no | "error" |

### fs.defaultFS [string]

The Hadoop cluster address, which starts with `hdfs://`, for example: `hdfs://hadoopcluster` (a combined example follows the `path` section below).

### path [string]

The target dir path is required. The `hdfs file` starts with `hdfs://`.
The target dir path is required.
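
A minimal sketch of how the two options are meant to work together after this change (values borrowed from the examples later in this document, assuming the remaining sink options keep their defaults): the cluster address goes into `fs.defaultFS`, and `path` holds only the directory inside that cluster.

```bash

HdfsFile {
  # Hadoop cluster address; previously this was embedded in path as hdfs://mycluster/...
  fs.defaultFS="hdfs://hadoopcluster"
  # Target directory inside the cluster, without scheme or authority
  path="/tmp/hive/warehouse/test2"
}

```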

### file_name_expression [string]

@@ -125,7 +130,8 @@ For text file format
```bash

HdfsFile {
path="hdfs://mycluster/tmp/hive/warehouse/test2"
fs.defaultFS="hdfs://hadoopcluster"
path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
@@ -145,7 +151,8 @@ For parquet file format
```bash

HdfsFile {
path="hdfs://mycluster/tmp/hive/warehouse/test2"
fs.defaultFS="hdfs://hadoopcluster"
path="/tmp/hive/warehouse/test2"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
@@ -163,7 +170,8 @@ For orc file format
```bash

HdfsFile {
path="hdfs://mycluster/tmp/hive/warehouse/test2"
fs.defaultFS="hdfs://hadoopcluster"
path="/tmp/hive/warehouse/test2"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
@@ -17,16 +17,36 @@

package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSink.class)
public class HdfsFileSink extends AbstractFileSink {
public class HdfsFileSink extends BaseFileSink {

    @Override
    public String getPluginName() {
        return FileSystemType.HDFS.getFileSystemPluginName();
    }

    @Override
    public SinkFileSystemPlugin getSinkFileSystemPlugin() {
        return new HdfsFileSinkPlugin();
    public void prepare(Config pluginConfig) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, FS_DEFAULT_NAME_KEY);
        if (!result.isSuccess()) {
            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
        }
        super.prepare(pluginConfig);
        hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
    }
}
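
The practical effect of the new `prepare` override above is fail-fast validation: if `fs.defaultFS` is absent from the plugin config, the sink throws a `PrepareFailException` before any file writer is created. Below is a minimal, hypothetical sketch of that check in isolation (the driver class and the use of `ConfigFactory.parseMap` are illustrative assumptions, not part of this commit; the check calls themselves appear in the diff above):

```java
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import java.util.Collections;

// Hypothetical driver that only illustrates the validation added in HdfsFileSink#prepare.
public class HdfsSinkConfigCheckSketch {
    public static void main(String[] args) {
        // A plugin config that defines "path" but omits "fs.defaultFS".
        Config pluginConfig = ConfigFactory.parseMap(
                Collections.singletonMap("path", "/tmp/hive/warehouse/test2"));

        // Same check as in HdfsFileSink#prepare: every listed key must exist in the config.
        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, "fs.defaultFS");

        // Expected to fail here; in the sink this failure is turned into a PrepareFailException.
        System.out.println("success=" + result.isSuccess() + ", msg=" + result.getMsg());
    }
}
```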

This file was deleted.

This file was deleted.

This file was deleted.
