Skip to content

Commit

Permalink
[Improve][MulitTableSink] Refactor multi-table configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Jul 8, 2024
1 parent ae81879 commit 40d4d76
Show file tree
Hide file tree
Showing 32 changed files with 853 additions and 409 deletions.
14 changes: 14 additions & 0 deletions docs/en/concept/connector-v2-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,17 @@ For sink connector, the sink connector supports exactly-once if any piece of dat
### cdc(change data capture)

If a sink connector supports writing row kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think it supports cdc(change data capture).

### support multiple table write

Supports write multiple tables in one SeaTunnel job.

requires:
1. Support apis: `SupportMultiTableSink``SupportMultiTableSinkWriter`
2. Support the following table identifier placeholders in sink options.
- `${database_name}` Used to get the database in the upstream catalog table
- `${schema_name}` Used to get the schema in the upstream catalog table
- `${table_name}` Used to get the table in the upstream catalog table
- `${primary_key}` Used to get the table primary-key fields in the upstream catalog table
- `${unique_key}` Used to get the table unique-key fields in the upstream catalog table
- `${field_names}` Used to get the table field keys in the upstream catalog table
90 changes: 90 additions & 0 deletions docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description

Expand Down Expand Up @@ -323,6 +324,95 @@ sink {
}
```

### Multiple table

#### example1

```hocon
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}
transform {
}
sink {
Doris {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
database = "${database_name}_test"
table = "${table_name}_test"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
```

#### example2

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
driver = oracle.jdbc.driver.OracleDriver
url = "jdbc:oracle:thin:@localhost:1521/XE"
user = testUser
password = testPassword
table_list = [
{
table_path = "TESTSCHEMA.TABLE_1"
},
{
table_path = "TESTSCHEMA.TABLE_2"
}
]
}
}
transform {
}
sink {
Doris {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
database = "${schema_name}_test"
table = "${table_name}_test"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
```

## Changelog

### 2.3.0-beta 2022-10-20
Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ If you use SeaTunnel Engine, You need put seatunnel-hadoop3-3.1.4-uber.jar and h

## Key features

- [x] [support multiple table write](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)

By default, we use 2PC commit to ensure `exactly-once`
Expand Down
70 changes: 70 additions & 0 deletions docs/en/connector-v2/sink/Http.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description

Expand Down Expand Up @@ -56,6 +57,75 @@ Http {
}
```

### Multiple table

#### example1

```hocon
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}
transform {
}
sink {
Http {
...
url = "http://localhost/test/${database_name}_test/${table_name}_test"
}
}
```

#### example2

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
driver = oracle.jdbc.driver.OracleDriver
url = "jdbc:oracle:thin:@localhost:1521/XE"
user = testUser
password = testPassword
table_list = [
{
table_path = "TESTSCHEMA.TABLE_1"
},
{
table_path = "TESTSCHEMA.TABLE_2"
}
]
}
}
transform {
}
sink {
Http {
...
url = "http://localhost/test/${schema_name}_test/${table_name}_test"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
75 changes: 75 additions & 0 deletions docs/en/connector-v2/sink/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

Sink connector for Apache Iceberg. It can support cdc mode 、auto create table and table schema evolution.

## Key features

- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Supported DataSource Info

| Datasource | Dependent | Maven |
Expand Down Expand Up @@ -173,6 +177,77 @@ sink {
```

### Multiple table

#### example1

```hocon
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}
transform {
}
sink {
Iceberg {
...
namespace = "${database_name}_test"
table = "${table_name}_test"
}
}
```

#### example2

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
driver = oracle.jdbc.driver.OracleDriver
url = "jdbc:oracle:thin:@localhost:1521/XE"
user = testUser
password = testPassword
table_list = [
{
table_path = "TESTSCHEMA.TABLE_1"
},
{
table_path = "TESTSCHEMA.TABLE_2"
}
]
}
}
transform {
}
sink {
Iceberg {
...
namespace = "${schema_name}_test"
table = "${table_name}_test"
}
}
```

## Changelog

### 2.3.4-SNAPSHOT 2024-01-18
Expand Down
34 changes: 34 additions & 0 deletions docs/en/connector-v2/sink/InfluxDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Write data to InfluxDB.
## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Options

Expand Down Expand Up @@ -100,6 +101,39 @@ sink {
```

### Multiple table

#### example1

```hocon
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}
transform {
}
sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "${table_name}_test"
}
}
```

## Changelog

### next version
Expand Down
Loading

0 comments on commit 40d4d76

Please sign in to comment.