[Feature][Kudu] Refactor Kudu functionality and Sink support CDC data. (
Carl-Zhou-CN authored Oct 26, 2023
1 parent b3e1351 commit 22110eb
Showing 33 changed files with 2,295 additions and 673 deletions.
152 changes: 113 additions & 39 deletions docs/en/connector-v2/sink/Kudu.md
@@ -2,51 +2,125 @@

> Kudu sink connector
## Description
## Support Kudu Version

Write data to Kudu.
- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

The tested kudu version is 1.11.1.
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| kudu_master | string | yes | - |
| kudu_table | string | yes | - |
| save_mode | string | yes | - |
| common-options | | no | - |

### kudu_master [string]

`kudu_master` The address of the Kudu master, such as '192.168.88.110:7051'.

### kudu_table [string]

`kudu_table` The name of the Kudu table.

### save_mode [string]

Storage mode. Both `overwrite` and `append` are intended to be supported; currently only `append` is supported.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Example

```bash

kudu {
kudu_master = "192.168.88.110:7051"
kudu_table = "studentlyhresultflink"
save_mode="append"
}
```

- [x] [cdc](../../concept/connector-v2-features.md)

## Data Type Mapping

| SeaTunnel Data type | kudu Data type |
|---------------------|--------------------------|
| BOOLEAN | BOOL |
| INT | INT8<br/>INT16<br/>INT32 |
| BIGINT | INT64 |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| TIMESTAMP | UNIXTIME_MICROS |
| BYTES | BINARY |

## Sink Options

| Name | Type | Required | Default | Description |
|-------------------------------------------|--------|----------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
| kudu_masters | String | Yes | - | Kudu master addresses, separated by ','. For example: '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Defaults to twice the number of available CPU cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds. |
| enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
| kerberos_principal | String | No | - | Kerberos principal. Note that all Zeta nodes must have this file. |
| kerberos_keytab | String | No | - | Kerberos keytab. Note that all Zeta nodes must have this file. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all Zeta nodes must have this file. |
| save_mode | String | No | - | Storage mode, support `overwrite` and `append`. |
| session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu flush mode. Default AUTO_FLUSH_SYNC. |
| batch_size | Int | No | 1024 | The maximum number of buffered records (including all append, upsert and delete records). When this number is exceeded, the data is flushed. |
| buffer_flush_interval | Int | No | 10000 | The flush interval in milliseconds. After this time, asynchronous threads flush the data. |
| ignore_not_found | Bool | No | false | If true, ignore all rows that are not found. |
| ignore_not_duplicate | Bool | No | false | If true, ignore all duplicate rows. |
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. |
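
As a rough illustration of how the flush-related options in the table fit together, the fragment below sketches a sink block that overrides the batching defaults. It uses only option names listed in the table. The master address and table name are borrowed from the task example in this document, and the values (2048 records, 5000 ms) are arbitrary assumptions chosen for illustration, not recommendations.

```hocon
sink {
  kudu {
    source_table_name = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"

    # Flush mode; AUTO_FLUSH_SYNC is the documented default.
    session_flush_mode = "AUTO_FLUSH_SYNC"
    # Flush once more than this many records are buffered (illustrative value).
    batch_size = 2048
    # Flush interval in milliseconds (illustrative value).
    buffer_flush_interval = 5000
    # Ignore rows that are not found.
    ignore_not_found = true
    # Ignore duplicate rows.
    ignore_not_duplicate = true
  }
}
```

Whether ignoring not-found or duplicate rows is appropriate depends on the job. Both flags are enabled here only to show the syntax.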

## Task Example

### Simple:

> The following example uses a FakeSource whose result table is named "kudu" to emit CDC rows (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) and writes them to the Kudu table "kudu_sink_table".
```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "kudu"
schema = {
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = INSERT
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = INSERT
fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = UPDATE_BEFORE
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = UPDATE_AFTER
fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = DELETE
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
}
}
sink {
kudu{
source_table_name = "kudu"
kudu_masters = "kudu-master-cdc:7051"
table_name = "kudu_sink_table"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}
```
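
The `kudu_masters` option accepts a comma-separated list of addresses. A minimal sketch of a sink block for a multi-master cluster, with the client timeouts raised, might look like the following. The host names `kudu-master-1` through `kudu-master-3` and the 60000 ms values are hypothetical placeholders, not values from the commit.

```hocon
sink {
  kudu {
    source_table_name = "kudu"
    # Comma-separated master addresses (hypothetical hosts).
    kudu_masters = "kudu-master-1:7051,kudu-master-2:7051,kudu-master-3:7051"
    table_name = "kudu_sink_table"
    # Both timeouts have a documented default of 30000.
    client_default_operation_timeout_ms = 60000
    client_default_admin_operation_timeout_ms = 60000
  }
}
```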

## Changelog
117 changes: 82 additions & 35 deletions docs/en/connector-v2/source/Kudu.md
@@ -2,58 +2,105 @@

> Kudu source connector
## Description
## Support Kudu Version

Used to read data from Kudu.
- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

The tested kudu version is 1.11.1.
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| kudu_master | string | yes | - |
| kudu_table | string | yes | - |
| columnsList | string | yes | - |
| common-options | | no | - |

### kudu_master [string]

`kudu_master` The address of the Kudu master, such as '192.168.88.110:7051'.

### kudu_table [string]

`kudu_table` The name of the Kudu table.

### columnsList [string]

`columnsList` Specifies the column names of the table.
## Description

### common options
Used to read data from Kudu.

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
The tested kudu version is 1.11.1.

## Examples
## Data Type Mapping

| kudu Data type | SeaTunnel Data type |
|--------------------------|---------------------|
| BOOL | BOOLEAN |
| INT8<br/>INT16<br/>INT32 | INT |
| INT64 | BIGINT |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| UNIXTIME_MICROS | TIMESTAMP |
| BINARY | BYTES |

## Source Options

| Name | Type | Required | Default | Description |
|-------------------------------------------|--------|----------|------------------------------------------------|----------------------------------------------------------------------------------------------------------|
| kudu_masters | String | Yes | - | Kudu master addresses, separated by ','. For example: '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Defaults to twice the number of available CPU cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds. |
| enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
| kerberos_principal | String | No | - | Kerberos principal. Note that all Zeta nodes must have this file. |
| kerberos_keytab | String | No | - | Kerberos keytab. Note that all Zeta nodes must have this file. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all Zeta nodes must have this file. |
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
| filter | String | No | - | Kudu scan filter expressions. Not supported yet. |
| schema | Map | No | - | SeaTunnel Schema. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
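
To make the scan-related options more concrete, here is a minimal sketch of a source block that overrides them. Only option names from the table are used. The master address and table names are borrowed from the task example in this document, and the values are illustrative assumptions. The timeout is presumably in milliseconds, in line with the 30000 default of the operation timeout.

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    result_table_name = "kudu"
    # Scan token timeout (illustrative value; documented default is 30000).
    scan_token_query_timeout = 60000
    # Maximum bytes read at a time: 2 MB here; the documented default is 1 MB.
    scan_token_batch_size_bytes = 2097152
  }
}
```

The byte size is written out as a literal number because HOCON does not evaluate arithmetic such as `1024 * 1024`.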

## Task Example

### Simple:

> The following example reads a Kudu table named "kudu_source_table", prints its data to the console, and writes it to the Kudu table "kudu_sink_table".
```hocon
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "BATCH"
}
source {
Kudu {
result_table_name = "studentlyh2"
kudu_master = "192.168.88.110:7051"
kudu_table = "studentlyh2"
columnsList = "id,name,age,sex"
}
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
kudu{
kudu_masters = "kudu-master:7051"
table_name = "kudu_source_table"
result_table_name = "kudu"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}
transform {
}
sink {
console {
source_table_name = "kudu"
}
kudu{
source_table_name = "kudu"
kudu_masters = "kudu-master:7051"
table_name = "kudu_sink_table"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}
```
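
The task example above enables Kerberos but does not set `kerberos_krb5conf`. A sketch of a source block that sets all three Kerberos options might look like this. The principal, keytab path and krb5 configuration path are hypothetical placeholders, and per the options table the referenced files need to be present on all Zeta nodes.

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    result_table_name = "kudu"
    enable_kerberos = true
    # Hypothetical principal and file locations; adjust to your environment.
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "/path/to/xx.keytab"
    kerberos_krb5conf = "/path/to/krb5.conf"
  }
}
```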

## Changelog
14 changes: 14 additions & 0 deletions seatunnel-connectors-v2/connector-kudu/pom.xml
@@ -56,5 +56,19 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>