Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Kudu] Refactor Kudu functionality and Sink support CDC data. #5437

Merged
merged 14 commits into from
Oct 26, 2023
152 changes: 113 additions & 39 deletions docs/en/connector-v2/sink/Kudu.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,125 @@

> Kudu sink connector

## Description
## Support Kudu Version

Write data to Kudu.
- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

The tested kudu version is 1.11.1.
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| kudu_master | string | yes | - |
| kudu_table | string | yes | - |
| save_mode | string | yes | - |
| common-options | | no | - |

### kudu_master [string]

`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.

### kudu_table [string]

`kudu_table` The name of kudu table..

### save_mode [string]

Storage mode, we need support `overwrite` and `append`. `append` is now supported.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Example

```bash

kudu {
kudu_master = "192.168.88.110:7051"
kudu_table = "studentlyhresultflink"
save_mode="append"
}

- [x] [cdc](../../concept/connector-v2-features.md)

## Data Type Mapping

| SeaTunnel Data type | kudu Data type |
|---------------------|--------------------------|
| BOOLEAN | BOOL |
| INT | INT8<br/>INT16<br/>INT32 |
| BIGINT | INT64 |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| TIMESTAMP | UNIXTIME_MICROS |
| BYTES | BINARY |

## Sink Options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format have some error:


| Name | Type | Required | Default | Description |
|-------------------------------------------|--------|----------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
| kudu_masters | String | Yes | - | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Kudu normal operation time out. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Kudu admin operation time out. |
| enable_kerberos | Bool | No | false | Kerberos principal enable. |
| kerberos_principal | String | No | - | Kerberos principal. Note that all zeta nodes require have this file. |
| kerberos_keytab | String | No | - | Kerberos keytab. Note that all zeta nodes require have this file. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes require have this file. |
| save_mode | String | No | - | Storage mode, support `overwrite` and `append`. |
| session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu flush mode. Default AUTO_FLUSH_SYNC. |
| batch_size | Int | No | 1024 | The flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100 |
| buffer_flush_interval | Int | No | 10000 | The flush interval mills, over this time, asynchronous threads will flush data. |
| ignore_not_found | Bool | No | false | If true, ignore all not found rows. |
| ignore_not_duplicate | Bool | No | false | If true, ignore all dulicate rows. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |

## Task Example

### Simple:

> The following example refers to a FakeSource named "kudu" cdc write kudu table "kudu_sink_table"

```hocon

env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "kudu"
schema = {
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = INSERT
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = INSERT
fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = UPDATE_BEFORE
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = UPDATE_AFTER
fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = DELETE
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
}
}

sink {
kudu{
source_table_name = "kudu"
kudu_masters = "kudu-master-cdc:7051"
table_name = "kudu_sink_table"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}
```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add examples for kerberos enable.


## Changelog
Expand Down
117 changes: 82 additions & 35 deletions docs/en/connector-v2/source/Kudu.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,105 @@

> Kudu source connector

## Description
## Support Kudu Version

Used to read data from Kudu.
- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

The tested kudu version is 1.11.1.
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| kudu_master | string | yes | - |
| kudu_table | string | yes | - |
| columnsList | string | yes | - |
| common-options | | no | - |

### kudu_master [string]

`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.

### kudu_table [string]

`kudu_table` The name of kudu table..

### columnsList [string]

`columnsList` Specifies the column names of the table.
## Description

### common options
Used to read data from Kudu.

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
The tested kudu version is 1.11.1.

## Examples
## Data Type Mapping

| kudu Data type | SeaTunnel Data type |
|--------------------------|---------------------|
| BOOL | BOOLEAN |
| INT8<br/>INT16<br/>INT32 | INT |
| INT64 | BIGINT |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| UNIXTIME_MICROS | TIMESTAMP |
| BINARY | BYTES |

## Source Options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format have some error.

Uploading image.png…


| Name | Type | Required | Default | Description |
|-------------------------------------------|--------|----------|------------------------------------------------|----------------------------------------------------------------------------------------------------------|
| kudu_masters | String | Yes | - | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Kudu normal operation time out. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Kudu admin operation time out. |
| enable_kerberos | Bool | No | false | Kerberos principal enable. |
| kerberos_principal | String | No | - | Kerberos principal. Note that all zeta nodes require have this file. |
| kerberos_keytab | String | No | - | Kerberos keytab. Note that all zeta nodes require have this file. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes require have this file. |
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
| filter | Int | No | 1024 * 1024 | Kudu scan filter expressions,Not supported yet. |
| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |

## Task Example

### Simple:

> The following example is for a Kudu table named "kudu_source_table", The goal is to print the data from this table on the console and write kudu table "kudu_sink_table"

```hocon
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "BATCH"
}

source {
Kudu {
result_table_name = "studentlyh2"
kudu_master = "192.168.88.110:7051"
kudu_table = "studentlyh2"
columnsList = "id,name,age,sex"
}
# This is a example source plugin **only for test and demonstrate the feature source plugin**
kudu{
kudu_masters = "kudu-master:7051"
table_name = "kudu_source_table"
result_table_name = "kudu"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}

transform {
}

sink {
console {
source_table_name = "kudu"
}

kudu{
source_table_name = "kudu"
kudu_masters = "kudu-master:7051"
table_name = "kudu_sink_table"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add kerberos enable examples.


## Changelog
Expand Down
14 changes: 14 additions & 0 deletions seatunnel-connectors-v2/connector-kudu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,19 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Loading
Loading