Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Core] Support using upstream table placeholders in sink options and auto replacement #7131

Merged
merged 20 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/en/concept/connector-v2-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,7 @@ For sink connector, the sink connector supports exactly-once if any piece of dat
### cdc(change data capture)

If a sink connector supports writing row kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think it supports cdc(change data capture).

### support multiple table write

Supports write multiple tables in one SeaTunnel job, users can dynamically specify the table's identifier by [configuring placeholders](./sink-options-placeholders.md).
110 changes: 110 additions & 0 deletions docs/en/concept/sink-options-placeholders.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Sink Options Placeholders

## Introduction

The SeaTunnel provides a sink options placeholders feature that allows you to get upstream table metadata through placeholders.

This functionality is essential when you need to dynamically get upstream table metadata (such as multi-table writes).

This document will guide you through the usage of these placeholders and how to leverage them effectively.

## Support Those Engines

> SeaTunnel Zeta<br/>
> Flink<br/>
> Spark<br/>

## Placeholder
hailin0 marked this conversation as resolved.
Show resolved Hide resolved

The placeholders are mainly controlled by the following expressions:

- `${database_name}`
- Used to get the database in the upstream catalog table
- Default values can also be specified via expressions:`${database_name:default_my_db}`
- `${schema_name}`
- Used to get the schema in the upstream catalog table
- Default values can also be specified via expressions:`${schema_name:default_my_schema}`
- `${table_name}`
- Used to get the table in the upstream catalog table
- Default values can also be specified via expressions:`${table_name:default_my_table}`
- `${schema_full_name}`
- Used to get the schema full path(database & schema) in the upstream catalog table
- `${table_full_name}`
- Used to get the table full path(database & schema & table) in the upstream catalog table
- `${primary_key}`
- Used to get the table primary-key fields in the upstream catalog table
- `${unique_key}`
- Used to get the table unique-key fields in the upstream catalog table
- `${field_names}`
- Used to get the table field keys in the upstream catalog table

## Configuration

*Requires*:
- Make sure the sink connector you are using has implemented `TableSinkFactory` API

### Example 1

```hocon
env {
// ignore...
}
source {
MySQL-CDC {
// ignore...
}
}

transform {
// ignore...
}

sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"

database = "${database_name}_test"
table = "${table_name}_test"
primary_keys = ["${primary_key}"]
}
}
```

### Example 2

```hocon
env {
// ignore...
}
source {
Oracle-CDC {
// ignore...
}
}

transform {
// ignore...
}

sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"

database = "${schema_name}_test"
table = "${table_name}_test"
primary_keys = ["${primary_key}"]
}
}
```

We will complete the placeholder replacement before the connector is started, ensuring that the sink options is ready before use.
If the variable is not replaced, it may be that the upstream table metadata is missing this option, for example:
- `mysql` source not contain `${schema_name}`
- `oracle` source not contain `${databse_name}`
- ...
94 changes: 92 additions & 2 deletions docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description

Expand Down Expand Up @@ -76,7 +77,7 @@ and the default template can be modified according to the situation.
Default template:

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
Expand All @@ -93,7 +94,7 @@ DISTRIBUTED BY HASH (${rowtype_primary_key})
If a custom field is filled in the template, such as adding an `id` field

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
(
id,
${rowtype_fields}
Expand Down Expand Up @@ -323,6 +324,95 @@ sink {
}
```

### Multiple table

#### example1

```hocon
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"

table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}

transform {
}

sink {
Doris {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
database = "${database_name}_test"
table = "${table_name}_test"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
```

#### example2

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}

source {
Jdbc {
driver = oracle.jdbc.driver.OracleDriver
url = "jdbc:oracle:thin:@localhost:1521/XE"
user = testUser
password = testPassword

table_list = [
{
table_path = "TESTSCHEMA.TABLE_1"
},
{
table_path = "TESTSCHEMA.TABLE_2"
}
]
}
}

transform {
}

sink {
Doris {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
database = "${schema_name}_test"
table = "${table_name}_test"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
```

## Changelog

### 2.3.0-beta 2022-10-20
Expand Down
22 changes: 19 additions & 3 deletions docs/en/connector-v2/sink/Druid.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Write data to Druid
## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Data Type Mapping

Expand Down Expand Up @@ -52,10 +53,25 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti

## Example

Simple example:

```hocon
sink {
Druid {
coordinatorUrl = "testHost:8888"
datasource = "seatunnel"
}
}
```

Use placeholders get upstream table metadata example:

```hocon
Druid {
coordinatorUrl = "testHost:8888"
datasource = "seatunnel"
sink {
Druid {
coordinatorUrl = "testHost:8888"
datasource = "${table_name}_test"
}
}
```

Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ If you use SeaTunnel Engine, You need put seatunnel-hadoop3-3.1.4-uber.jar and h

## Key features

- [x] [support multiple table write](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)

By default, we use 2PC commit to ensure `exactly-once`
Expand Down
70 changes: 70 additions & 0 deletions docs/en/connector-v2/sink/Http.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description

Expand Down Expand Up @@ -56,6 +57,75 @@ Http {
}
```

### Multiple table

#### example1

```hocon
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}
transform {
}
sink {
Http {
...
url = "http://localhost/test/${database_name}_test/${table_name}_test"
}
}
```

#### example2

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
driver = oracle.jdbc.driver.OracleDriver
url = "jdbc:oracle:thin:@localhost:1521/XE"
user = testUser
password = testPassword
table_list = [
{
table_path = "TESTSCHEMA.TABLE_1"
},
{
table_path = "TESTSCHEMA.TABLE_2"
}
]
}
}
transform {
}
sink {
Http {
...
url = "http://localhost/test/${schema_name}_test/${table_name}_test"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Loading
Loading