Skip to content

Commit

Permalink
[Feature][Connector-V2][Oss jindo] Add oss jindo source & sink connec…
Browse files Browse the repository at this point in the history
…tor (#3456)

* [Feature][Connector-V2][Oss-Jindo] Add oss-jindo source & sink connector

* [Feature][Connector-V2][Oss-Jindo] Update OssConf and pom

* [Feature][Connector-V2][Oss-Jindo] Update exception

* [Feature][Connector-V2][Oss-Jindo] Unified exception & add option factory
  • Loading branch information
TyrantLucifer authored Dec 8, 2022
1 parent 976a8ca commit 2507372
Show file tree
Hide file tree
Showing 16 changed files with 1,004 additions and 0 deletions.
215 changes: 215 additions & 0 deletions docs/en/connector-v2/sink/OssJindoFile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# OssFile

> OssJindo file sink connector
## Description

Output data to oss file system using jindo api.

> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS and this connector need some hadoop dependencies.
## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)

By default, we use 2PC commit to ensure `exactly-once`

- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
- [x] parquet
- [x] orc
- [x] json

## Options

| name | type | required | default value |
|----------------------------------|---------|----------|-----------------------------------------------------------|
| path | string | yes | - |
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
| endpoint | string | yes | - |
| file_name_expression | string | no | "${transactionId}" |
| file_format | string | no | "text" |
| filename_time_format | string | no | "yyyy.MM.dd" |
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
| partition_by | array | no | - |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
| is_partition_field_write_in_file | boolean | no | false |
| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true |
| common-options | | no | - |

### path [string]

The target dir path is required.

### bucket [string]

The bucket address of oss file system, for example: `oss://tyrantlucifer-image-bed`

### access_key [string]

The access key of oss file system.

### access_secret [string]

The access secret of oss file system.

### endpoint [string]

The endpoint of oss file system.

### file_name_expression [string]

`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`,
`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.

Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.

### file_format [string]

We supported as the following file types:

`text` `csv` `parquet` `orc` `json`

Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`.

### filename_time_format [string]

When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:

| Symbol | Description |
| ------ | ------------------ |
| y | Year |
| M | Month |
| d | Day of month |
| H | Hour in day (0-23) |
| m | Minute in hour |
| s | Second in minute |

See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.

### field_delimiter [string]

The separator between columns in a row of data. Only needed by `text` and `csv` file format.

### row_delimiter [string]

The separator between rows in a file. Only needed by `text` and `csv` file format.

### partition_by [array]

Partition data based on selected fields

### partition_dir_expression [string]

If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.

Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.

### is_partition_field_write_in_file [boolean]

If `is_partition_field_write_in_file` is `true`, the partition field and the value of it will be written into data file.

For example, if you want to write a Hive Data File, Its value should be `false`.

### sink_columns [array]

Which columns need be written to file, default value is all the columns get from `Transform` or `Source`.
The order of the fields determines the order in which the file is actually written.

### is_enable_transaction [boolean]

If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.

Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.

Only support `true` now.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Example

For text file format

```hocon
OssFile {
path="/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
field_delimiter = "\t"
row_delimiter = "\n"
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format = "text"
sink_columns = ["name","age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
}
```

For parquet file format

```hocon
OssFile {
path = "/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
field_delimiter = "\t"
row_delimiter = "\n"
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format = "parquet"
sink_columns = ["name","age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
}
```

For orc file format

```bash

OssFile {
path="/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
field_delimiter = "\t"
row_delimiter = "\n"
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format = "orc"
sink_columns = ["name","age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
}

```

## Changelog

### Next version

- Add OSS Jindo File Sink Connector
Loading

0 comments on commit 2507372

Please sign in to comment.